Problemy z wydajnością przy odpytywaniu zewnętrznego API

0

Mam aplikację, która obecnie co 3 sekundy odpytuje zewnętrzne API z określonymi parametrami, których jest +/- 300. Oznacza to 300 zapytań REST co 3 sekundy, każde zapytanie zwraca 60 elementów, które porównuje elementami które do tej pory pobrałem, jeżeli pojawi się nowy - publikuje go dalej. Pierwszy problem pojawił się ze strony WebClient którego używam, lokalnie zaczęło się sypać przy +/- 60-70 query, na VPSie daje radę na 100, powyżej tej liczby ponad 50% zapytań kończy się błędem

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

Dlatego też, mam 3 równolegle działające aplikację i każda z nich obsługuje po 100 zapytań, a wyniki publikowane są RESTem do kolejnego serwisu, może i lepsza byłaby kolejka, aczkolwiek tych strzałów jest około 100-150 dziennie, tak więc tutaj zdecydowanie nie ma problemów.

Jest za to poważny problem z wydajnością tych zapytań do zewnętrznego API, nowe elementy pojawiają się w aplikacji która obsługuję 30 zapytań o średnio 60 sekund szybciej niż w aplikacji obsługującej 300 zapytań (a dokładnie w 3 aplikacjach po 100). Na monitorze zasobów dostepnym na VPSie widzę 50% użycie CPU, a za to przez ps -p <pid> %cpu, %mem widzę około 25% użycia CPU i pamięci na 1 aplikację (czyli sumarycznie wychodzi przynajmniej 75%), a przed chwilą poleciało

There is insufficient memory for the Java Runtime Environment to continue.

Native memory allocation (mmap) failed to map 204472320 bytes for committing reserved memory.

Jeszcze jedna rzecz mnie martwi, hosting udostępnia jeszcze jeden monitor, w którym co 5 sekund odpytuje VPSa o użycie zasobów przez poszczególne procesy, prawie nigdy nie jest tak że mam coś w stylu:
service1 25% cpu
service2 25% cpu
service3 25% cpu

A zamiast tego jest
service1 70% cpu
service2 80% cpu
service3 1% cpu

Pomijając fakt, że faktycznie to się nie sumuję do 100%, to zawsze 1 serwis jest z tyłu, a po chwili inny, czyli tak jakby nie pracują one jednocześnie cały czas.

Pytanie więc, co robić? Dołożenie pamięci pomoże? Obecnie było to 4gb. Poza tym, czy problem z tym że ewidentnie zapytania kończyły się wolniej, to wina procesora? 300 równoległych zapytań przez WebClient na 3 różnych aplikacjach to za dużo?

Niżej zostawiam kod,

Każdy response to maksymalnie 60 takich obiektów

public class Item implements Serializable {
    private String id;
    private String name;
    //constructor, getter, setter
}

No i sam mechanizm odpytywania API, jest on uruchamiany po uwczesnym zainicjalizowaniu initialItems stanem początkowym.

    private final Map<String, Item> initialItems = new ConcurrentHashMap<>();

    @Scheduled(fixedRate = 3000)
    private void collectData() {
        try {
            queryRepository.getSearchQueries()
                    .forEach(query -> reactiveClient.collectData(query)
                            .subscribe(this::processResponse));
        } catch (Throwable t) {
            logger.severe(t.getMessage());
        }
    }

    private void processResponse(final ApiResponse response) {
        response.collectItems().forEach(item -> {
            if (!initialItems.containsKey(item.getId())) {
                initialItems.put(item.getId(), item);
                eventPublisher.publishEvent(new NewItemEvent(this, item));
            }
        });
    }

    public Mono<ApiResponse> collectData(final SearchQuery query) {
        final var accessToken = authenticationService.getAccessToken();
        return invoke(query.toUrlParams(), accessToken);
    }

    private Mono<ApiResponse> invoke(final String endpoint, final String accessToken) {
        return webClient.get()
                .uri(endpoint)
                .header("Authorization", accessToken)
                .retrieve()
                .bodyToMono(ApiResponse.class)
                .onErrorReturn(ApiResponse.emptyResponse());
    }

queryRepository działa na zwykłej HashMap, to nie jest żadna baza danych.
authenticationService wykonuje zapytanie REST tylko gdy token wygasa, czyli raz na kilka godzin, reszte czasu zwraca zapisany token.

