WebFlux, RSocket i paginacja

0

Cześć,

opiszę troszkę na okrętkę, bo problem może leży gdzieś indziej i ktoś podpowie :) Swoją aplikację zacząłem pisać w WebFluxie z RSocket (WebSocket) i całkiem spoko to szło, dopóki nie zacząłem korzystać z niej na telefonie. Każda blokada telefonu, zmiana sieci, słaby zasięg itp zrywa połączenie, a cholerstwo nie zawsze chce się automatycznie połączyć ponownie, albo łączy się bardzo długo. Walczyłem trochę z rsocket-js, ale ogólnie poddałem się i zacząłem przeklepywać kontrolery na RESTowe. Szło gładko dopóki nie trafiłem na dwa takie, które robią coś reaktywnie:

    fun reactiveRecipeList(recipeListFilter: RecipeListFilterDto, userId: String): Flux<RecipeListItemDto> {
        return Flux.fromIterable(recipeService.items)
           ///....
            .flatMap { recipe ->
                Mono.just(recipeFullDetails(recipe, userId, recipeListFilter))
                    .subscribeOn(Schedulers.parallel())
            }
            .sort(RecipeListItemSortable().sort(recipeListFilter.sortType))
///...
    }

Kolekcja przepisów na razie <1k rekordów, ale w recipeFullDetails cisnę niemal całą logikę aplikacji. Z RSocket działało to spoko, bo wykonywało się raz, user dostawał 10 pierwszych rekordów we froncie, a jak scrollował niżej to dociągało 10 kolejnych - backpressure.
Próbując to w rescie poczyniłem coś takiego:

data class PaginationRequest(val skip: Long = 0, val take: Long = 10)

    @PostMapping("/test")
    fun pagination(@RequestBody paginationRequest: PaginationRequest): Flux<String> {
        val intRange = IntRange(1, 100)
        val flatMapSequential = Flux.fromIterable(intRange)
            .flatMap { n ->
                Mono.just(n)
                    .map { i ->
                        log.info("changing $i...")
                        "resolve $i to string \n"
                    }
                    .subscribeOn(Schedulers.parallel())
            }
            .sort()
            .skip(paginationRequest.skip)
            .take(paginationRequest.take)
        return flatMapSequential
    }

Problem jest taki, że jak user pociągnie 10 rekordów a potem chce kolejne 10 to cała operacja się wykonuje na nowo. Co mogę zrobić?

  • pakować cały wynik userowi na front - odpada, bo duży i muli przeglądarkę
  • limitować dane wejściowe - odpada, bo sortuję wg najlepszego wyniku, a to wiem dopiero po wykoniu operacji w Mono
  • olać i zostawić tak dopóki nie ma problemu z wydajnością i rachunkiem za AWS
  • ?

Na razie zostaję przy opcji nr 3, ale ma ktoś jeszcze jakiś pomysł na taki przypadek?

1

A nie potrzebujesz tutaj przypadkiem hot publishera?

https://www.vinsguru.com/reactor-hot-publisher-vs-cold-publisher/

0

@anckor: dzięki, ciekawy artykuł. W sumie nie chodzi mi o to, żeby jednego Publishera dzielić przez wielu subskrybentów, tylko żeby jeden subskrybent mógł kontrolować ile elementów jest w stanie przetworzyć (paginacja) i dociągać kolejne - czyli tzw. backpressure.

W RSocket jest to dostępne out of the box. W przypadku REST ciężko to ogarnąć, konsumpcja SSE, zakładanie jakiejś sesji. W sumie po to jest rsocket-js, żeby się w tym samemu nie babrać :)

Na razie odpuszczam temat. Załadowałem całego JSONa do frontu (330 KB) i zrobiłem paginację w JS. Lokalnie działa ok, zobaczymy jak później jak ten json urośnie.

1 użytkowników online, w tym zalogowanych: 0, gości: 1