Testowanie procesów asynchronicznych

0

Problem:
Proces składa się z wielu podprocecsów działających asynchronicznie. Komunikują się za pomocą Kafki.
Chcę taki proces przetestować: wysłać żądanie i gdy przyjdzie powiadomienie, że proces został ukończony to sprawdzić na bazie to co proces wypluł.

Pomockowałem sobie kafkę i proces działa, ale nie wiem jak to napisać bez tej pętli while. A nawet jak ją mam to nie wiem jak zatrzymywać test, gdy jest OK, albo jak zatrzymywać, gdy jest błąd.

void shouldRunProcessWithSuccess() {

    kafkaMock.send("start-process");
    kafkaMock.addListener(// ... jeśli otrzymasz "process-success" to { checkDatabase(); kafkaMock.stop() }
    kafkaMock.addListener( // ... jeśli otrzymasz "process-failure" to { fail(); kafkaMock.stop() }
    while (kafkaMock.isRunning()) { }

}

te consumery nigdy nie sfailują testu, bo są odpalane w innym wątku.

3

Poczytaj o Awaitility (polling). Innym sposobem, ale stosowanym w trochę innej konfiguracji, jest zastosowanie CountDownLatch (zawieszenie wątku).

2
  1. Przede wszystkim Twoje testy to jakas popierdółka a nie testy, czemu nie postawisz Kafki lokalnie? O testcontainers słyszał?
  2. Polecam Awaitility
0

To awaility to mi zastąpi tylko tę petlę while, musiałbym pobierać wszystkie wiadomości z tematu kafki i sprawdzać czy przyszedł już ten mówiący o sukcesie, a ja chciałem dodać listener który zakończy mi test z sukcesem jak się uruchomi + nie chcę by test się zakańczał dopóki się to nie stanie.

BlockQueue records = new ArrayBlockingQueue;

void shouldRunProcessWithSuccess() {

    kafkaMock.send("start-process");
    kafkaMock.addListener( // ... jeśli otrzymasz "process-failure" to { fail(); kafkaMock.stop() }
    kafkaMock.addListiner( record -> records.add(record););
    await().
    .atMost(10, SECONDS)
    .with()
    .pollInterval(1, SECONDS)
    .until(isSuccess());
}

boolean isSuccess() {
    records.poll().getHeaders().getHeader("event").equals("process-success");
}
1
NamingException napisał(a):

To awaility to mi zastąpi tylko tę petlę while, musiałbym pobierać wszystkie wiadomości z tematu kafki i sprawdzać czy przyszedł już ten mówiący o sukcesie, a ja chciałem dodać listener który zakończy mi test z sukcesem jak się uruchomi + nie chcę by test się zakańczał dopóki się to nie stanie.

No to po to właśnie możesz użyć Awaitility. Dostarcza różne utilsy do pracy właśnie z takimi asynchronicznymi procesami. Możesz np. zbudować warunek mówiący Oczekuj aż warunek X jest spełniony, ale nie dłużej niż N czasu.

A poza tym to nie wiem czy potrzebujesz tych mocków, Kafka postawiona na TestContainers w zupełności da radę bez mockowania, da się też postawić Embedded Kafka choć to nadal będzie taka atrapa.

Swoją drogą, jak to właściwie wygląda, co chcesz tym przetestować - fragment procesu znajdujący się po stronie consumera czy producera? Próbujesz testować całość procesu? Jeśli tak to IMO powinien być test integracyjny lub end-to-end, bez mocków, tylko na kompletnym organizmie.

te consumery nigdy nie sfailują testu, bo są odpalane w innym wątku.

Jak są w innym wątku to nie szkodzi, możesz użyć listenerów do zrzucania eventów do thread-safe struktury w pamięci i użyć Awaitility do oczekiwania na wynik i ew. failowania testu.

4

Test w ogóle powinien wyglądać jakoś tak:

void shouldRunProcessWithSuccess() { // ta nazwa testu nic nie mówi - napisz czego konkretnie oczekujesz dla jakich danych wejściowych 
- bardziej biznesowo!
    // given
   thereIsAProcess(...); // jakaś definicja tego procesu, setup początkowy, jeśli jest to istotne z perspektywy when lub then     

    // when
    processHasBeenStarted(); // pod spodem odpalasz logikę, najlepiej przez fasadę/cololwiek/API, które rzuca event na Kafkę

    // then
    await().atMost(1, SECONDS).untilAsserted(() -> {
        val result = processHasFinished(); // tutaj pobierasz dane z bazy za pomocą jakiejś fasady/cokolwiek/API
        // ...
    }
}

1 użytkowników online, w tym zalogowanych: 0, gości: 1