Endpoint agregujący informacje o systemie linii lotniczych

0

Hej
takie oto zadanie rekrutacyjne. Osobiście nie podchodzę, ale jestem ciekawa jak byście to zrobili i czego użyli.

Jest API REST które wystawia 3 operacje, np linie lotnicze, klienci, dostawcy. Każdy ten endpoint zwraca listę wyników i pobiera jeden parametr na podstawie którego generuje wyniki.
Przykładowo:
/linie?linie=L1,L2
/klienci?klienci=k1,k2,k3

Masz napisać endpoint agregujący wywołania powyższych 3 endpointów, którzy ma taką sygnaturę:
/agregator?linie=l1,l2&klienci=k1,k2&dostawcy=d1,s2

który pod spodem wywoła te 3 endpointy.

część 1, Endpoint
zaimplementuj ten endpoint tak, aby zwracać odpowiedź do klienta, gdy otrzymano wszystkie odpowiedzi z tych wszystkich 3 podanych endpointów.

cześć 2, Grupowanie wywołań po 5
zaimplementuj tak, aby gdy masz dla danego endpointu min 5 wartości jego paramteru np ?linie=l1,l2,l3,l4,l5
to dopiero wtedy wywołujesz ten endpoint. Jeśli masz mniej np 4 to go jeszcze nie wywołujesz.
Dopiero gdy pojawi się 5ty wtedy wywołujesz dany endpoint.

Przykład:
klienci api agregacyjnego wywołali:

klient1:
/agregator?linie=l1,l2&klienci=k1,k2&dostawcy=d1,d2

Tu na razie nie wywołujesz tych 3 endpointów, gdyż dla żądnego z nich nie masz min 5 wartości parametru. Są po 2.

Gdy następnie klient2 wywołał:

/agregator?linie=l7,l8,l9&klienci=k4&dostawcy=d8,s9

Wówczas agregator może wywołać endpoint /linie, tylko ten, gdzyż ma już 5 warości: l1,l2,l3,l7,l8

Część 3, timeouty
jak widać, dotychczasowa implementacja, ma wadę. Gdy klient wywoła
/agregator?linie=l1,l2,l3,l4,l5&klienci=k1,k2&dostawcy=d1,s2
to wywołamy endpoint /linie, ale nigdy nie wywołamy pozostałych.

Należy tak zmienić impl, aby gdy po upływie zdefiniowanego czasu np 5s, gdy dla danego endpointu nie mam zagregowanych min 5 wartości parametru, to mimo to wywołujemy ten endpoint.

Przykład
Klient1:
/agregator?linie=l2,l2&klienci=k1,k2&dostawcy=d1,s2

Nic się nie dzieje. Jeśli upłynie 5s od otrzymania tego requestu, to mimo braku 5 wartości dla każdego z endpointów, wywołujemy je i zwracamy wyniki do klienta.

Moje pytanie, jak byście to zaprojektowali i czego użyli: core api, jakichś zewn frameworków, kolejek itp?

Czas na wykonanie to około kilku godzin.

1
  1. źle się to czyta
  2. nawet jakby sie dobrze czytało, nie będę robił
  3. endpointy wołające endpointy to jak a czarnych przykładów "jak nie robic uS"
4

a) na pewno nie użylibyśmy javy - bo to by było smutne
b) jakieś reactive streamy + effect system i zadanie jest trywialne (reactor, fs2, ZIO/Zstream),

0

