Asynchroniczne zapytania do bazy danych w kontrolerze

0

Mam endpointy w kontrolerze napisane w Springu które zwracają pewne dane z bazy danych (wcześniej trzeba je trochę obrobić).
Niektóre selecty po stronie bazy wykonują się za długo (~2 sekundy) a w tym czasie wątek jest zablokowany. Kontroler co prawda musi czekać na dane z bazy więc i tak zwróci je po ~2 sekundach, ale można by wykonywać inne operacje zanim baza nie zwróci wyników (np. obrabianie danych które przyszły z kolejnego requesta do wykonania następnego selecta).
Jest to dla mnie ważne bo zapytań jest na tyle dużo że muszą być kolejkowane przez ograniczoną pulę wątków.
Zacząłem eksperymentować z ComplatableFuture ale koniec końców nie działa to tak jak bym oczekiwał więc może moje rozumowanie jest złe albo wykonana implementacja. Zacznijmy od tego pierwszego.
Oczekuję:

  1. Kontroler łapie zapytanie
  2. Obrabia dane
  3. Robi select'a do bazy (~2-3 sekundy)
  4. W czasie trwania zapytania do bazy kontroler odbiera kolejny request, obrabia dane i robi zapytanie do bazy.
  5. Jeśli poprzednie zapytanie do db już się zakończyło to zwróć wyniki, jeśli nie to przyjmij kolejny request.

Natomiast u mnie tak nie działa. Kontroler może wykonać kilka operacji asynchronicznie w ramach tego samego requesta w czasie kiedy baza pracuje nad zwróceniem jakichś wyników, natomiast nie odbierze kolejnego zapytania dopóki pierwsze w całości się nie zakończyło.
Pytanie więc czy jest to w ogóle możliwe? Aktualnie zrobiłem to na Springu ale mogę to zastąpić czymkolwiek, byle podstawą była Java.

0

Może przyda się Spring WebFlux (ReactiveApi)?

0

O ile kojarzę są 2 modele:

  • polling (klient co jakiś czas, być może krótki, odpytuje, czy jest już odpowiedź na mój request appX_UID_XYZ i dostaje responsa z payloadem)
  • notification / callback - przy requstowaniu dodajesz parametr, gdzie ma być wysłana notyfikacja o zakończeniu przetwarzania requestu "call me back at http://foo/bar/notify/request/appX_UID_XYZ") i tam możesz dorzucać jakiś payload

A komu niby ma być zwrócona odpowiedź? I w jaki sposób? Dla GUI to może wyglądać inaczej (websockets?) i inaczej dla aplikacji backendowych.

0

Trochę się nie zrozumieliśmy.
Klienci pukają do moich endpointów o konkretne dane. Kontroler łapie request, przygotowuje zapytanie do bazy i pyta bazę o dane X. Operacja selecta trwa czasami 2-3 sekundy. Jak się wykona ta operacja to dane z bazy muszą być jeszcze przygotowane, obrobione itp po czym można je zwrócić.
Z punktu widzenia klienta całość będzie synchroniczna bo wszystkie te operacje muszą się wykonać aby otrzymał wyniki natomiast wątki po stronie serwera kiedy robione jest zapytanie do bazy po dane stoją i nic nie robią a mogłyby w tym czasie na warsztat brać kolejny request który trzeba zwalidować lub też przetworzyć wyniki z bazy.

@Patryk27 dodał komentarz który krótko to podsumowuje.

0

Zacznijmy od tego że żeby osiągnąć to co napisałeś w Springu nie musisz wykorzystać CompletableFuture.

Dla każdego requesta domyślnie Spring sam zrobi nowy wątek i zapytania będzie wykonywał równolegle. Jeżeli u ciebie tak nie jest to musiałeś coś pokombinować, np. zrobić customowy config, transakcje na poziomie SERIALIZABLE, manualne lockowanie wątków czy jeszcze coś innego.

