Jak sygnalizować ukończone przetwarzanie wszystkich message z Kafki?

0

Mam main-service do którego użytkownik dodaje "taski". Te taski to tak naprawdę instancje wzorca Command.

W uproszczonej postaci gromadzone taski mogą wyglądać tak:

tasks = [
{ "operation": "foo", "config": {} },
{ "operation": "foo", "config": {} },
{ "operation": "bar" },
]

Użytkownik jak w pewnym momencie stwierdzi, że już dodał te taski które chciał, to może uruchomić ich wszystkich przetwarzanie. Przetwarzanie ma się odbywać przez odpowiadające serwisy: foo-service, bar-service.

W momencie zasygnalizowania chęci uruchomienia przetwarzania, planuję żeby main-service wsadziło na Kafkę wszystkie te zgromadzone taski. Taski foo lecą na foo-topic i będą konsumowane przez foo-service. Analogicznie z taskami bar.

Przetworzenie taska polega na wykonaniu GET do jeszcze innego serwisu który zwraca Listę, później in memory transformacji tej listy, i zapisaniu wyniku do np. pliku o losowym uuid nazwie.

Wykonywanie tasków jest idempotentne i mogę je wykonywać w dowolnej kolejności.

Czyli mój foo-service będzie listenował na foo-topic i po kolei przetwarzał swoje foo taski, itd.

Pytanie:

main-service musi się dowiedzieć, że skończyły się już przetwarzać wszystkie taski. Jak to zrobić? A co jeśli przetworzenie jakiegoś tasku się nie uda?

Myślałem o zrobieniu status-topic do którego foo-service, bar-service będą wysyłały informacje o każdym pomyślnie ukończonym tasku. Mogą wysyłać uuid taska na ten topic. Następnie main-service będzie z tego topica ściągał to i jakoś (jak?) zliczał czy wszystkie taski się wykonały np. usuwając z otrzymane uuid z seta uuid. A sam w sobie set uuid miałbym może w Redisie i tworzyłbym go w momencie publikacji wszystkich tasków na kafkę.

Co tutaj mogę poprawić albo całkowicie zmienić w podejściu?

1

@Legenda:

Coś niekonsekwentnego czuję w samej idei.
Jesli są idempotentne, a w ogole są zalezne? A co, gdyby się jeden uwalił (jakieś "wrong data exception"), albo techninik wypiął wtyczkę z routera

... więc czy ta zbiorcza odpowiedź ma sens

Nie okreslasz scenariusza, wiec taki, jaki ja wtórnie sobie wyobrażam: przyszły niezaleznie, posiadają indywidulane statusy
Jeśli tworzą byt wyższego rzędu, to powinno to być zawarte konsekwentnie od poczatku projektowania do końca

0

Ja bym to zrobił tak, że na Kafke by leciał tylko event mówiący o chęci zaczęcia przetwarzania, z jakimś swoim idkiem. Z kolei wszystkie komendy, zanim je wykonam, zapisywałbym w bazie ze statusem (SUBMITTED, IN_PROGRESS, DONE etc), i w momencie otrzymania eventu zlecającego start, kolejne workery (konsumery) pobierają sobie komendy z danym id. Wmiędzyczasie zrobiłbym jakiegoś async schedulera który by szukał w bazie czy są jakiekolwiek eventy nie-done dla danego id, i jeśli nie byłoby takich, to by emitował ostateczny event w stylu all-commands-processed-event a ten main-service by na to nasłuchiwał.

Nie wiem czy to optymalne czy nie, ale zawsze daję z siebie 30%.

1

