Zwracanie Flux w Reactive Spring

0

Hej mam aplikację która używa webfluxa i reactive mongo. Mam klasę która ma pole value i chciałem zwrócić wszystkie eleemnty dla których pole value == "test".
Napisałem metodę w repozytorium

interface DataRepository extends ReactiveMongoRepository<Data, String> {
    Flux<Note> findAllByValue(final String value);
}

I metodę kontrollera

@GetMapping("/{value}")
    public Flux<Data> getAllByValue(@PathVariable("value") final String value){
        return dataRepository.findAllByValue(final String value);
    }

I kiedy tak się odpytam otrzymuję java.lang.IllegalArgumentException: Multi-value reactive types not supported in view resolution:. Miał ktoś już podobny problem i zna rozwiązanie albo jak to w tym webfluxie poprawnie zrobić

Poniżej stacktrace

java.lang.IllegalArgumentException: Multi-value reactive types not supported in view resolution: reactor.core.publisher.Flux<com.data.Data>
	at org.springframework.web.reactive.result.view.ViewResolutionResultHandler.handleResult(ViewResolutionResultHandler.java:184) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.web.reactive.DispatcherHandler.handleResult(DispatcherHandler.java:176) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.web.reactive.DispatcherHandler.lambda$handle$2(DispatcherHandler.java:161) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:290) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:381) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
	at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:397) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_162]
1

Ale czemu w repo masz Flux<Note> a nie Flux<Data i co to w ogóle za klasa Data?

0

to tam źle się przekopiowało powinno być Data nie Note, a Data to reprezentacja dokumentu w mongo

@Document
public final class Data {
    ...
    private String value;
    ...

I chciałem żeby mi zwróciło wszystkie obiekty których pole value ma wartość jakąś tam którą piszę w zapytaniu

1

Słowo "Data" w tym przypadku jest złą nazwą nieniosącą żadnego znaczenia. Klasy i obiekty powinny być rzeczownikami lub wyrażeniami rzeczownikowymi, takimi jak Customer, WikiPage, Account czy tez AddressParser. Należy unikać w nazwach klas słów takich jak Manager, Processor, Data lub Info. Nazwy klas nie powinny być czasownikami.

0

Zrób test zobacz czy uda ci się strzelić do tej metody:

    @GetMapping(value = "/test", produces = "application/stream+json")
    public Flux<Long> test() {
        return Flux
                .interval(Duration.ofSeconds(1)).onBackpressureDrop();
    }
0

Mam 3 streamy typu Flux.fromStream(...) , chcialbym zrobic zip tych trzech i wypelnic Dto/Json z 3 listami.
jak to najlepiej zrobic? czy zip zamknie mi stream jak jeden ze streamow sie skonczy i bedzie cancel ?

0

Jak jeden z Fluxów się skończy, to pozostałe użyte w zip dostaną tę informację i subskrybcja na nich zostanie anulowana. Czyli tak.

0
Michał Sikora napisał(a):

Jak jeden z Fluxów się skończy, to pozostałe użyte w zip dostaną tę informację i subskrybcja na nich zostanie anulowana. Czyli tak.

Czyli to co bym chcial zrobic to zebranie kilku streamow niezaleznie od siebie i napchanie ich do obiektu. Nic innego mi nie przychodzi do glowy. Jak to najlepiej osiagnac?

Chyba, ze mozna cos zaradzic na ten zip?

0

Zależy od tego jak mają wyglądać dane wejściowe i wyjściowe. Jeżeli wejścia zwracają dane tego samego typu to na przykład.

Flux<Integer> flux1 = Flux.range(0, 7);
Flux<Integer> flux2 = Flux.range(0, 5);
Flux<Integer> flux3 = Flux.range(0, 4);
Mono<Data> result = Flux.merge(flux1, flux2, flux3)
    .collectList()
    .map(Data::new);

Jeżeli są różne to np.

Mono<List<Integer>> mono1 = Flux.range(0, 7).collectList();
Mono<List<String>> mono2 = Flux.just("a", "b", "c").collectList();
Mono<List<Boolean>> mono3 = Flux.just(true, false, false).collectList();
Mono<Data> result = Mono.zip(mono1, mono2, mono3)
    .map(this::dataFromTuple);

Zakładam, że strumienie nie są nieskończone.

0
Michał Sikora napisał(a):

Zależy od tego jak mają wyglądać dane wejściowe i wyjściowe. Jeżeli wejścia zwracają dane tego samego typu to na przykład.

Flux<Integer> flux1 = Flux.range(0, 7);
Flux<Integer> flux2 = Flux.range(0, 5);
Flux<Integer> flux3 = Flux.range(0, 4);
Mono<Data> result = Flux.merge(flux1, flux2, flux3)
    .collectList()
    .map(Data::new);

Jeżeli są różne to np.

Mono<List<Integer>> mono1 = Flux.range(0, 7).collectList();
Mono<List<String>> mono2 = Flux.just("a", "b", "c").collectList();
Mono<List<Boolean>> mono3 = Flux.just(true, false, false).collectList();
Mono<Data> result = Mono.zip(mono1, mono2, mono3)
    .map(this::dataFromTuple);

Zakładam, że strumienie nie są nieskończone.

Mam rozne typy, pseudo kod:

public class ResponseDto {
    private Set<Person> people;