Żeby ci dalej pomóc musiałbyś dać minimalny weryfikowalny przykład. (https://stackoverflow.com/help/mcve) i dał więcej informacji m.in. z jakiej bazy korzystasz.

0

Hmm, wewnątrz serwera to nie wystarczy zwykłe ExecutorService z dedykowaną pulą wątków i parę kolejek (w zależności od tego jak bardzo złożony jest flow przetwarzający)?

Jak dla mnie requesty nie biorą się znikąd i trzeba w końcu odpowiedź wysłać do klienta i albo synchronicznie (tak jak masz teraz, że wątek odbierający request jest przyblokowany, aż obliczenia będą gotowe) albo asynchronicznie (callback do klienta). 2-3 sek dla CPU to dużo, więc może być tak, że nie będziesz miał co przetwarzać, bo wąskie gardło będzie na bazie :-)

0
Noozen napisał(a):

Zacznijmy od tego że żeby osiągnąć to co napisałeś w Springu nie musisz wykorzystać CompletableFuture.

Dla każdego requesta domyślnie Spring sam zrobi nowy wątek i zapytania będzie wykonywał równolegle.

Jak najbardziej się z tym zgadzam. Domyślna pula wątków dla Tomcata uruchomionego jako internal w Spring Boot wynosi 200. Mogę to edytować ale problem jest w tym że mi to nie wystarcza. Zdarza się że requestów jest kilka tysięcy.
Kod produkcyjny póki co nie jest istotny bo tam wszystko jest synchronicznie zrobione przez co serwer nie wyrabia i chciałbym to zrobić od nowa. Odpowiedzi do klientów nie mogę zwracać asynchronicznie, natomiast w momencie kiedy wątek nic nie robi bo zrobił synchroniczne zapytanie do DB, chciałbym je zamienić na async i w tym czasie przetwarzać kolejny request z kolejki.

1

W takim wypadku zainteresuj się WebFlux. Albo podnieś pulę do paru tys (nie rób tego :D)

EDIT: WebFlux oznaczałby również zmianę w kodzie klientów, bo odpowiedzi byłyby już event streamem

Jeśli nie chcesz zmieniać API clientów i zwiększać puli połączeń na tomcacie, pomyślałbym o postawieniu jakiegoś API gatewaya, który robiłby delay jeśli byłoby dużo przychodzących requestów.
Np: Nginx https://www.nginx.com/blog/rate-limiting-nginx/, lub Zuul (nie znam)

0

@yarel: odpowiem tutaj w nawiązaniu do komentarza bardziej szczegółówo.

Komunikacja między klientem a serwerem jest synchroniczna i tego póki co nie ruszam bo nie mam możliwości modyfikować klienta. Ruch jest już rozłożony na kilka maszyn ale to nie wystarcza. Zależy mi tylko żeby szybciej rozładować kolejkę requestów. Dla uproszczenia weźmy taki przykład.
Pula wątków dla tomcata wynosi 10. W jednym czasie dostaję 100 requestów o jakieś dane z bazy. 10 requestów będzie obsługiwanych natomiast 90 pozostałych będzie czekać. Aktualnie wygląda to tak że 10 requestów będzie walidowanych, sklejane będą jakieś tam dane i robiony są zapytania do bazy które czasami trwają np. 2 sekundy a wątek w tym czasie czeka na odpowiedź z DB. Odebrał dane to je zwraca do klienta i kolejny request jest przetwarzany.
Koniec końców obsłużenie wszystkiego zajmuje jakieś 25 sekund z czego większość czasu te wątki czekają na odpowiedź z bazy przez to że są to zapytania synchroniczne.
Chciałbym więc to poprawić. Jeśli dla analogicznej sytuacji wpadnie taka sama liczba zapytań to 90 z nich i tak będzie w kolejce natomiast jeśli 10 aktualnie pracujących wątków wyśle asynchroniczne zapytanie do bazy to mogłoby brać z kolejki kolejny request i już przygotowywać zapytanie do bazy danych. Nie przyspieszy to jakoś diametralnie bo nadal zapytania do bazy będą trwały 2-3 sekundy, natomiast wątki które teraz po wykonaniu takiego synchronicznego zapytania nie robią nic, mogłyby w tym czasie obsługiwać kolejne zapytania. Tym bardziej że wiele zapytań zajmuje znacznie mniej czasu (np. 20-30ms) a i tak są zblokowane przez to że w kolejce wcześniej trafiły sie zapytania trwające znacznie dłużej.

