Pobieranie danych z wielu API jednocześnie

0

Mam takie zadanie:
Użytkownik podaję listę parametrów, a aplikacja w określonym interwale czasowym dla każdego parametru wykonuje zapytanie REST do zewnętrznego API. Określone wyniki chcę następnie publikowac poprzez Spring Events i odbierać w innym serwisie. Chciałbym się upewnić co do poprawności mojego rozwiązania.

Mam sobie pojedynczego workera, który jest odpowiedzialny za wykonanie zapytania, przefiltrowanie i obublikowanie wynikow.

@RequiredArgsConstructor
public class Worker implements Runnable {
    private final RestClient client;
    private final ApplicationEventPublisher publisher;
    private final String query;

    @Override
    public void run (){
        final var response = client.invoke("external-service.com/api/stuff?" + query);
        final var filteredResponse = filter(response);
        publisher.publishEvent(new MyEvent(this, filteredResponse ));
    }
    
    private Response filter(final Response response) {
     // do some filtering here
    }
}

No i serwis który buduje mi tych workerów a nastepnie ich uruchamia

@Service
public class WorkerService {

    private final Set<Worker> workers = new HashSet<>();

    public WorkerService(final QueryRepository queryRepository, final RestClient restClient, final ApplicationEventPublisher eventPublisher) {
        queryRepository.getSearchQueries().forEach(query -> workers.add(new Worker(restClient, eventPublisher, query)));
    }

    @PostConstruct
    private void initWorkers() {
        final var executors = Executors.newScheduledThreadPool(workers.size());
        workers.forEach(worker -> executors.scheduleAtFixedRate(worker, 0, 5, TimeUnit.SECONDS));
    }
}

No i teoretycznie wszystko działa, zastanawia mnie jednak czy aby na pewno ;) Obecnie operuje na kilku query, co gdy pojawi się ich 500, 1000, albo więcej?

Dodatkowo, chcę zmniejszych interwal z 5 do powiedzmy 1 sekundy. Samo zadanie workera jednak może trwać dłużej, pojedyncze query może wymagać kilku zapytań REST (wyniki są pageowane). Czy w takim przypadku kolejne wywołanie danego workera odbędzie sie 1 sekunę po zakończeniu poprzedniego zadania? Czy co sekundę będzie odpalana metoda run(), bez względu na to że poprzednie wywołanie jeszcze trwa?

Poza tym, wszystkie instancje workera korzystają z tego samego RestClient, który ma wstrzyknięty RestTemplate - czy to nie będzie problem przy większej ilości workerów?

1
  1. wg mnie to przypadek z grupy "czego nie robić w mikroserwisach" czy "microservice disaster", tak nazywają się prezentacje na YT.
    Na poziomie strategicznym ZUPEŁNIE przepracować projekt.
    O ile w monolicie można liczyć, ze dostaniesz wyniki z 500 wywołań, to w uS nie tylko wydajność kopie, ale szansa, ze wszystkie 500 się uda dąży do zera (0.999 ^ 500)

  2. Na poziomie taktycznym są jakieś asynchroniczne klienty, na springu się świat nie kończy

0
AnyKtokolwiek napisał(a):
  1. wg mnie to przypadek z grupy "czego nie robić w mikroserwisach" czy "microservice disaster", tak nazywają się prezentacje na YT.
    Na poziomie strategicznym ZUPEŁNIE przepracować projekt.
    O ile w monolicie można liczyć, ze dostaniesz wyniki z 500 wywołań, to w uS nie tylko wydajność kopie, ale szansa, ze wszystkie 500 się uda dąży do zera (0.999 ^ 500)

  2. Na poziomie taktycznym są jakieś asynchroniczne klienty, na springu się świat nie kończy

ad 1. Nie ma mowy o żadnych mikroserwisach, mam jeden monolit która uderza do zewnętrznego API

Druga sprawa, nie jest problemem część zapytań się nie uda, o ile jest to błąd rzędu 2-3%.

1

Teoretycznie dobry case na programowanie reaktywne, ponieważ przez większość czasu Twoje wątki czekają. Może zamiast pollować te endpointy mógłbyś dostawać dane, czyli odwrócić integrację z pull na push?
Rozwiązanie z własną pulą wątków też jest Ok, ale zastanów co się stanie, jeśli któryś z endpointow odpowiada bardzo długo lub jest niedostępny - w tym drugim przypadku masz puste przebiegi. Zastanowiłbym się też nad konfiguracja timeoutów, czyli Rest Client per endpoint.

0
Charles_Ray napisał(a):

