Jakimi technologiami ogarnąć długo wykonujący się proces?

0

Mam do ogarnięcia długi proces, który wykonuje się kilka godzin codziennie i startuje o konkretnej godzinie.

Mam kilkanaście paczek faktur i innych dokumentów finansowych, od 0,2mln. do 2mln. w zależności od rozmiaru paczki.
Proces taki to wykonanie jedno po drugim wywołań procedur, api, stworzenie gdzieś na zasobie pliku itd..

np. taki proces dla jednej paczki (a trzeba odpalić ten proces dla kilku paczek):

  1. Pobierz paczkę ->
  2. przepuść przez zewnętrzne API, które odfiltruje i posortuje dokumenty ->
  3. odpal procedurę ->
    4 Jeśli typ paczki==A to odpal kolejną inną procedurę w bazie, w innym przypadku idź do kolejnego kroku ->
    5 Przepuść dokumenty przez OCR w chmurze =>
    6 Przepuść dokumenty przez api, by uzyskać wyliczenie i notę dla dokumentów =>
    7 Dołącz z poprzedniego kroku dane do dokumentów =>
    9 Wywołaj api, które stworzy wrzuci dokumenty z raportem w odpowiednie miejsca w systemie

To tylko przykładowo jak proces dla jednej paczki może wyglądać by przedstawić problem.

Mamy już system, który realizuje coś podobnego i mamy po prostu pętlę z paczkami, a w środku pętli wszystkie kroki dla danej paczki wywołują się synchronicznie.
Kolejne kroki procesu dla jednej paczki muszą się wykonywać jeden po drugim, bo każdy krok bazuje na poprzednim, wyjątki są małe i tutaj zwykłe Task.WhenAll wystarcza, zwłaszcza że wszystkie kroki procesu wykonują się długo ale w zewnętrznych systemach i trzeba tylko poczekać.

Aplikacja więc, którą muszę stworzyć jest takim jakby orkiestratorem całości i puszcza jeden krok po drugim wykorzystując zewnętrzne systemy na które nie mam wpływu poza parametrami ustawionymi w naszym panel adminie dla każdej paczki. Chciałbym żeby biznes też mógł widzieć na którym kroku procesu która paczka jest obecnie i nie wiem czy załatwi mi to jakiś framework przy okazji, czy ręcznie w bazie tą informację po zakończeniu każdego kroku zrobić.

Kolejnym problemem jest fakt, że najlepiej byłoby puścić proces dla wszystkich paczek równolegle, ale wiele z tych kroków może w danym czasie być odpalony tylko dla jednej paczki! A niektóre już można równolegle puścić i zyskać masę czasu. Co więcej kolejność przetwarzania paczek jest bardzo ważna! np. Paczka Id2 nie może wejść w krok 6 przed Paczką Id1, nawet jeśli Paczka Id2 wszystkie równoległe kroki przeszła szybciej.
Np. zamiast:

Paczka Id1: Krok1->Krok2->...->Krok9 
                                                            Paczka Id2: Krok1->Krok2->...->Krok9 
                                                                                                                       Paczka Id3: Krok1->Krok2->...->Krok9 
                                                                                                                                                                                   Paczka Id4: Krok1->Krok2->...->Krok9 

puścić to by wykonało się mniej więcej np. tak:

Paczka Id1: Krok1 -> Krok2 ->  Krok3  -> Krok4  -> Bardzo długi Krok5    -> Krok6
Paczka Id2: Krok1 -> wait    ->  Krok2  -> Krok3  -> Krok4   ->   Bardzo długi Krok5  -> Krok6
Paczka Id3: Krok1 ->         wait             -> Krok2  -> Krok3   -> Krok4  ->   Bardzo długi Krok5  -> Krok6

Aż samo się ciśnie wykorzystanie kolejek, po prostu niektóre kroki mogłyby przyjąć tylko jedno zadanie na raz, inne wiele zadań równolegle, ale jak ogarnąć kolejkę by nie przyjęła obsługi paczki Id2 jeśli wcześniej nie obsłużyła paczki Id1? Pewnie sprawdzać przy przyjęciu Paczkę Id2, sprawdzić wtedy status paczki Id1, czy już jest na następnym kroku czy przed, a jeśli przez do Paczkę Id2 wrzucić na kolejkę obok by zrobiła retry za minutę i znowu puściła do pierwszej kolejki. Oczywiście oprócz tego ogarnięcie co się dzieje jak na procesie którejś paczki

