Rozwiązanie w Rust, nie w Java, bo serio w Java to zajęłoby mi dłużej i nie mam na bieżąco.
Zajęło mi łącznie ok. 30 minut aby mieć działające rozwiązanie obsługujące przypadki 1-3 , potem jeszcze trochę czasu aby upiększyć, dodać komentarze itp.
Technikalia związane z wysyłaniem/odbieraniem żądań przez REST celowo pozostawiłem niezrobione, bo najciekawsza była współbieżność ;)
Potrzebne biblioteki: tokio, tokio-stream, futures.
use std::future::Future;
use std::time::Duration;
use futures::stream::FuturesOrdered;
use tokio::join;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
/// Reprezentuje żądanie. Dane w żądaniu mogą być dowolne.
/// Wstawiłem inta aby można było śledzić czy nie pomieszaliśmy odpowiedzi tj.
/// czy właściwy klient dostaje wyłącznie odpowiedzi na swoje żądania.
#[derive(Default, Debug, Clone)]
struct Request(u128);
/// Odpowiedź zawierająca payload. Zawartość tego obiektu nie ma znaczenia dla logiki naszego kodu.
/// Oprócz ID żądania mamy tu string dla łatwiejszej identyfikacji endpointu.
#[derive(Default, Debug)]
struct Response(u128, &'static str);
/// Poniższe trzy funkcje realizują wysłanie żądań do endpointów i odbieranie odpowiedzi.
/// Zakładamy, że odpowiedzi są w tej samej kolejności co żądania.
/// Odpowiadamy id żądania oraz nazwą endpointa.
/// W docelowym rozwiązaniu tu trzeba by wywołać jakąś bibliotekę do REST, gRPC itp.
async fn send_request_to_endpoint_a(req: Vec<Request>) -> Vec<Response> {
req.into_iter().map(|req| Response(req.0, "A")).collect()
}
async fn send_request_to_endpoint_b(req: Vec<Request>) -> Vec<Response> {
req.into_iter().map(|req| Response(req.0, "B")).collect()
}
async fn send_request_to_endpoint_c(req: Vec<Request>) -> Vec<Response> {
req.into_iter().map(|req| Response(req.0, "C")).collect()
}
/// Buforuje żądania do jednego endpointu.
/// Wystawia API identyczne jak ww funkcje do bezpośredniego uderzania w endpointy,
/// więc bardzo łatwo dodać buforowanie do istniejącego kodu.
struct Bufferer {
// Wejście do kolejki na żądania.
// Wraz z każdym żądaniem przechowywany jest kanał zwrotny do przekazania odpowiedzi do klienta.
input: mpsc::Sender<(Request, oneshot::Sender<Response>)>,
}
impl Bufferer {
/// # Parametry:
/// `batch_size` - jak się zbierze tyle żądań, to wypychamy je do endpointu
/// `timeout` - jak upłynie timeout a nie uzbiera się wystarczająco dużo żądań, to też wypychamy
/// `target_fn` - funkcja docelowa realizująca żądanie
pub fn new<TargetFn, Res>(batch_size: usize, timeout: Duration, target_fn: TargetFn) -> Bufferer
where
// to trochę brzydkie w Rust jest, i nie, nie napisałem tego z głowy;
// dodałem te wszystkie Send/Sync/'static jak mnie kompilator upomniał że trzeba dodać :D
TargetFn: Fn(Vec<Request>) -> Res + Send + Sync + 'static,
Res: Future<Output = Vec<Response>> + Send,
{
// Kolejka przechowująca żądania.
// Dla nieznających Rusta, konwencja jest taka, że tx to koniec wysyłający (sender),
// rx to koniec odbierający (receiver).
let (input_tx, input_rx) = mpsc::channel(batch_size);
// Odpalamy task w tle przypięty do wyjścia kolejki, który będzie brał żądania grupami
// i każdą grupę wysyłał do docelowego endpointu:
tokio::spawn(async move {
let stream = ReceiverStream::new(input_rx).chunks_timeout(batch_size, timeout);
tokio::pin!(stream); // trzeba przypiąć obiekt aby nie dało się przesuwać go w pamięci, inaczej kolejna linia się nie skompiluje
while let Some(batch) = stream.next().await {
let (requests, return_channels): (Vec<Request>, Vec<oneshot::Sender<Response>>) =
batch.into_iter().unzip();
let responses = target_fn(requests).await; // <- tu uderzamy w endpoint
for (response, return_channel) in responses.into_iter().zip(return_channels) {
let _ = return_channel.send(response);
}
}
});
// Wejście kolejki wrzucamy do pola, aby send mógł wrzucać requesty do kolejki:
Bufferer { input: input_tx }
}
/// Wysyła hurtowo żądania i zwraca odpowiedzi w tej samej kolejności
pub async fn send(&self, requests: Vec<Request>) -> Vec<Response> {
// Dla każdego żądania tworzymy dodatkowo kanał zwrotny, którym otrzymamy odpowiedź
// i wrzucamy żądanie razem z kanałem zwrotnym do kolejki wejściowej
let responses = requests.into_iter().map(|req| async move {
let (return_tx, return_rx) = oneshot::channel();
self.input.send((req, return_tx)).await.unwrap();
return_rx.await.unwrap()
});
FuturesOrdered::from_iter(responses).collect().await
}
}
/// Agreguje 3 endpointy
struct Aggregator {
sender_a: Bufferer,
sender_b: Bufferer,
sender_c: Bufferer,
}
impl Aggregator {
/// Wysyła żądania równolegle do 3 endpointów i czeka aż każdy z nich odpowie i zwraca
/// wszystkie wyniki jednocześnie.
pub async fn send(
&self,
a: Vec<Request>,
b: Vec<Request>,
c: Vec<Request>,
) -> (Vec<Response>, Vec<Response>, Vec<Response>) {
join! {
self.sender_a.send(a),
self.sender_b.send(b),
self.sender_c.send(c),
}
}
}
I teraz sposób użycia tego:
#[tokio::main]
async fn main() {
let timeout = Duration::from_secs(1);
let batch_size = 3;
let aggregator = Aggregator {
sender_a: Bufferer::new(batch_size, timeout, send_request_to_endpoint_a),
sender_b: Bufferer::new(batch_size, timeout, send_request_to_endpoint_b),
sender_c: Bufferer::new(batch_size, timeout, send_request_to_endpoint_c),
};
// To będzie czekać 1 sekundę, bo mamy za mało żądań.
println!(
"{:?}",
aggregator
.send(
vec![Request(11)],
vec![Request(21)],
vec![Request(31), Request(32)]
)
.await
);
// Te 3 żądania od klientów pójdą natychmiast, bo sumarycznie mamy dokładnie po
// 3 żądania dla każdego endpointu (tak, wiem, w zadaniu było 5, ale chciałem aby przykład był kompaktowy)
println!(
"{:?}",
join! {
aggregator.send(vec![Request(11)], vec![Request(21)], vec![]),
aggregator.send(vec![Request(12), Request(13)], vec![], vec![Request(31), Request(32)]),
aggregator.send(vec![], vec![Request(22), Request(23)], vec![Request(33)]),
}
)
}
Wyniki:
([Response(11, "A")], [Response(21, "B")], [Response(31, "C"), Response(32, "C")])
(([Response(11, "A")], [Response(21, "B")], []), ([Response(12, "A"), Response(13, "A")], [], [Response(31, "C"), Response(32, "C")]), ([], [Response(22, "B"), Response(23, "B")], [Response(33, "C")]))
Oczywiście można by to pewnie jeszcze trochę ulepszyć, np. funkcje do wysyłania żądania mogłyby zwracać Result
aby obsługiwać błędy, niemniej nie będzie to już znacząco bardziej skomplikowane. Można by też trochę poszaleć i dla każdego endpointa zrobić inny typ żądanie i odpowiedzi, ale to już pozostawiam jako pracę domową.
No i oczywiście bardzo jestem ciekaw jak to będzie wyglądało w modern Java.
Jak komuś sie zechce zrobić wersję w Go, to też jestem ciekaw ;)
// edit: dla zwiększenia walorów edukacyjnych dodałem komentarze