Teoretycznie dobry case na programowanie reaktywne, ponieważ przez większość czasu Twoje wątki czekają. Może zamiast pollować te endpointy mógłbyś dostawać dane, czyli odwrócić integrację z pull na push?
Rozwiązanie z własną pulą wątków też jest Ok, ale zastanów co się stanie, jeśli któryś z endpointow odpowiada bardzo długo lub jest niedostępny - w tym drugim przypadku masz puste przebiegi. Zastanowiłbym się też nad konfiguracja timeoutów, czyli Rest Client per endpoint.

Nie chciałbym się pakować w programowanie reaktywne, którego nie znam, póki co wolałbym zostać przy wątkach.

Jeżeli jeden endpoint nie odpowiada to raczej reszta też nie odpowiada, wszystkie strzały idą do tego samego API, zmienia się jedynie query params czyli. external-service.com/api/search/{query}. Jeżeli to chwilowa niedostępność to nie ma problemu, jeżeli jednak to już trwa x minut to moge wyemitować jakiś event typu 'Service unavailable' żeby poinformować użytkownika.

Co do timeoutów, chyba taki sam case jak wyżej, to są relatywnie szybkie operacje, które trwają mniej więcej tyle samo. Wyniki wracają w pageach po max 100 elementów i obsługuję to wewnątrz workera, czyli jest szansa że w ramach jednego wątku poleci kilka kolejnych zapytań REST i wtedy faktycznie praca takiego wątku będzie dłuższa niż innych. Tutaj muszę przeanalizować faktycznie dane jak to będzie działać, z tego co widziałem póki co to rzadko się zdarza coś takiego, ale jeżeli byłby to częstszy case, to wewnątrz workera musiałbym odpalić kolejne wątki na każdy page, inaczej wyniki dla danego zapytania miałbym rzadziej niż dla pozostałych.

Czy poza konfiguracją timeoutów, jest powód by robić osobnego clienta dla każdego endpointu? Jak wspomniałem, client ma wstrzyknięty RestTemplate, czyli obecnie mam 1 instancję RestClient i 1 instancje RestTemplate współdzieloną przez całą pulę workerów.

No i jak dokładnie zachowa się to:

workers.forEach(worker -> executors.scheduleAtFixedRate(worker, 0, 5, TimeUnit.SECONDS));
  • Wołane jest Worker::run
  • Worker odpytuje API
  • Worker dostaje odpowiedź
  • Mija 5 sekund, cykl się powtarza

Czy te 5 sekund zaczyna byc odliczane już przy odpaleniu Worker::run?

0

Ok, wygląda na to że moje rozwiązanie działa. W ramach testów odpalam jednak jedynie 50 workerów (czyli 50 wątkow), a ich docelowa liczba to około 1000.
Wiem, że to trochę wróżenie z fusów, ale czego się spodziewać po dodaniu tylu workerów? Wiem, że mogę po prostu dopisać i sprawdzić, ale nawet jeżeli aplikacja będzie działać, to przy takiej ilości danych ciężko stwierdzić czy to działa tak jak powinno, tzn. czy workery faktycznie uderzają do API. W planach mam napisać jakiegoś obserwatora, który co jakiś czas będzie robić healthcheck workerów, ale póki co, czysto teoretycznie:

  • jakiej maszyny / łącza potrzeba do wykonania 1000 jednoczesnych zapytań HTTP co sekundę?
  • czy faktycznie możliwe jest w 1 aplikacji osiągnąć coś takiego, czy może lepiej rozbić to na kilka osobnych instancji tego serwisu, które wyniki publikują na jakieś MQ?
  • co jeszcze może się nie udać? Póki co walczę z tym, że workery przestają pracować, prawdopodobnie leci jakiś silent exception i z tego co wyczytałem to może zabić wątek. Czy przy dodaniu 20x tyle wątków mam się spodziewać 20x tylu problemów?
2

1000 jednoczesnych zapytań / 1000 HTTP na sekundę to nie jest problem.
1000 równoczesnych wątków to już jest problem. Duże zużycie pamięci, a do tego wiele defaultowo instalowanych linuxów ma nieco mniejszy limit... i umrze.

Bym powiedział, że jednak trzeba będzie się z reaktywnym podejściem zaprzyjaźnić. Alternatywnie płacić. Albo męczyć się w konfigurowanie OS żeby dał radę.

0

W javie nowy klient http jest juz nie blokujacy dotego CompletableFuture i śmiga

0

scheduleAtFixedRate

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

Czyli:

  • jeśli Worker rzuca exception (a rzucać będzie) to przestaje być odpalany cyklicznie -> trzeba zadbać o exception handling
  • jeśli wykonanie akcji Workera przedłuży się, następne jej odpalenie poczeka aż bieżące się skończy ale nie ma tutaj żadnych gwarancji odnośnie tego, że nastąpi to dokładnie po <czas zakończenia poprzedniego> + period -> jeśli jest to istotne to warto rozważyć scheduleWithFixedDelay