Ps. Podany przeze mnie czas 2-3 sekund na zapytanie bazodanowe to bardziej takie zwrócenie uwagi na to że część zapytań wykonuje się dużo dłużej niż pozostałe. Mam konkretne metryki ale też nie chcę tutaj o nich pisac więc przyjąłem że jest to po prostu długo trwające zapytanie.

0

Hmm, jeśli umiesz rozróżnić na podstawie requestu, które zapytania będą ciężkie, a które nie, mógłbyś utworzyć osobne Tomcat connector pools. Jedna byłaby do obsługiwania szybkich zapytań, druga - do wolnych. Wiąże się to z tym, że serwis wystawiony byłby na dwóch portach, ale jeśli pomiędzy klientem a serwerem masz jakieś proxy, to możesz wdrożyć taką zmianę bez zauważalnej różnicy dla klienta.

1

Rozumiem że chciałbyś zrobić nowy wątek przed zapytaniem SQL i robić coś sobie w tle. Niestety w praktyce nic tym nie osiągniesz. Dopóki SQL się nie skonczy to trzymasz połączenie na Tomcacie, innymi słowy nigdy nie będziesz ptrzetwarzał więcej niz te 200 połączeń w puli naraz.

Jeżeli chcesz zwolnić wątek zanim dasz odpowiedź do klienta, to tak jak napisał qbns, musisz wykorzystać np. WebFluxa który nie bazuje na serwletach.

Oprócz tego pozostaje ci: optymalizacja SQL, zwiększenie puli wątków, skalowanie wszerz i ew. żonglowanie połaczeniami co też już opisał qbns.

0

Prędkość wejściowa: 100 requestów / sekundę
Czas przetwarzania E2E: 5 sekundy na request => 1 wątek przetwarza 0,2 req /sekundę

Chcesz przetworzyć 100 requestów przetwarzasz w 100 / 0,2 sekund => 500 sekund.

Chcesz skrócić czas oczekiwania na przetworzenie -> więcej otwartych okienek
10 okienek -> 50 sekund czekania
20 okienek -> 25 sekund czekania w kolejce
50 okienek -> 10 sekund czekania w kolejce
200 okienek -> 2.5 sekundy czekania w kolejce

Dlaczego przy 200 wątkach nie masz 2.5 sekundy na obsługę całego requestu? (Nie masz, bo piszesz, że czas obsługi na bazie 2-3 sek.)
Dlaczego masz 2-3 sek na odpowiedź z bazy?

0

moźe lepszym rozwiązaniem było by skeszowanie wyniku zapytania i filtrowanie dla kazdego requsta ?

0

@yarel: Pomysł z podziałem na krótkie i długie zapytania jest jak najbardziej ciekawy i zaraz to sobie przemyślę a tym czasem podrzucę mój pomysł który zaimplementowałem. Znalazłem taki artykuł:
https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future
Muszę to testować na jakichś mniejszych obciążeniach które sam jestem wstanie wygenerować więc ustawiłem sobie pulę wątków dla Tomcata w Spring boot na 2 wątki.

server:
  port: 9095
  tomcat:
    max-threads: 2

Zaimplementowałem najpierw zwykły kontroler. Kontroler dostaje jakieś ID i zwraca customera z bazy ale żeby móc testować to podczas walidacji wątek jest usypiany na 1 sekundę a podczas pobierania z bazy danych na 3 sekundy. Cała operacja więc od przyjęcia ID do zwrócenia customera trwa 4 sekundy. Jeśli przyjdą 2 requesty to mając pulę 2 wątków obsłużone zostaną oba jednocześnie w ciągu 4 sekund. Jeśli trafią się 4 zapytania to pierwsze 2 będą zwrócone po 4 sekundach a 2 następne będą czekały w kolejce (najpierw czekają na zakończenie poprzednich 4 sekundy + 4 sekundy podczas wykonywania operacji = łącznie 8s). Itd itd.

