Spring Webflux + współdzielona mapa

0

Hej. Mam wystawione API które zbiera od użytkowników pewne dane a po X czasie (np. co minutę) zapisuje w bazie statystyki odnośnie tych danych. W skrócie wygląda to tak:

Endpoint:

    @Bean
    RouterFunction<ServerResponse> saveTime() {
        return route(POST("/times"),
                req -> req.body(toMono(Timestamp.class))
                        .subscribe(service::add)
                        .then(ok().build()));
    }

Service grupuje dane na mapie:

    private Map<Integer, List<LocalTime>> stats = new ConcurrentHashMap<>();

    void add(Timestamp timestamp) {
        if (stats .containsKey(timestamp.getId())) {
            stats .get(timestamp.getId()).add(timestamp.getTimestamp());
        } else {
            stats .put(timestamp.getId(), new ArrayList<>(Collections.singleton(timestamp.getTimestamp())));
        }
    }

Problem w tym że to nie zawsze działa poprawnie. Np. Kiedy co jakiś czas (np. co 10 min) uruchamiany jest job który pobiera tę listę i zapisuje statystyki do bazy to też przy okazji ją czyści żeby na nowo zbierać dane o timestampach. W chwili w której przychodzi request i jednocześnie uruchamia się cron job leci NPE w miejscu stats .get(timestamp.getId()).add(timestamp.getTimestamp()); co w sumie ma sens bo lista mogła zostać już wyczyszczona.

Mam więc 2 pytania:

  1. Jak to poprawnie powinno być zaimplementowane - nie chodzi mi o konkretną implementację ale chociaż podpowiedź bo myślałem że ConcurrentHashMap rozwiąże problem z dostępem do tej kolekcji.
  2. Czy mój endpoint który w sumie jest banalny ale czy z pkt widzenia async jest okej? Wcześniej pisałem wszystko na tomcata i w sumie Webfluxa trochę dzięki @jarekr000000 się uczę który poleca go zamiast Spring Boota. Ps. Oczywiście wiem że jeszcze można całkowicie te adnotacje wyrzucić i mam zamiar to zrobić;)
0

a) Rozwiązaniem twojego problemu może być:
AtomicReference, a w szczególności metoda updateAndGet.

AtomicReference<Map<Integer, List<LocalTime>>> stats (i wtedy już nie potrzebujesz ConcurrentHashMap).

b) Alternatywa to metoda merge w mapie. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentMap.html#merge-K-V-java.util.function.BiFunction-

0

@jarekr000000: ok poczytam, dzięki.
A jeśli chodzi o prostu endpoint to jest okej? Robiłem to wcześniej na tomcacie i jak wiadomo tam by defaut jest pula 200 wątków. Testowałem to sobie pod dosyć sporym obciążeniem i widziałem jak te wątki się mnożą na potengę z kolejnymi requestami i uzyskiwałem wydajność na poziomie 2500req/s. W przypadku WebFluxa faktycznie wątków jest mniej ale nie zayważyłem jakiegoś wzrostu wydajności (wyniki są podobne). Jeśli dobrze rozumiem to mam tu operację blokującą bo nawet jeśli przyjdzie bardzo duża liczba requestów to nie mogą być obsłużone asynchronicznie chociażby przez tę metodę: stats .get(timestamp.getId()).add(timestamp.getTimestamp()); stąd też myślałem że WebFlux będzie sobie z tym radził lepiej.

Pytanie więc czy webflux (albo jakakolwiek inna technologia) bardziej się do tego nada żeby uzyskać jakiś większy skok wydajności?

0

Jeżeli w serwisie nie masz żadnego IO to zwykle różnicy nie będzie między podejściem tomcat vs webflux.
(zwykle ). Przy tak trywialnym kodzie największy wpływ mają jakieś narzuty z parsowaniem requesta itd. w obu implementacjach - różnice wynikające z architektury są raczej niewidoczne w pomiarach.

Użycie AtomicReference powoduje, że masz faktycznie non blocking operacje. Ale nie wiadomo czy będzie różnica, to raczej chodzi o bezpieczeństwo.

0

Ogólnie WebFlux bardziej może się nadać kiedy masz wielu concurrent userów nasłuchujących endpointa (np. przez SSE) iub IO tak jak Jarek wspomina. Tutaj masz requesty i szybką odpowiedź, więc WebFlux usuwa Ci jedynie overhead związany z tworzeniem nowego wątku a imitu połączeń na Tomcat'cie przy takim prostym endpoincie nie wyczerpiesz na bank.