1
jarekr000000 napisał(a):

1000 równoczesnych wątków to już jest problem. Duże zużycie pamięci, a do tego wiele defaultowo instalowanych linuxów ma nieco mniejszy limit... i umrze.

Robiłem eksperyment, w którym utworzyłem 5000 wątków i każdy z nich wykonywał zapytanie http. Nie było z tym żadnego problemu. Pamięci pobrał około 1GB. Robiłem to na windowsie. Na Linuxie rzeczywiście dostrajałem liczbę handli, ale nie sądzę, że chodziło o liczbę wątków, a o liczbę nasłuchujących połączeń.

Ale problem jest inny. Kolega najwyraźniej chce przygotować atak DoS na dostawcę owego serwisu. :) Nie wyobrażam sobie serwisu, który byłby zadowolony, że ktoś mu chce zapchać pulę połączeń zamiast czekać w kolejce razem z innymi użytkownikami.

No bo właśnie większy problem stwarza obsługa wielu jednoczesnych połączeń po stronie serwera niż klienta. Portów mamy wszak tylko 65535, a to i tak tylko w teorii. A w praktyce do 40K na tomcacie udało mi się dojść :)

Eksperyment, o którym wspomniałem, miał szerszy zakres. Testowałem wiele różnych klientów i jak sobie radzą z równoległymi zapytaniami. Te frameworki, które robią to kompleksowo (np. finnagle), rezerwują kilka wątków na jeden endpoint. I to jest wg mnie idealne podejście. Nie strzelać równolegle tysiąc razy, tylko kilka. Na przykład 5. No i niech te 5 wątków ściąga zadania z puli bez przerwy. 5 wywołań na raz i tak może przekroczyć próg gościnności tamtego serwera, ale jeżeli chcemy ich tak docisnąć...

0

Nie chodzi o żaden atak, niemniej jednak 1k req / 1s faktycznie będzie przesadą i szybko spowoduje timeouty, dostawca usługi na aż tyle nie pozwoli, maksymalna liczba to coś około 15k / minute, tak więc muszę trochę zwiększyć interwał czasowy między zapytaniami.

W każdym razie, mam inny problem i nie potrafie zlokalizować źródła błędu.
API zwraca mi listę takich obiektów

public class ApiItem {
    private String id;
    private String name;
    private String receivedAt;

    //getter,setter

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ApiItem item = (ApiItem) o;
        return Objects.equals(id, item.id) &&
                Objects.equals(name, item.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }
}

Każdy worker dostaje query które odpytuje i jego praca wygląda tak, obecnie w interwale wynoszącym 1 sekundę:

  • jeżeli to 1wsza iteracja, pobiera wszystkie elementy z API i zapisuje w initialItems
  • jezeli to kolejna iteracja, pobiera wszystkie elementy z API, sprawdza czy doszło coś nowego i jezeli tak to zapisuje to do kolekcji foundItems. Kod powinien wyjaśnić o co chodzi
public class Worker implements Runnable {
    private final RestClient client;
    private final String query;

    public Worker(RestClient client, String query) {
        this.client = client;
        this.query = query;
    }

    private final Map<String, ApiItem> initialItems = new ConcurrentHashMap<>();
    private final Map<String, ApiItem> foundItems = new ConcurrentHashMap<>();
    private boolean initialized = false;

    @Override
    public void run() {
        if (!initialized) {
            initialItems.addAll(client.invoke(query));
            initialized = true;
        } else {
            final var currentItems = client.invoke(query);
            foundItems.putAll(findDifference(currentItems));
        }
    }

    private Map<String, ApiItem> findDifference(final Map<String, ApiItem> currentItems) {
        final var itemsUnion = new ConcurrentHashMap<>(this.initialItems);
        itemsUnion.putAll(this.foundItems);
        final var difference = com.google.common.collect.Maps.difference(itemsUnion, currentItems);
        return difference.entriesOnlyOnRight();
    }
}

public Map<String, ApiItems> getFoundItems() {
        return foundItems;
}