Widzę trzy podejścia:

  1. Synchroniczne, tj. przetwarzanie krok po kroku robi jakiś update na bazie danych, a niezależny proces robi polling. Analogicznie jak w urzędzie - składasz wniosek o paszport i dzwonisz co drugi dzień z pytaniem, czy już jest gotowy. W przypadku grupy tasków tworzysz oczywiście odpowiednią strukturę, która będzie zawierała taski podrzędne.

  2. Asynchronicznie #1 - czyli tworzymy sobie topic z wiadomościami typu main #1 foo #1 task submitted, main#1 bar #2 task finished (czyli każda wiadomość zawiera informację do jakiego koszyka należy, i jaki jest status). Odpowiedni consumer sprawdza, czy przyszły już wszystkie wiadomości tego typu - i kiedy wszystko się zgadza to wypluwa dane.

  3. Asynchronicznie #2 - czyli korzystamy z Kafka Streams API (lub inne narzędzie do przetwarzania strumieniowego z okienkami).

Wymaga to pewnej wiedzy, ale jeśli założymy taką strukturę:

class MainTask {
  Integer mainId;
  Integer[] footaskIds;
}

class FooTask {
  Integer mainId;
  Integer footaskId;
}

class MainTaskResult {
  Integer mainId;
  Integer[] completedFootaskIds;
}

Tworzymy trzy topici: service.main.result, service.main i service.foo.result. Na starcie na service.main leci informacja o rozpoczęciu zadania, a na service.foo.success trafiają wiadomości po zakończeniu danego subtaska.

I teraz tworzymy sobie apkę, która będzie joinowała oba topiki w następujący sposób:

  • przy otrzymaniu wiadomości z service.main stworzy sobie okienko (ang. window) i będzie czekała, aż przyjdą wszystkie wiadomości na service.foo.success aż do momentu, w którym wszystkie elementy przyjdą lub jakiś timeout
  • kiedy już skończy się oczekiwanie na te dane to wyemituje wiadomość MainTaskResult na service.main.result.

Oczywiście struktury mogą być inne, jak i można też tworzyć sporo etapów pośrednich, a błędy można obsłużyć w zależności do wymagań biznesowych.

  1. Asynchronicznie #2-1 - wykorzystać Akkę (lub inny silnik aktorski) do realizacji logiki podobnej jak z punktu trzeciego.

Wybór rozwiązania tak naprawdę zależy od twojej sytuacji.
Pkt. 1 - całkiem dobre podejście, niby "prostackie", ale jeśli nie masz wielkiego ruchu to da radę.
Pkt. 2 - spowoduje, że na kolejce będziesz miał dużo nadmiernych danych.
Pkt. 3 - to w cholerę roboty jak nie bawiłeś się w tego typu rzeczy i korzystałbym z tego tylko wtedy, gdy takich subtasków jest od cholery.
Pkt. 4 - oznacza, że jak rypnie ci się serwer to wszystkie informacje albo będą przetwarzane od nowa, albo lecą do śmieci.

0

@wartek01: Co to znaczy, że 'przyjdą wszystkie wiadomości'? Stream z definicji jest nieograniczony, czyli nie ma czegoś takiego jak wszystkie wiadomości. Jeżeli serwis 'main' wysypie się wygenerowaniu 10 wiadomości, a jest jeszcze 1000 to co się stanie? Po 10 bedzie wygenerowany koncowy event?

(Prawie) Zawsze w Kafce tematy mają wiele partycji, moglbyc dodać szczegóły implementacyjne przy implementacji z Kafka Streams? Ciężko to sobie wyobrazić bez raczej zaawaansowanych/skomplikowanych rzeczy z KS.

0
dmw napisał(a):

@wartek01: Co to znaczy, że 'przyjdą wszystkie wiadomości'? Stream z definicji jest nieograniczony, czyli nie ma czegoś takiego jak wszystkie wiadomości.