Ten przykład z czekaniem na 5 wartości w REST ma ktoś jakiś biznesowy przykład uzasadniający? Bo nie potrafię sobie wyobrazić :(

0
alicja.ewska napisał(a):

Moje pytanie, jak byście to zaprojektowali i czego użyli: core api, jakichś zewn frameworków, kolejek itp?

Czas na wykonanie to około kilku godzin.

Dużo zależy od scenariusza, ale w takim najprostszym podejściu dowolny mechanizmu streamowania pozwalający na tzw. windowing tutaj da radę. Np. kombo vert.x + RxJava3. Nie lubię RxJavy, ale wiem, że dałaby radę w tym scenariuszu.

ZrobieDobrze napisał(a):
  1. endpointy wołające endpointy to jak a czarnych przykładów "jak nie robic uS"

Nieprawda. Ostatnio koncepcja BfF ("backend for frontend") robi się coraz popularniejsza, a sama idea mikroserwisów jest taka, że jeden mikroserwis może sobie wołać inne.

Dregorio napisał(a):

Ten przykład z czekaniem na 5 wartości w REST ma ktoś jakiś biznesowy przykład uzasadniający? Bo nie potrafię sobie wyobrazić :(

To nie jest przykład biznesowy, ale raczej techniczny, w którym wymagania biznesowe grają dużą rolę.

  1. Wyobraź sobie endpoint z użytkownikami, np. /entity?ids=... (ważne, że przyjmowana jest lista). Endpoint ten woła np. bazę danych.
  2. Teraz wyobraź sobie, że dużo aplikacji non-stop woła ten endpoint, każdy z nich pojedyńczym ID. W rezultacie dostaniesz scenariusz, w którym bardzo szybko dojdzie do wysycenia puli połączeń, i przychodzące requesty będą musiały czekać, aż jakieś połączenie się zwolni - typowe starvation.

No i teraz masz kilka opcji jak to ogarnąć.

  1. Naturalnie zawsze, po prostu ZAWSZE, w takiej sytuacji ktoś rzuci "użyjmy cache'a". Tyle tylko, że cache nie zawsze się nadaje z względów biznesowych. Wystarczy, że masz np. jakiś zapis transakcji, gdzie każda zmiana generuje nowy rekord z nowym ID, i cache niewiele pomoże - jedynie będziesz w pamięci miał dużą ilość danych, która nikogo nie obchodzi.
  2. Drugą opcją jest próba skalowania, tj. zwiększenie puli połączeń. No niestety istnieją bazy danych, które nie lubią tego typu sztuczek.

Zostaje więc po prostu zamiana wielu pojedynczych zapytań na jeden duży, ot i tyle. Masz rację w tym, że tutaj takie np. Kafka Streams świetnie sobie z tego typu problemami radzi, natomiast nie widzę powodu, dla którego to endpoint nie miałby tego też robić.

0

Czy to nie jest case pod użycie ConpletableFeature + walidacja ?

0

@simonides007: Tak, tylko przez deficyty w Javie, to nie jest fajne narzędzie (brak pattern matching, checked exceptions). Może da się to napisać sensowniej, ale wyszło mi coś takiego:

package org.example;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class AsyncHelloWorld {
    private static final Random random = new Random();

    public String getGreetings(String... greetingParams) {

        var completableFutures = Arrays.stream(greetingParams).
                map(AsyncHelloWorld::getCapitalized)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[greetingParams.length]);

        try {
            CompletableFuture.allOf(completableFutures).get();
            return Arrays.stream(completableFutures).map(AsyncHelloWorld::futureValueOrEmpty).collect(Collectors.joining(" "));

        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static CompletableFuture<String> getCapitalized(String word) {
        return CompletableFuture.supplyAsync(() -> {
            int delay = random.nextInt(500);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(word + " " + delay);
            return word.toUpperCase();
        });
    }

    private static String futureValueOrEmpty(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            return "";
        }
    }
}
package org.example;

public class Main {
    public static void main(String[] args) {
        AsyncHelloWorld asyncHelloWorld = new AsyncHelloWorld();
        String greetings = asyncHelloWorld.getGreetings("Good", "morning", "Ala", "Ola", "As");
        System.out.println(greetings);
    }
}
0

@some_ONE: ok, myślałem że pisząc że masz zupełnie inne wyobrażenie to że negujesz to rozwiązanie.

@piotrpo: bardzo spoko rozwiązanie

0

Jeżeli Java, to rzeczywsiscie Reactor etc.

CompletableFuture - Niby to zadziała ale pierwsze pytanie przy review będzie - no ok, nie robisz tego reaktywnie, czyli masz jakiś blokujący server z pula wątków, noi teraz CompletableFuture na czym będzie chodziło? Na tej samej puli wątków? Czy stworzysz kolejną pulę wątków? Niby nowa pula wątków jest lepszym (z dwojga złego) rozwiązaniem, ale oba są słabe.

A jeżeli nie chcesz się uczyć 20 bibliotek, 2 typów programowania i 8 frameworkow, to zrób to w Golang z Goroutine i po sprawie. Nagle się okazuje, że nie musisz znać tych wszystkich rzeczy i proste problemy rozwiązuje się w prosty sposób.

0

Można to napisać na wiele sposobów. Wykorzystałbym RxJavę lub Reactor i jakiś framework do serwera HTTP z API RESTowym. Np. Spring Boot (wtedy mamy Reactor out of the box), Micronaut lub jakiś inny. RxJavą ogarniesz wszystkie wymagania dot. logiki tego zadania. No i w zadaniu jest błąd, bo w przykładzie nie ma l3 a jest ta wartość wspominana w treści. Trochę dziwi mnie wymaganie określające to, że request jednego klienta ma wpływ na request kolejnego. Usługi RESTowe powinny być raczej bezstanowe. Żeby spełnić takie wymaganie, trzeba trzymać jakiś stan w aplikacji, żeby to mogło tak działać. Ten stan pewnie powinien być w jakimś cache i kiedy ma nastąpić inwalidacja tego cache? No i czy lista tych parametrów do endpointów ma jakiś limit? Bo jeśli nie, to przy kolejnych wywołaniach będzie ona rosnąć w nieskończoność. Dużo rzeczy jest tu napisanych, ale zadanie jest napisane w sposób chaotyczny, nieprecyzyjny i z błędem. Podyskutowałbym z autorem przed rozpoczęciem pisania czegokolwiek.

4

Rozwiązanie w Rust, nie w Java, bo serio w Java to zajęłoby mi dłużej i nie mam na bieżąco.
Zajęło mi łącznie ok. 30 minut aby mieć działające rozwiązanie obsługujące przypadki 1-3 , potem jeszcze trochę czasu aby upiększyć, dodać komentarze itp.
Technikalia związane z wysyłaniem/odbieraniem żądań przez REST celowo pozostawiłem niezrobione, bo najciekawsza była współbieżność ;)

Potrzebne biblioteki: tokio, tokio-stream, futures.

use std::future::Future;
use std::time::Duration;

use futures::stream::FuturesOrdered;
use tokio::join;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

/// Reprezentuje żądanie. Dane w żądaniu mogą być dowolne.
/// Wstawiłem inta aby można było śledzić czy nie pomieszaliśmy odpowiedzi tj.
/// czy właściwy klient dostaje wyłącznie odpowiedzi na swoje żądania.
#[derive(Default, Debug, Clone)]
struct Request(u128);

/// Odpowiedź zawierająca payload. Zawartość tego obiektu nie ma znaczenia dla logiki naszego kodu.
/// Oprócz ID żądania mamy tu string dla łatwiejszej identyfikacji endpointu.
#[derive(Default, Debug)]
struct Response(u128, &'static str);

/// Poniższe trzy funkcje realizują wysłanie żądań do endpointów i odbieranie odpowiedzi.
/// Zakładamy, że odpowiedzi są w tej samej kolejności co żądania.
/// Odpowiadamy id żądania oraz nazwą endpointa.
/// W docelowym rozwiązaniu tu trzeba by wywołać jakąś bibliotekę do REST, gRPC itp.
async fn send_request_to_endpoint_a(req: Vec<Request>) -> Vec<Response> {
    req.into_iter().map(|req| Response(req.0, "A")).collect()
}
async fn send_request_to_endpoint_b(req: Vec<Request>) -> Vec<Response> {
    req.into_iter().map(|req| Response(req.0, "B")).collect()
}
async fn send_request_to_endpoint_c(req: Vec<Request>) -> Vec<Response> {
    req.into_iter().map(|req| Response(req.0, "C")).collect()
}

/// Buforuje żądania do jednego endpointu.
/// Wystawia API identyczne jak ww funkcje do bezpośredniego uderzania w endpointy,
/// więc bardzo łatwo dodać buforowanie do istniejącego kodu.
struct Bufferer {
    // Wejście do kolejki na żądania. 
    // Wraz z każdym żądaniem przechowywany jest kanał zwrotny do przekazania odpowiedzi do klienta.
    input: mpsc::Sender<(Request, oneshot::Sender<Response>)>, 
}

impl Bufferer {
    /// # Parametry:
    /// `batch_size` - jak się zbierze tyle żądań, to wypychamy je do endpointu
    /// `timeout` - jak upłynie timeout a nie uzbiera się wystarczająco dużo żądań, to też wypychamy
    /// `target_fn` - funkcja docelowa realizująca żądanie
    pub fn new<TargetFn, Res>(batch_size: usize, timeout: Duration, target_fn: TargetFn) -> Bufferer
    where
        // to trochę brzydkie w Rust jest, i nie, nie napisałem tego z głowy; 
        // dodałem te wszystkie Send/Sync/'static jak mnie kompilator upomniał że trzeba dodać :D
        TargetFn: Fn(Vec<Request>) -> Res + Send + Sync + 'static,  
        Res: Future<Output = Vec<Response>> + Send,
    {
        // Kolejka przechowująca żądania.
        // Dla nieznających Rusta, konwencja jest taka, że tx to koniec wysyłający (sender), 
        // rx to koniec odbierający (receiver).
        let (input_tx, input_rx) = mpsc::channel(batch_size);

        // Odpalamy task w tle przypięty do wyjścia kolejki, który będzie brał żądania grupami
        // i każdą grupę wysyłał do docelowego endpointu:
        tokio::spawn(async move {
            let stream = ReceiverStream::new(input_rx).chunks_timeout(batch_size, timeout);
            tokio::pin!(stream);  // trzeba przypiąć obiekt aby nie dało się przesuwać go w pamięci, inaczej kolejna linia się nie skompiluje
            while let Some(batch) = stream.next().await {
                let (requests, return_channels): (Vec<Request>, Vec<oneshot::Sender<Response>>) =
                    batch.into_iter().unzip();
                let responses = target_fn(requests).await; // <- tu uderzamy w endpoint
                for (response, return_channel) in responses.into_iter().zip(return_channels) {
                    let _ = return_channel.send(response);
                }
            }
        });

        // Wejście kolejki wrzucamy do pola, aby send mógł wrzucać requesty do kolejki:
        Bufferer { input: input_tx }
    }

    /// Wysyła hurtowo żądania i zwraca odpowiedzi w tej samej kolejności
    pub async fn send(&self, requests: Vec<Request>) -> Vec<Response> {
        // Dla każdego żądania tworzymy dodatkowo kanał zwrotny, którym otrzymamy odpowiedź
        // i wrzucamy żądanie razem z kanałem zwrotnym do kolejki wejściowej
        let responses = requests.into_iter().map(|req| async move {
            let (return_tx, return_rx) = oneshot::channel();
            self.input.send((req, return_tx)).await.unwrap();
            return_rx.await.unwrap()
        });
        FuturesOrdered::from_iter(responses).collect().await
    }
}

/// Agreguje 3 endpointy
struct Aggregator {
    sender_a: Bufferer,
    sender_b: Bufferer,
    sender_c: Bufferer,
}

impl Aggregator {
    /// Wysyła żądania równolegle do 3 endpointów i czeka aż każdy z nich odpowie i zwraca
    /// wszystkie wyniki jednocześnie.
    pub async fn send(
        &self,
        a: Vec<Request>,
        b: Vec<Request>,
        c: Vec<Request>,
    ) -> (Vec<Response>, Vec<Response>, Vec<Response>) {
        join! {
            self.sender_a.send(a),
            self.sender_b.send(b),
            self.sender_c.send(c),
        }
    }
}

I teraz sposób użycia tego:

#[tokio::main]
async fn main() {
    let timeout = Duration::from_secs(1);
    let batch_size = 3;
    let aggregator = Aggregator {
        sender_a: Bufferer::new(batch_size, timeout, send_request_to_endpoint_a),
        sender_b: Bufferer::new(batch_size, timeout, send_request_to_endpoint_b),
        sender_c: Bufferer::new(batch_size, timeout, send_request_to_endpoint_c),
    };

    // To będzie czekać 1 sekundę, bo mamy za mało żądań.
    println!(
        "{:?}",
        aggregator
            .send(
                vec![Request(11)],
                vec![Request(21)],
                vec![Request(31), Request(32)]
            )
            .await
    );

    // Te 3 żądania od klientów pójdą natychmiast, bo sumarycznie mamy dokładnie po
    // 3 żądania dla każdego endpointu (tak, wiem, w zadaniu było 5, ale chciałem aby przykład był kompaktowy)
    println!(
        "{:?}",
        join! {
            aggregator.send(vec![Request(11)], vec![Request(21)], vec![]),
            aggregator.send(vec![Request(12), Request(13)], vec![], vec![Request(31), Request(32)]),
            aggregator.send(vec![], vec![Request(22), Request(23)], vec![Request(33)]),
        }
    )
}

Wyniki:

([Response(11, "A")], [Response(21, "B")], [Response(31, "C"), Response(32, "C")])
(([Response(11, "A")], [Response(21, "B")], []), ([Response(12, "A"), Response(13, "A")], [], [Response(31, "C"), Response(32, "C")]), ([], [Response(22, "B"), Response(23, "B")], [Response(33, "C")]))

Oczywiście można by to pewnie jeszcze trochę ulepszyć, np. funkcje do wysyłania żądania mogłyby zwracać Result aby obsługiwać błędy, niemniej nie będzie to już znacząco bardziej skomplikowane. Można by też trochę poszaleć i dla każdego endpointa zrobić inny typ żądanie i odpowiedzi, ale to już pozostawiam jako pracę domową.

No i oczywiście bardzo jestem ciekaw jak to będzie wyglądało w modern Java.
Jak komuś sie zechce zrobić wersję w Go, to też jestem ciekaw ;)

// edit: dla zwiększenia walorów edukacyjnych dodałem komentarze

1 użytkowników online, w tym zalogowanych: 0, gości: 1