Problem z operacjami na bazie danych z użyciem project reactor

0

Napisałem funkcje która sprawdza czy dane w bazie danych są takie same jak otrzymane z zewnętrznego źródła, jeżeli nie są takie same to wszystkie dane z bazy są usuwane i wstawiane są te z zewnętrznego źródła. Tak wygląda ta funkcja

public <T> Mono<Void> replaceObjectsIfNotAlreadyStored(List<T> products, ReactiveCrudRepository<T, String> objRepository) {
    return objRepository.findAll()
            .collectList()
            .filter(productsList -> !areAlreadyInDb(productsList, products))
            .flatMap(products1 ->  objRepository.deleteAll()).
            doOnSuccess(products1 -> objRepository.saveAll(products).collectList());
}

Ale nie przechodzi testów

@Test
public void replaceObjectsIfNotAlreadyStoredNeg() {
    repository.saveAll(products)
            .collectList()
            .block();

    checksumService.replaceObjectsIfNotAlreadyStored(otherList, repository).block();
    List<Product> current = repository.findAll()
            .collectList()
            .block();

    assertEquals(current, otherList);
    repository.deleteAll()
            .block();
}

Zwracana lista current jest pusta, żadnymi exceptionami nic nie rzuca.
To moje pierwsze zetknięcie z programowaniem reaktywnym, więc jeżeli ktoś znajdzie błąd to prosiłbym także o wyjaśnienie dlaczego właśnie tak i co jest złego w moim podejściu.

1

Błąd jest tu:

doOnSuccess(products1 -> objRepository.saveAll(products).collectList());

Nic nie konsumuje (subskrybuje) tego strumienia objRepository.saveAll(products).collectList() (a właświwie Mono).
Zamień to doOnSuccess na flatMap i po sprawie. Po prostu powinien być flatMap.

Dodatkowo: możesz zwracać zamiast Mono<Void> Mono<List<T>>
Dodatkowo project reactor ma dedykowany moduł reactorTest https://projectreactor.io/docs/core/release/reference/docs/index.html#testing
żebyś nie musiał pisać heretyckich block(). Wtedy też trudniej popełnić błąd.

0

@jarekr000000: miałeś rację, głupio nie zauważyłem że strumień w onSuccess jest nie zasubskrybowany, ale w rozwiązaniu oprócz flatMapa musiałem dodać thenReturn ponieważ deleteAll zwracał mi puste mono i strumień nie szedł dalej. I dzięki za ten moduł do testowania.
Tak teraz wygląda kod funkcji:

    public <T> Mono<List<T>> replaceObjectsIfNotAlreadyStored(List<T> products, ReactiveCrudRepository<T, String> objRepository) {
        return objRepository.findAll()
                .collectList()
                .filter(productsList -> !areAlreadyInDb(productsList, products))
                .flatMap(products1 ->  objRepository.deleteAll())
                .thenReturn(products)
                .flatMap(products1 -> objRepository.saveAll(products).collectList());
    }

I testu

 @Test
    public void replaceObjectsIfNotAlreadyStoredNeg() {
        StepVerifier.create(repository.saveAll(products)
                .collectList())
                .expectNext(products)
                .expectComplete();

        StepVerifier.create(checksumService.replaceObjectsIfNotAlreadyStored(otherProducts, repository))
                .expectNext(otherProducts)
                .expectComplete();

        StepVerifier.create(repository.findAll()
                .collectList())
                .expectNext(otherProducts)
                .expectComplete();

        repository.deleteAll()
                .block();
    }

1 użytkowników online, w tym zalogowanych: 0, gości: 1