Sprawa skomplikowana, ale nie wiem czego użyć za bardzo. Może to ogarnąć w jakiś silnik BPMN, skorzystać z kolejek do tego, może dobry by tu był stack particular z Saga i NServiceBus (ale nie temat wymaga spędzenia do rozpoznania masę godzin, za nim się zorientuję czy byłoby to dobre do użycia narzędzie), czy może dużo więcej samemu napisać kodu do obsługi i się pobawić MediatR i Hangfirem (Hangfire wykorzystywałem tylko do odpalenia joba wg skonfigurowanego crona i nie wiem czy to narzędzie, które ogarnie taki workflow jobów ze stepami)

Miał ktoś styczność z podobnymi problemami?

0

Czy każdy dokument w paczce jest odrębnym obiektem? Czy jesteś w stanie odpalić dla każdego dokumentu osobne zadanie? Jeśli tak, to by rozważył wykorzystanie Task.Run równolegle dla każdego obiektu. Wykorzystasz w ten sposób procesor w 100%.

1

Ja kiedyś do podobnego procesu używałem MassTransita (odpowiednik NServiceBusa).

Tak jak piszesz trochę czasu zeszło na rozpoznanie i rozwiązywanie problemów specyficznych dla środowiska rozproszonego, ale ostatecznie działało dobrze.

2

Ja bym to zrobił na kolejkach (MSMQ czy RabbitMQ) i kilku procesach/kilku zestawach wątków.

  1. Pierwszy proces/wątek wrzuca paczki pierwszej kolejki, np. "PackagesAwaitingForDownloadQueue". Nie musi to być wątek, może to być jakieś REST/SOAP API lub coś co nasłuchuje na zmiany w katalogu.
  2. Drugi proces/wątek pobiera wiadomość z powyższej kolejki, przesuwa do kolejnej, np. "PackagesCurrentlyDownloadedQueue" (taka operacja w postacie kolejek "in progress" umożliwi proste monitorowanie tego, co się w danym momencie dzieje oraz możliwość łatwego ogarnięcia stanu procesów po restarcie maszyny czy któregoś z procesów) i odpala wątek-worker, który ściąga paczkę. Wątek-worker po zakończeniu ściągania przesuwa wiadomość do następnej kolejki, np. "PackagesToBeFilteredQueue" (albo zrobi to główny wątek po wykryciu zakończenia przetwarzania). Głowny wątek może czekać na zakończenie działania wątku-workera lub uruchamiać od razu kolejny/kolejne w skonfigurowanej ilości, jeśli według Ciebie sensowne jest pobieranie 2+ paczek jednocześnie. Jeśli sens ma ściąganie tylko jednej paczki naraz możesz uprościć proces i zrobić ściąganie w głównym wątku, acz fajnie by było mieć identyczną architekturę w każdym kroku procesu i sterować ilością jednocześnie przetwarzanych wiadomości za pomocą ustawień.
  3. Trzeci proces/wątek pobiera wiadomości z paczki "PackagesToBeFilteredQueue" i robi w zasadzie to samo, co w punkcie drugim, tylko ma własną konfigurację (kolejki nazywają się inaczej i ma być może inną ilość jednocześnie pracujących wątków) oraz wątki-workery robią co innego.
  4. To samo co w punkcie trzecim i to samo dla kolejnych kroków.
    ...
    n. Ostatni krok w wątku-workerze (lub głównym wątku zależnie od Twoich preferencji) przesuwa wiadomości do kolejki "FinishedProcessingQueue", ewentualnie wywala wiadomości w kosmos.

Taka architektura pozwala na elastyczne dodawanie/usuwanie kroków, sterowanie równoległością przetwarzania każdego kroku z osobna (1 wątek, n wątków, bez ograniczeń), łatwy podgląd tego, co się dzieje, łatwe restartowanie przetwarzania konkretnych kroków lub całego procesu.

Przy starcie aplikacji możesz przesuwać wiadomości z kolejek in-progress do kolejek to-do. Jeśli ściągane/wysyłane paczki i dokumenty są duże, a masz możliwość wznawiania pobierania/wysyłania, to warto rozważyć wykorzystanie tego mechanizmu.

1 użytkowników online, w tym zalogowanych: 0, gości: 1