Project Reactor + Cassandra - findById pobiera dane sprzed updejtu

0

Czołem :)

Napisałem w Project Reactor prostą metodę, której zadaniem jest dezaktywacja osób w bazie Cassandry.
Jeżeli osoba o podanym peselu istnieje + jest aktywna -> zmiana flagi -> update -> zwracamy wiadomość "Sukces"
Jeżeli osoba istnieje + nie jest aktywna -> zwracamy wiadomość "Do bani!"

Od kilku dni staram się znaleźć błąd w kodzie:

@Override
private Mono<String> deactivateUser(String pesel) {
	return Mono.just(pesel)
			.flatMap(repository::findById)
			.filter(PersonEntity::getActive)
			.map(entity -> {
				entity.setActive(false);
				return entity;
			})
			.flatMap(entity -> repository.save(entity)
					.flatMap(entity1 -> Mono.just("Sukces"))
			)
			.defaultIfEmpty("Do bani!")
			.doOnNext(System.out::println);
}

Otóż gdy wywołuje:

String pesel = "123456";
testInsert(pesel); // osoba metoda do wstawiania rekordu , woła subscribe na końcu
deactivateUser(pesel).subscribe(); 
deactivateUser(pesel).subscribe();

W pierwszym deactivate powinno wyświetlić się "Sukces" (jest) a w drugim "Do bani!" a jest "Sukces".

Pytanie - dlaczego tak się dzieje?

Co ciekawe gdy wtryniłem .doOnNext(System.out::println) po operacji findById to w drugim wywołaniu pokazuje mi usera sprzed update. Czemu?

Z góry serdecznie dzięki za pomoc!

0

Strumień sam z siebie wygląda znośnie (to setActive może być niebezpieczne). Musiałbyś pokazać jak wygląda repozytorium w teście i powiedzieć czy wszystko jest jednowątkowe.

Przy okazji mały hint.

.flatMap(entity -> repository.save(entity)
  .flatMap(entity1 -> Mono.just("Sukces")))

Możesz zamienić na to.

.flatMap(repository::save)
.map(it -> "Sukces")
0

Zdaje się, że z trójki CAP (Consistency, Availability, Partition fault tolerance), Cassandra zapewnia A i P, więc to, że nie masz spójności danych nie jest niczym nadzwyczajnym.
https://wiki.apache.org/cassandra/ArchitectureOverview

0
deactivateUser(pesel).subscribe(); 
deactivateUser(pesel).subscribe();

czy te operacje są asynchroniczne ? czy najpierw kończy się pierwsza, a potem jak już pierwsza sie skończy to zaczyna druga ? tak to działa w rxJavie (musze jasno wywołać toBlocking()), jeżeli tak, to odpowiedź jest oczywista, oba zapytania dostają w wyniku ten sam row (z active na true.)

0
rubaszny_karp napisał(a):
deactivateUser(pesel).subscribe(); 
deactivateUser(pesel).subscribe();

czy te operacje są asynchroniczne ? czy najpierw kończy się pierwsza, a potem jak już pierwsza sie skończy to zaczyna druga ? tak to działa w rxJavie (musze jasno wywołać toBlocking()), jeżeli tak, to odpowiedź jest oczywista, oba zapytania dostają w wyniku ten sam row (z active na true.)

??? Really ??? W RxJava jak nie ma żadnego subscribeOn czy czegoś podobnego to **subscribe **jest blokujące., taki default.

(kurteczka, jestem dziś trochę chorszy, ale chyba aż tak mnie nie sponiewierało, z drugiej strony karp zwykle nie bredzi ... -karpie! - sprawdź to !).

0
jarekr000000 napisał(a):

??? Really ??? W RxJava jak nie ma żadnego subscribeOn czy czegoś podobnego to **subscribe **jest blokujące., taki default.

Hmm? Przecież w RxJavie masz subscribeOn, subscribe i blockingSubscribe. W Project Reactor nie ma blockingSubscribe.

0
jarekr000000 napisał(a):
rubaszny_karp napisał(a):
deactivateUser(pesel).subscribe(); 
deactivateUser(pesel).subscribe();

czy te operacje są asynchroniczne ? czy najpierw kończy się pierwsza, a potem jak już pierwsza sie skończy to zaczyna druga ? tak to działa w rxJavie (musze jasno wywołać toBlocking()), jeżeli tak, to odpowiedź jest oczywista, oba zapytania dostają w wyniku ten sam row (z active na true.)

??? Really ??? W RxJava jak nie ma żadnego subscribeOn czy czegoś podobnego to **subscribe **jest blokujące., taki default.

(kurteczka, jestem dziś trochę chorszy, ale chyba aż tak mnie nie sponiewierało, z drugiej strony karp zwykle nie bredzi ... -karpie! - sprawdź to !).

to zależy co masz w chainie, czy masz jakąś inną thread-poole, weź sobie to odpal

public class Main {
    public static void main(String... args) throws InterruptedException {
        Observable.just(1)
                .delay(10, TimeUnit.SECONDS)//
                .map(asd -> {
                    System.out.println(asd);
                    return Observable.just(asd);
                })
                .subscribe();

        System.out.println("DUPA");
        Thread.sleep(10000);
    }
}

jak zrobisz tak, to faktycznie wykona sie na aktualnym wątku

public class Main {
    public static void main(String... args) throws InterruptedException {
        Observable.just(1)
//                .delay(10, TimeUnit.SECONDS)//
                .map(asd -> {
                    System.out.println(asd);
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return Observable.just(asd);
                })
                .subscribe();

        System.out.println("DUPA");
        Thread.sleep(10000);
    }
}

p.s nienawidzę i nie lubie (i nie znam się) na rxjavie czy reactorze, powinno się to ustawowo zakazać (właśnei dlatego jak to działa, to jest jakieś skomplikowane)
p.s.2 no dobra, może nie nie-nawidzę, ale nie ufam.

1 użytkowników online, w tym zalogowanych: 0, gości: 1