Pomijam fakt, że nowo znalezione przedmioty są emitowane jako Event, mapa foundItems powinna teoretycznie zawierać wszystkie nowe przedmioty znalezione w API od momentu zakończenia 1wszej iteracji workera. Problem w tym, że nie zawsze tak jest. Scenariusz wygląda następująco - aplikacja działa godzinę - dwie na VPSie, teoretycznie wszystko jest ok, ale wiem że API powinno zwrócić nowe elementy, a nie widze ich w swojej aplikacji. Odpalam tę samą aplikację lokalnie, gdzie stawiam breakpoint w interesującym mnie workerze i widzę, że faktycznie, w odpowiedzi z API są dodatkowe elementy, których jednak nie sposób znaleźć w aplikacji działającej na VPSie. W logach brak śladu jakichkolwiek błędów rzuconych w czasie gdy w API pojawiły się nowe przedmioty, z jakiegoś jednak powodu prawdopodobnie funkcja findDifference() nie znalazła nic nowego dla kilku otrzymanych przedmiotów. Dodam również że ID jest unikalne w zewnętrznym systemie.

Zdaje sobie sprawę, że może być ciężko stwierdzić co się dzieje na podstawie tych informacji, ale każdy potencjalny trop jest mile widziany.

0

Jesteś pewien, że w logach masz wszystko co aplikacja wypluwa? Jeśli wykonanie metody run() skończy się błędem, to nie będzie ono już ponawiane (zgodnie z tym jak działa scheduleAtFixedRate), co może dawać wrażenie braku zmiany stanu (bo zakładam że obiekty typu Worker gdzieś sobie żyją także poza executorem i można je o stan odpytywać). Takie błędy z executorów zwykle lądują na stderr, więc jeśli nie masz dobrze skonfigurowanego logowania to możesz je przegapić. Dodaj try .. catch w metodzie run() i loguj błędy explicite.

0
damianem napisał(a):

Jesteś pewien, że w logach masz wszystko co aplikacja wypluwa? Jeśli wykonanie metody run() skończy się błędem, to nie będzie ono już ponawiane (zgodnie z tym jak działa scheduleAtFixedRate), co może dawać wrażenie braku zmiany stanu (bo zakładam że obiekty typu Worker gdzieś sobie żyją także poza executorem i można je o stan odpytywać). Takie błędy z executorów zwykle lądują na stderr, więc jeśli nie masz dobrze skonfigurowanego logowania to możesz je przegapić. Dodaj try .. catch w metodzie run() i loguj błędy explicite.

Zrobiłem dokładnie to co piszesz, zapomniałem o tym wspomnieć pisząc poprzedni post. Co więcej, mam VPSa bez CI/CD, wbijam przez ssh, robie git pull mvn clean install. No i się okazało, że zapomniałem zaciągnąć zmian z tym fixem, dlatego lokalnie wszystko działało... Tak więc problem solved.

W każdym razie, poczytałem trochę o WebClient i spłodziłem coś takiego

  @PostConstruct
   public void execute() {
       Executors.newSingleThreadScheduledExecutor()
               .scheduleAtFixedRate(() ->
                               queryRepository.getSearchQueries().forEach(this::collectData),
                       0,
                       3,
                       TimeUnit.SECONDS);
   }

   private void collectData(final SearchQuery query) {
       final var iteration = new int[]{0};
       invoke(query, 0).expand(res ->
               this.handleResponse(res, query, iteration))
               .flatMap(response -> Flux.fromIterable(response.collectItems()))
               .subscribe(this::publish);
   }

   private Mono<ApiResponse> handleResponse(final ApiResponse response, final SearchQuery searchQuery, final int[] iteration) {
       if (hasNextPage(response)) {
           return invoke(searchQuery, calculateOffset(++iteration[0]));
       } else {
           return Mono.empty();
       }
   }

   private Mono<ApiResponse> invoke(final SearchQuery query, final int offset) {
       final var url = offset == 0 ? query.toUrlParams() : query.toUrlParamsWithOffset(offset);
       return doInvoke(url).onErrorReturn(ApiResponse.emptyResponse());
   }

   private Mono<ApiResponse> doInvoke(final String endpoint) {
       return webClient.get()
               .uri(endpoint)
               .retrieve()
               .bodyToMono(ApiResponse.class);
   }

Wydaje się dużo prostsze i lżejsze niż ostatnio, zastanawia mnie jednak kilka kwestii.

  • Tworzę newSingleThreadScheduledExecutor() który co 3 sekundy odpala nie blokującą operacje, oznacza to że poprzedni request się jeszcze nie skończył a ja już mogę wołać kolejny?
  • Jak wspomniałem wcześniej, pojedyncze błędy mnie nie obchodzą, mogę więc zrobić coś takiego onErrorReturn(ApiResponse.emptyResponse()). Zauważyłem jednak, że przy użyciu WebClient tych błędow jest dużo więcej, w dosłownie 30 sekund działania poleciało +/- 15 błędów

java.net.SocketException: Connection reset

albo

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

Czemu? To normalne przy reaktywnym podejściu?

1 użytkowników online, w tym zalogowanych: 0, gości: 1