UWAGA: Jeśli hostujesz to na Tomcat'cie to i tak tworzy Ci się nowy wątek z każdym requestem. Czyli ma to jakiś sens tylko pod warunkiem jeśli hostujesz tę WebFluxową apkę na Netty.

0
jarekr000000 napisał(a):

Jeżeli w serwisie nie masz żadnego IO to zwykle różnicy nie będzie między podejściem tomcat vs webflux.

Ok tak się spodziewałem.

Użycie AtomicReference powoduje, że masz faktycznie non blocking operacje. Ale nie wiadomo czy będzie różnica, to raczej chodzi o bezpieczeństwo.
Póki co próbuję z Merge bo mam wrażenie że jest bardziej czytelne ale spróbuję też z AtomicReference.

qbns napisał(a):

Czyli ma to jakiś sens tylko pod warunkiem jeśli hostujesz tę WebFluxową apkę na Netty.

Oczywiście uruchamiam na Netty.

Pytanie więc czy da się w ogóle poprawić jakoś znacząco wydajność w tak małej aplikacji? Vert.x lub Ratpack w ogóle cokolwiek tu zmieni? Rozumiem że kodu jest niewiele i nie ma tu czasochłonnych operacji ale może sam framework/biblioteka robi jakiś narzut.

1

A spróbuj nie zapisywać wartości do Map, tylko Flux Processora, do którego będziesz dodawał elementy. Z tego processora możesz exposować buffered streama, który dla subskrybentów będzie udostępniał co 10 minut zapisane rekordy. Myślę, że to bardziej reaktywne podejście i mooooże będzie szybsze (sam jestem zainteresowany wynikiem :D)

EDIT: I btw. 2500 req/s brzmi słabo przy takiej prostej logice, chyba że masz kiepski sprzęt. U mnie dużo bardziej skomplikowany serwis znosi do 5000 insertów na jednym, testowym hoście (CPU i7-6820HQ), tylko że ja robię tylko na streamach, mam ich bardzo wiele + ~2000 klientów na bieżąco je nasłuchuje.

0
qbns napisał(a):

EDIT: I btw. 2500 req/s brzmi słabo przy takiej prostej logice, chyba że masz kiepski sprzęt. U mnie dużo bardziej skomplikowany serwis znosi do 5000 insertów na jednym, testowym hoście (CPU i7-6820HQ), tylko że ja robię tylko na streamach, mam ich bardzo wiele + ~2000 klientów na bieżąco je nasłuchuje.

@qbns Też mam takie wrażenie stąd też przepisałem to na Webfluxa :/
Sprzęt nienajgorszy bo I7 - 7gen
Zastanawiam się ciągle skąd ta słaba wydajność skoro tak naprawdę tam jest tylko endpoint który przyjmuje request, zapisuje na mapie i zwraca 200 -OK
Jedyne co mi przychodzi do głowy to to że te dane są grupowane po Id usera który je wysłał więc ta operacja nie może być wykonywana wielowątkowo i pewnie tworzy się wąskie gardło.

0

Jak iczym mierzysz?

Zrob profiling, to wyjdzie (może).

0

Właśnie o to samo chciałem zapytać bo trochę uprościłem swój kod a i tak się zdarza że mam 1500 req/s więc może ja coś źle odpalam.
Testuje to JMeterem. Utworzyłem Thread Group:
jmeter
Dodaje tam Simple Controler z prostym requestem a do tego Summary Report który po wykonaniu wszystkich requestów daje taki wynik:
summary

Jeśli dobrze rozumiem to Throughput oznacza performance mojego serwisu.

EDIT
@jarekr000000 @qbns postawiłem apkę na Vert.x z jednym endpointem GET i puściłem ten sam test jMeterem i też mam tak niską wydajność (~2k req/s).
Może ja coś źle ustawiam. Jakieś inne propozycje jak to sensownie przetestować?

1

Testuje to JMeterem.

To jakby jasno wskazuje co jest problemem. Pewnie jeszcze na localhost?
Jmeter wariatem prędkości nie jest - do testowania dużych ciężkawych serwisów się nadaje, ale na małe/milisekundowe jest problematyczny.
Sam pożera, z tego co kojarze kosmiczne ilości wątków, i ram :-)

1 użytkowników online, w tym zalogowanych: 0, gości: 1