@GetMapping("/customer2/{id}")
    ResponseEntity<Customer> getById2(@PathVariable long id) throws InterruptedException {
        logger.info("Request received");

        dataValidation();
        Customer customer = fetchFromDbSync(id);

        logger.info("Servlet thread released");

        return ResponseEntity.ok(customer);
    }

    private void dataValidation() throws InterruptedException {
        logger.info("Start data validation");
        //some important operation
        TimeUnit.SECONDS.sleep(1);
        logger.info("Completed data validation");
    }

    private Customer fetchFromDbSync(long id) throws InterruptedException {
        logger.info("Start processing request");

        //imitation of long db operation
        TimeUnit.SECONDS.sleep(3);
        Customer customer = repository.findById(id);

        logger.info("Completed processing request");
        return customer;
    }

Zrobiłem test Apache Bench:

ab -n 10 -c 10 http://localhost:9095/customer2/2

10 zapytań na 10 wątkach więc 8 będzie wrzuconych do kolejki.
Wynik jest taki:
title
Pierwszy request jest najkrótszy - 4 sekundy a każdy kolejny trwa dłużej ponieważ wszystko jest synchroniczne.

Natomiast jak zmodyfikowałem metodę kontrolera:

  @GetMapping("/async/customer2/{id}")
    CompletableFuture<ResponseEntity<Customer>> asyncGetById2(@PathVariable long id) throws InterruptedException {
        logger.info("Request received");

        dataValidation();

        CompletableFuture<Customer> completableFuture =
                CompletableFuture.supplyAsync(() -> fetchFromDbSync(id));

        logger.info("Servlet thread released");

        return completableFuture
                .thenApplyAsync(ResponseEntity::ok);
    }

To wynik jest znacznie lepszy:
title
Najkrótszy request zajął 5 sekund (pewnie dlatego że jak metoda fetchFromDbSync zakończyła działanie to wynik nie mógł być zwrócony ponieważ wszystkie wątki były zajęte) ale za to łączny czas wykonania wszystkich requestów spadł do 8 sekund (poprzednio było 20 s) a średni czas wyniósł 6 sekund na request (wcześniej było 12s).

Jak się przejrzy logi:

2018-11-13 1416.674 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Request received
2018-11-13 1416.674 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Start data validation
2018-11-13 1416.674 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Request received
2018-11-13 1416.674 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Start data validation
2018-11-13 1417.675 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Completed data validation
2018-11-13 1417.675 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Completed data validation
2018-11-13 1417.675 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Servlet thread released
2018-11-13 1417.675 INFO 22188 --- [nPool-worker-13] c.t.customer.AsyncCustomerController : Start processing request
2018-11-13 1417.675 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Servlet thread released
2018-11-13 1417.675 INFO 22188 --- [onPool-worker-8] c.t.customer.AsyncCustomerController : Start processing request
2018-11-13 1417.677 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Request received
2018-11-13 1417.677 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Request received
2018-11-13 1417.677 INFO 22188 --- [nio-9095-exec-2] c.t.customer.AsyncCustomerController : Start data validation
2018-11-13 1417.677 INFO 22188 --- [nio-9095-exec-1] c.t.customer.AsyncCustomerController : Start data validation
...

To wygląda na to że wszystkie operacje wykonywane są na wątku 1 i 2 a zapytanie do bazy danych jest asynchroniczne i wątek nie czeka na odpowiedź z bazy tylko zaczyna obsługiwać kolejne zapytanie (Request received).
Nie wiem czy mój tok rozumowania jest poprawny ale wydaje mi się że działa tak jak bym chciał. Pytanie tylko czy w takim podejściu przy dużej liczbie zapytań ta asynchroniczność nie wyjdzie bokiem i wszystko się nie rozjedzie?

0
  1. Takie completable future jakie odpalasz używa common fork join pool i ma n-1 wątków, gdzie n to liczba rdzeni które masz. Myśle ze jednak chciałeś tam wrzucić custom pool jakiś? Bo teraz to trochę bez sensu benchmark.
  2. Chcesz mieć ciastko i zjeść ciastko, a tak się nie da. Jak chcesz low latency to niestety będzie trzeba zrezygnować z synchronicznego blokowania odpowiedzi.

1 użytkowników online, w tym zalogowanych: 0, gości: 1