2
  1. Dołożenie pamięci być może pomoże (choć możliwe zużycie pamięci to skutek błędu). Czy faktycznie JVM korzysta z tej pamięci? There is insufficient memory for the Java Runtime Environment to continue. raczej jasno wskazuje, że taki problem wystąpił.
  2. connection prematurely closed BEFORE response niestety może oznaczać wiele - może serwer nie wyrabia i ucina połączenia? tcpdump może pomóc (pewnie są lepsze narzędzia)
  3. Straszny masz ten kod - mieszanie reactora z imperatywnym void / subscribe ale nie widzę oczywistego błędu.
    Ja tam bym collectData(query) wrzucił we fluxa i przy pomocy flatMap objechał wszystkie requesty (żeby nie robić na każdym osobno subscribe - ale po prawdzie to raczej nie pomoże, a może nawet pogorszyć (będą częściej wykonywane będą requesty).
  4. Może musisz ustawić timeouty : https://medium.com/@kalpads/configuring-timeouts-in-spring-reactive-webclient-4bc5faf56411
  5. albo jeśli serwer nie wyrabia to może trzeba będzie wrzucić jakiś throttling.
0

Trudne zagadnienie, ja bym miał dwa pomysły ogólnie:

1

Ja bym się zastanowił czy nie łapie cię zwyczajnie jakiś rate-limit od strony serwera. Jedna uwaga jaką mam, to co prawda robisz to asynchronicznie, ale czy czasem potem to twoje processResponse nie leci juz w jednym wątku? Jeśli masz więcej dostepnych rdzeni to mozna by to też wykonywać jakimś CompletableFuture z większym thread poolem. Ciekawi mnie trochę co ci tutaj zjada to CPU, te contains? Bo nic w tym kodzie nie wygląda na specjalnie cięzkie obliczeniowo.
Ostatnia sprawa to może lepiej byłoby te query rozłożyć w czasie? Tzn nie strzelać 300 co 3 sekundy na hurra, tylko jedno co 0.01s?

0
jarekr000000 napisał(a):
  1. Dołożenie pamięci być może pomoże (choć możliwe zużycie pamięci to skutek błędu). Czy faktycznie JVM korzysta z tej pamięci? There is insufficient memory for the Java Runtime Environment to continue. raczej jasno wskazuje, że taki problem wystąpił.
  1. connection prematurely closed BEFORE response niestety może oznaczać wiele - może serwer nie wyrabia i ucina połączenia? tcpdump może pomóc (pewnie są lepsze narzędzia)
  2. Straszny masz ten kod - mieszanie reactora z imperatywnym void / subscribe ale nie widzę oczywistego błędu.
    Ja tam bym collectData(query) wrzucił we fluxa i przy pomocy flatMap objechał wszystkie requesty (żeby nie robić na każdym osobno subscribe - ale po prawdzie to raczej nie pomoże, a może nawet pogorszyć (będą częściej wykonywane będą requesty).
  3. Może musisz ustawić timeouty : https://medium.com/@kalpads/configuring-timeouts-in-spring-reactive-webclient-4bc5faf56411
  4. albo jeśli serwer nie wyrabia to może trzeba będzie wrzucić jakiś throttling.
  1. Jeżeli ps -p na konkretnym pid tego serwisu zwrócił 25% memory usage to oznacza, że ten jeden serwis używał 25% całej pamięci w systemie, czyli 1gb na 1 aplikację?
    3 takie aplikacje, to już 75%, sam CentOS i jego procesy też coś używają, do tego 4ta aplikacja która dużego użycia zasobów nie powodowała, no ale jednak.

  2. Jak wspomniałem, na VPSie było lepiej i wyglądało to na cold start JVMa, pierwszą minute - dwie po starcie poleciało jakieś 200-300 wyjątków, a po 6 godzinach doszło ich może 100.

  3. W sumie to pierwsze moje podejście do reactora, tak wiec na pewno mogło byc lepiej

  4. Czyli ustawić większy timeout by dłużej czekać na odpowiedź? To miałoby pomóc wyjątkami które lecą?

  5. Nie do końca rozumiem

1

Przy okazji

if (!initialItems.containsKey(item.getId())) {
                initialItems.put(item.getId(), item);
                eventPublisher.publishEvent(new NewItemEvent(this, item));
            }

to jest słaby pattern. Zwłaszcza z ConcurrentHashMap. Może tutaj raczej nic nie psuje, ale warto takich konstrukcji unikać.
Użyj putIfAbsent albo jednej z podobnych metod.

0

Bardzo prawdopodobne, że myliłem się co do przyczyny problemu... dopisałem "na kolanie" prosty analizator wyników (przemilczmy proszę float zamiast integer ;) )

    private final AtomicInteger successCounter = new AtomicInteger(0);
    private final AtomicInteger failCounter = new AtomicInteger(0);
    private final AtomicInteger counter = new AtomicInteger(0);

    //wywołane z każdym subscribe()
    private void verifyResults(final ApiResponse response) {
        if (response.isFailed()) { // webClient.onErrorReturn(emptyResponse()); wewnatrz emptyResponse ustawiam flage isFailed na true
            failCounter.incrementAndGet();
        } else {
            successCounter.incrementAndGet();
        }
    }

    @Scheduled(fixedRate = 60000)
    private void analyzeResults() {
        final var callsPerSecond = 20; // strzał co 3 sekundy
        if (!initialItems.isEmpty()) {
            final float expectedCalls = callsPerSecond * queryRepository.getSearchQueries().size() * counter.get();
            if (expectedCalls > 0) {
                final float completedCalls = successCounter.get() + failCounter.get();
                final float successRate = (successCounter.get() / completedCalls) * 100;
                final float callRate = (completedCalls / expectedCalls) * 100;
                final float effectivenessRate = (successCounter.get() / expectedCalls) * 100;
                logger.info(
                        "\nExpected calls: " + expectedCalls + "\n" +
                                "Completed calls: " + expectedCalls + "\n" +
                                "Completed calls rate: " + callRate + "%\n" +
                                "Success rate: " + successRate + "%\n" +
                                "Effectiveness rate: " + effectivenessRate + "%\n"
                );
            }
            counter.incrementAndGet();
        }
    }

Po pierwszej minucie działania programu

==> logs/collector1.log <==
2020-10-16 1941.125 INFO 2508762 --- [ scheduling-1] p.a.a.core.CollectorService:
Expected calls: 2400.0
Completed calls: 2400.0
Completed calls rate: 100.0%
Success rate: 91.91667%
Effectiveness rate: 91.91667%

==> logs/collector3.log <==
2020-10-16 1941.592 INFO 2508764 --- [ scheduling-1] p.a.a.core.CollectorService:
Expected calls: 2400.0
Completed calls: 2400.0
Completed calls rate: 100.0%
Success rate: 90.66667%
Effectiveness rate: 90.66667%

==> logs/collector2.log <==
2020-10-16 1945.600 INFO 2508763 --- [ scheduling-1] p.a.a.core.CollectorService :
Expected calls: 2400.0
Completed calls: 2400.0
Completed calls rate: 100.0%
Success rate: 86.791664%
Effectiveness rate: 86.791664%

Po 10 minutach

==> logs/collector1.log <==
2020-10-16 1941.110 INFO 2508762 --- [ scheduling-1] p.a.a.core.CollectorService:
Expected calls: 26400.0
Completed calls: 26400.0
Completed calls rate: 100.0%
Success rate: 99.26515%
Effectiveness rate: 99.26515%

==> logs/collector3.log <==
2020-10-16 1941.540 INFO 2508764 --- [ scheduling-1] p.a.a.core.CollectorService:
Expected calls: 26400.0
Completed calls: 26400.0
Completed calls rate: 100.0%
Success rate: 99.15152%
Effectiveness rate: 99.15152%

==> logs/collector2.log <==
2020-10-16 1945.402 INFO 2508763 --- [ scheduling-1] p.a.a.core.CollectorService:
Expected calls: 26400.0
Completed calls: 26400.0
Completed calls rate: 100.0%
Success rate: 98.79924%
Effectiveness rate: 98.79924%

Jak widać efektywność wzrasta, działanie w pierwszych minutach jest gorsze zapewne przez cold start JVMa, z każdą kolejną minutą jednak jest lepiej. Zdziwiło mnie strasznie jednak to, że completedCalls == expectedCalls, uruchamiając aplikację lokalnie mam tutaj wynik rzędu 70-80% skuteczności. O ile więc nic nie przeoczyłem, oznacza to jedno, zapytania wykonują się w odpowiednim czasie. Pytanie więc...

  • (Wielce nieprawdopodobne) czy jest możliwe, że jedno zapytanie wykonuje się cześciej niż inne? oczekuję 20 wyników, 10 z query X i 10 z query Y, niemożliwe jest że query X wykonało się 5 razy a query Y 15 razy, co powoduje że ``completedCalls == expectedCalls` przez kilka minut działania aplikacji?
  • Wykonuje co 3 sekundy 120 zapytań, każde z nich zwraca maksymalnie 60 elementów. Po inicjalizacji mam więc 7200 elementów w mapie initialItems, w ciągu dnia przybędzie tutaj max 100-200 elementów tak wiec to pomijam. Każdy subscribe() oznacza więc sprawdzenie czy jakikolwiek z maksymalnie 60 elementów w danym response nie znajduje się w tych 7200. Teoretycznie, taki subscribe() będzie wołany 120 razy co 3 sekundy. Czy to brzmi jak bottleneck?
  • Dalej w kolejce jest już tylko publishEvent(), który niesie jeden znaleziony element i przed wysłaniem go dalej RESTem, ustawia mu date jako LocalDateTime.now(), trochę to nie miarodajne bo mam timestamp nie odebrania elementu a jego wysłania. W każdym razie, publikacja Springowego eventu i zrobienie setReceivedAt() raczej nie stanowi już problemu, szczególnie że do tego dochodzi 100-200 razy na dzień.

**Edit#
**
Chciałem policzyć czas spędzony na processResponse()... dopisałem coś takiego

    private void processResponse(final ApiResponse response) {
        final var start = System.nanoTime();
        response.collectItems().forEach(item -> {
            if (!initialItems.containsKey(item.getId())) {
                initialItems.put(item.getId(), item);
                eventPublisher.publishEvent(new NewItemEvent(this, item));
            }
        });
        final var time = System.nanoTime() - start;
        processingTimes.add(time);
    }

    private final List<Long> processingTimes = new CopyOnWriteArrayList<>();

I wewnątrz funkcji do analizy działania programu

final float processingTime = processingTimes.stream().mapToLong(i -> i).sum();
final float processingTimeSeconds = processingTime / 1_000_000_000;

Póki co lokalnie, ale po 2 minutach działania programu mam coś takiego

Time spent on processing: 0.0222198seconds
Processing time recordings: 4361

Co ciekawe, wykonanych zapytań miałem w tym momencie 4800, dlaczego więc zapisanych czasów wykonania funkcji processResponse() miałem o ponad 400 mniej? Oznacza to, że 400 responseów nie zostało przeprocesowanych, dlaczego?

#edit2:

Na VPSie liczba wywołań processResponse() jest taka sama jak liczba wywołań API... ale po 10 minutach działania aplikacji czas spędzony wewnątrz tej metody to < 0.5 sekundy.

0

Co ciekawe, wykonanych zapytań miałem w tym momencie 4800, dlaczego więc zapisanych czasów wykonania funkcji processResponse() miałem o ponad 400 mniej? Oznacza to, że 400 responseów nie zostało przeprocesowanych, dlaczego?

miałem 2 dni temu taki przypadek, w skrócie kod to mniej więcej coś takiego:

public Mono<Result> sendSomeging() {
 return webClient.post()
                .uri(endpoint)
                .body(Mono.just(new SomeBody(), SomeBody.class))
                .retrieve().flatmap(clientResponse -> {
                    if (clientResponse.statusCode().isError()) {
                    return clientResponse.bodyToMono(ErrorResponse.class)
                              .map(error -> new ErrorResult(error))
                    } else {
                     return clientResponse.bodyToMono(GoodResponse.class)
                               .map(response -> new GoodResult(response))
                  }
    });
}
....
Flux.range(0, 50).flatMap(integer -> sendSomegint()).collectList(). ... // oczywiście coś dalej co wywołuje strumień

i z jakiegoś powodu dostawałem listę tylko obiektów tylko z poprawnym responsem, możliwe że gdzieś indziej był jakiś mój błąd bo pierwszy raz korzystałem z WebClient, ale chodzi mi o to że sendSomeging() z jakiegoś powodu zwracał mi puste Mono lub może nawet Mono.error() którego jakoś nie przykazało dalej, sprawdził bym to w twoim przypadku jeżeli brakuje ci zapytań.

wiem że masz .onErrorReturn(ApiResponse.emptyResponse());, ale sprawdził bym też w razie czego te puste

1

@Gazel: Tu masz chyba trochę inny problem, którego objawy dość ładnie poszywają się pod typowy brak wydajności. Jednak na początek 2 pytania:

  1. Czy żądania są wysyłane w tym samym czasie ± kilka milisekund, czy masz to rozłożone w czasie?
  2. Czy dane przychodzące z zewnątrz są podobne objętościowo?

Możliwe, że dociążasz maszyny „punktowo” w czasie i jednocześnie jedna z maszyn dostaje do przetworzenia znaczne ilości danych, gdy inne mają „luzik”.

1 użytkowników online, w tym zalogowanych: 0, gości: 1