    private Set<Address> addresses;

    private Set<Integer> ids;

i robilem wlasnie zip i map ale jak zdebugowalem to widze cancel bo jakis stream sie wczesniej skonczyl.
no chyba, ze myle sie ze schedulerami.

0

ach ok, nie zauwazylem na telefonie tego collectList , sprawdze.
jak rozumiem to juz blokuje?

0

Zależy co masz na myśli pisząc "blokuje". Wątków nie blokuje. Ale zip nie wyemituje niczego dopóki wszystkie trzy Mono się nie skończą.

0
Michał Sikora napisał(a):

Zależy co masz na myśli pisząc "blokuje". Wątków nie blokuje. Ale zip nie wyemituje niczego dopóki wszystkie trzy Mono się nie skończą.

Prawda. W moim przypadku i tak w któryms miejscu musze poczekac, wiec pod use case, który przedstawiłem jest to ok.
Moge tez zrobic subscribeOn(Schedulers...()) i to odpalic, to moze troche pomóc.

Myslalem jeszcze, ze da sie tu wykorzystac jakos backpressure czy cos.

0

Ciężko mi sobie wyobrazić, żeby backpressure miało tutaj jakiekolwiek znaczenie. Jedyny punkt ścisku to zip, ale Ty i tak zipujesz tylko trzy listy jeden raz. collectList nie powinno być problematyczne, nawet jakbyś miał milion elementów na sekundę.

Ale jeżeli chciałbyś to zrównoleglić, to subscribeOn powinno być uwzględnione per Mono, a nie na całości po zipowaniu. Zależy w sumie od źródeł danych, ale jeśli pochodzą one ze zwykłych strumieni, jak pisałeś, to tak należałoby to zrównoleglić.

0
Michał Sikora napisał(a):

Ciężko mi sobie wyobrazić, żeby backpressure miało tutaj jakiekolwiek znaczenie. Jedyny punkt ścisku to zip, ale Ty i tak zipujesz tylko trzy listy jeden raz. collectList nie powinno być problematyczne, nawet jakbyś miał milion elementów na sekundę.

Ale jeżeli chciałbyś to zrównoleglić, to subscribeOn powinno być uwzględnione per Mono, a nie na całości po zipowaniu. Zależy w sumie od źródeł danych, ale jeśli pochodzą one ze zwykłych strumieni, jak pisałeś, to tak należałoby to zrównoleglić.

Tak tak, per Mono. Dzięki za pomóc. Coś tam mi się układa z tym w głowie. Ilość operatorów przyprawia o zawrót głowy.
Jeszcze w sumie ostatnie pytanie.
Ja jeszcze zanim to wszystko odpale, to na poczatku jeszcze odpalam calosc Flux.fromIterables(), no i wszystko teraz dziala ok, z tym, ze dostaje z tego zip osobna liste mono per element z tego Flux.fromiterables

cos w stylu:

      Flux<Sex> s = Flux.fromIterable(Sex.keySet());
        return s.map(sex ->services.get(sex).getPeople(currency, ids)).subscribeOn(Schedulers.elastic());

tyle, że dostaje kilka Setów z People {} {} {} zamiast jednego {}

0

miala byc flatMapa

Flux<Sex> s = Flux.fromIterable(Sex.keySet());
        return s.flatMap(sex ->services.get(sex).getPeople(currency, ids)).subscribeOn(Schedulers.elastic());
0

Ummmm... trochę powróżę z kryształowej kuli. Zakładam, że services.get(sex).getPeople(currency, ids) zwraca Flux<Set<People>>. W takim razie, każda płeć jest mapowana na taki set lub kilka setów. Czyli kiedy zostanie wyemitowana jedna płeć, to zostanie wyemitowane ileś (od 0 do nieskończoności) setów ludzi dla tej płci. Większa ilość setów może wynikać albo z większej ilości płci niż 1, albo z większej ilości setów dla jednej płci, albo z obu powodów. Tak czy inaczej, ostatecznie musisz te dane zredukować. Możesz to zrobić np. tak.

return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .collect(LinkedHashSet::new, Set::addAll)
  .flux() // Jeżeli chcesz zwracać Flux zamiast Mono.
  .subscribeOn(Schedulers.elastic());
0

services.get(sex).getPeople(currency, ids) -> per każdy element jest zwracany Mono ;)

0

Czyli dostaje kilka Mono tego nizej a chcialbym zmergowane...

public class ResponseDto {
    private Set<Person> people;
 
    private Set<Address> addresses;
 
    private Set<Integer> ids;
 


0

services.get(sex).getPeople(currency, ids) zwraca Mono<ResponseDto>? Naprawdę nie rozumiem, co chcesz zrobić.

Wnioskuję, że to niżej zwraca Flux<Set<Person>>.

return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .subscribeOn(Schedulers.elastic());

W takim razie, to zwróci Mono<Set<Person>> z unikalnymi względem equals osobami.

return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .collect(LinkedHashSet::new, Set::addAll)
  .subscribeOn(Schedulers.elastic());
0

Przepraszam. Za duzo pseudokodu.

return Flux.fromIterable(Sex.keySet())
  .flatMap(sex -> services.get(sex).getResponseDto(currency, ids))
  .collect( Collectors.toSet())
  .subscribeOn(Schedulers.elastic());

To zwroci Mono<Set<ResponseDto>.

Niezaleznie co robie to dostaje w srodku wiele list people, addresses itp.

0

Ale to już Ty się ograniczasz swoim własnym typem ResponseDto. Musisz się zastanowić, jak zaimplementowałbyś funckję z mniej więcej taką sygnaturą ResponseDto combine(ResponseDto first, ResponseDto second) i jak jej użyc w collect. Aczkolwiek wydaje mi się to dosyć kiepskie i lepiej byłoby to zrobić np. na Tuple3 i wtedy mapować na ResponseDto.

0

Ogolnie jak kiepskie by to nie bylo to wydawalo mi sie, ze da sie zrobic ;) ale dzieki za sugestie :)

A to o czym mowisz to chyba moze mi dac po prostu zip? Tylko moglbym zlapac to troche pozniej zamiast od razu mapowac.

0

Użyć zip niby można, ale musiałbyś skorzystać z jednego z dwóch poniższych przeciążeń i z map zamiast flatMap. I funkcja łącząca kilka ResponseDto w jedno dalej musi być napisana.

public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
                              Function<? super Object[],? extends O> combinator)

@SafeVarargs
public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator,
                                             Publisher<? extends I>... sources)
0

ostatecznie skonczylem na koncu z reduce

1 użytkowników online, w tym zalogowanych: 0, gości: 1