Retry z save i kafką

0

Hej,

Mam w Springu taką uproszczoną metodę która nasłuchuje topic, ma wykonać logikę i zapisać do bazy (mongo) nową encję, i wrzucić na kafkę message z sukcesem.

Dla prostoty przykłady metoda została okrojona z wszelkich mapperów.

Chciałbym w jakiś w miarę dobry sposób obsłużyć pesymistyczne przypadki.

animalRestaurantService.cook jest idempotentne i pod spodem ma logikę, która dzwoni RESTem do innych mikroserwisów, więc łatwo może się wywalić.

Oprócz tego inne rzeczy też się mogą wywalić: repository.save, kafkaTemplate.send – mogą rzucić wyjątkami.

Dlatego utworzyłem podstawowy RetryTemplate, pod spodem jest skonfigurowany z exponential backoffem i timeoutami.

Czy aktualny kod wygląda w ogóle wygląda normalnie, czy nie budzi jakichś oczywistych 'wtf'? Czego tu brakuje? Jakie są best practices, co rozważyć? O jakich problemach mam w ogóle tutaj myśleć?

Szczególnie ciekawi mnie sytuacja jeśli save się powiedzie, ale kafka się wywali – czy próbować zapewniać tutaj jakoś atomiczność operacji obu operacji (save+kafka publish) np. outbox patternem?

@KafkaListener(topics = KafkaConstants.ANIMALS_BORN, groupId = "well-fed-animals")
public void onAnimalCreated(AnimalBornEvent event) {
    try {
        retryTemplate.execute((RetryCallback<Void, Exception>) context -> {
            final Food food = animalRestaurantService.cook(event.getAnimal());
            repository.save(food);
            kafkaTemplate.send(KafkaConstants.FOOD_COOKED, new FoodCookedEvent(food));
            return null;
        });
    } catch (final Exception ex) {
        log.error("Error during food cooking or sending event to kafka for {}", event, ex);
        kafkaTemplate.send(KafkaConstants.ANIMALS_STARVED_TO_DEATH, new StarvedAnimalEvent(event.getAnimal());
    }
}

W przypadku niepowodzenia koniecznie chcę to opublikować to na kafkę żeby cofnąć operacje w innym serwisie

0

@Legenda: jeśli retryTemplate to https://docs.spring.io/spring-retry/docs/api/current/org/springframework/retry/support/RetryTemplate.html, a repository nie przechwytuje żadnej transakcji z góry (a na to wygląda) to masz potencjalny błąd. Jeśli linijka 7 (tj. wysyłka do Kafki) wywali ci się, to i tak pójdzie zapis do bazy danych. Co więcej, ten zapis do bazy danych pójdzie kilkukrotnie - bo retryTemplate wykona to kilkukrotnie.

To, co bym zrobił w twoim przypadku to odpalił transakcję bazodanową (za pomocą np. TransactionManager) i zrobił coś w stylu:

@KafkaListener(topics = KafkaConstants.ANIMALS_BORN, groupId = "well-fed-animals")
public void onAnimalCreated(AnimalBornEvent event) {
    try {
        retryTemplate.execute((RetryCallback<Void, Exception>) context -> processAnimal(event.getAnimal());
    } catch (final Exception ex) {
        log.error("Error during food cooking or sending event to kafka for {}", event, ex);
        kafkaTemplate.send(KafkaConstants.ANIMALS_STARVED_TO_DEATH, new StarvedAnimalEvent(event.getAnimal());
    }
}

private Void processAnimal(Animal animal) {
  return transactionManager.execute(status -> {
      Food food = animalRestaurantService.cook(animal);
      repository.save(food);
      kafkaTemplate.send(KafkaConstants.FOOD_COOKED, new FoodCookedEvent(food));

      return ((Void) null);
  });
}

Czyli:

  1. Pobieramy potrzebne informacje
  2. Otwieramy transakcję bazodanową
    2.1. Zapisujemy dane do bazy danych
    2.2. Wysyłamy dane na kolejkę
  3. W przypadku, gdy któryś z powyższych się wywali to leci info na Kafkę i cofamy transakcję bazodanową. W rezultacie masz czystą bazę danych oraz informację o błędzie na kolejce.

Ps. kod niekoniecznie się musi skompilować bo pisałem bez IDE ;)

0

Jak zadziała ten KafkaListener, jak po prostu rzucisz wyjątek z tej metody konsumującej event? Nie dostaniesz ponowienia z paczki?

Po drugie, z tego co się orientowałem X lat temu, to Spring Kafka wspiera DLQ - nieobsluzone eventy powinny lądować na takim topicu.

Po trzecie, Mongo nie wspiera(ło) transakcji, wiec mało prawdopodobne, że będziesz w stanie zaimplementować outbox pattern. Na szczęście ten save powinien być idempotentny (upsert), wiec powinno być OK.

Pamiętaj, że nie masz exactly-once delivery, wiec zbadaj jak się zachowa ta logika w przypadku zduplikowanego eventu.

0

Szczególnie ciekawi mnie sytuacja jeśli save się powiedzie, ale kafka się wywali

Można też łatwo to przetestować - na lokalną kafke wrzucasz ten AnimalBornEvent, stawiasz breakpointa w listenerze, jak już do niego wejdzie to stopujesz lokalną kafke i zobaczysz co się stanie.

IMOH ja bym się spodziewał, ze save zostanie zapisany, a ten event poleci właśnie na jakieś DLQ.

0

Można tez to nawet przetestować automatycznie: https://www.testcontainers.org/modules/toxiproxy/

1 użytkowników online, w tym zalogowanych: 0, gości: 1