Stream z definicji jest nieograniczony (inaczej byłby batchem), ale okienko to właśnie ograniczony wycinek streamu.
Żeby zobrazować: masz jakiś topic z wiadomościami typu MyMessage generowanymi dokładnie co sekundę. Wtedy:

  1. TimeoutWindow przy ustawionym timeoucie na 15 sekund i prostą agregacją do listy zamieni ci strumień cosekundowych wiadomości MyMessage na strumień copiętnastosekundowych wiadomości typu List<MyMessage>.
  2. Z kolei CountWindow przy np. limicie 1000 odpali się, kiedy zbierze się 1000 wiadomości, czyli co 1000 sekund będzie emitował jeden obiekt typu List<MyMessage> na wyjściu.

W przypadku OPa być może bez customizacji się nie obejdzie. Sam takich rzeczy nie robiłem na Kafka Streams API, ale na Flinku podłączonym do Kafki już tak. Natomiast jestem przekonany, że się da, chociaż jeśli wiadomo mniej-więcej jaki jest czas oczekiwania to nawet takie SlidingWindow może wystarczyć.

Jeżeli serwis 'main' wysypie się wygenerowaniu 10 wiadomości, a jest jeszcze 1000 to co się stanie? Po 10 bedzie wygenerowany koncowy event?

Zazwyczaj takie rzeczy ogarnia się - jak to napisałem - timeoutem. W przypadku mojego przykładu wiadomość wychodząca z okienka powinna zawierać informację ile rekordów się spodziewano (dane z service.main), ile przyszło (dane z service.foo.success), a na wyjściu powinieneś mieć informację dotyczącą liczby spodziewanych rekordów i liczby rekordów, które przyszły. Jeśli okienko doczekało się timeoutu to wylatuje po prostu info z rozjazdem pomiędzy oczekiwaną liczbą, a liczbą która faktycznie przyszła.

(Prawie) Zawsze w Kafce tematy mają wiele partycji, moglbyc dodać szczegóły implementacyjne przy implementacji z Kafka Streams?

Ogólnie - jeśli weźmiemy "moje" struktury to jedynym naturalnym kluczem jest mainId (tj. MainTask.mainId lub FooTask.mainId) i po tym spokojnie można partycjonować. Co prawda nie jestem przekonany czy trzeba (skoro wiadomości są idempotentne więc nie ma znaczenia w jakiej kolejności trafi do okienka), ale na pewno nie zaszkodzi.

1

Poczytaj o wzorcach process manager i request-reply, a następnie weź się za detale implementacyjne. Najpierw design, potem wybór narzędzi.

https://www.enterpriseintegrationpatterns.com/patterns/messaging/ProcessManager.html
https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html

Od tych komentarzy o Akka i Kafka Streams można serio raka dostać. (Już nie) modne słowa, brakuje jeszcze k8s i Service mesh. Na pewno ktoś Wam to wszystko zestawi i utrzyma :) devops/admin już leci, żeby to zestawić i potem utrzymywać :)

0
Charles_Ray napisał(a):

Od tych komentarzy o Akka i Kafka Streams można serio raka dostać. (Już nie) modne słowa, brakuje jeszcze k8s i Service mesh. Na pewno ktoś Wam to wszystko zestawi i utrzyma :) devops/admin już leci, żeby to zestawić i potem utrzymywać :)

Eee... Chyba nie kojarzysz, ale tutaj nie ma za bardzo co utrzymywać. A przynajmniej nie bardziej, niż np. przy utrzymywaniu mikroserwisu w Spring Boocie. Jak zechcesz to możesz sobie zbudować aplikację w JARze i odpalić, jeśli tylko jest gdzieś tam klaster kafkowy (a jest).

Właśnie dlatego zazwyczaj piszę "użyj Kafka Streams API" zamiast choćby i wspomnianego Flinka - bo w 90% operacji "czystokafkowych" możliwości tego kombajnu typu trzymanie dużych stanów czy zaczytywanie z różnych źródeł jest niepotrzebna. I dosyć trudno mi uznać te streamsy za coś modnego, bo po prostu jeśli przeskoczysz pewną barierę to operowanie na strumieniach robi się bardzo proste.

1 użytkowników online, w tym zalogowanych: 0, gości: 1