Metoda .get() blokuje aplikację

0

Cześć,

próbuje przy pomocy biblioteki kafka-clients wysłać na kafke prostą wiaodomość. NIestety gdy wysyłanie wiadomości ma się wykonać aplikacja się blokuje w tym meijscu i nie rzuca żadnych logów. Kafka jest uruchomiona, consumerem potrafie się do niej dobić i pobrać np liste topików.
Oto kod:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Producer<String, String> producer = createProducer();
    ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1", "value1");
    producer.send(record).get();
    System.out.println("Koniec");
}

public static Producer<String, String> createProducer() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test-producer");
    return new KafkaProducer<>(properties);
}
1

Producer.get() to operacja blokująca:

Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.

Oznacza to tyle, że apka próbuje wysłać wiadomość na kolejkę, ale nie dostaje żadnej zwrotki. Trudno powiedzieć, co się dzieje - ja bym dorzucił log.dirs do propertiesów i sprawdził, czy tam nic nie ma.

0

Nie ustawiłeś "acks", więc jest domyślne. W zależności od wersji Kafki, może być różne. W szczególności może być "all", wówczas get() się powiedzie, jeśli wszystkie repliki potwierdzą przyjęcie wiadomości. Czy Twoja Kafka jest cała i zdrowa?

Możesz prosto zweryfikować powyższą hipotezę o klastrze, przez ustawienie acks, na jakąś głupią wartość pozwalającą zawęzić obszar poszukiwań, ale nie gwarantującą poprawności działania aplikacji (w zasadzie gwarantuje brak takowej ;-) ):

 properties.put(ProducerConfig.ACKS_CONFIG, "0");

Randomowa lektura uzupełniająca: https://betterprogramming.pub/kafka-acks-explained-c0515b3b707e

0
wartek01 napisał(a):

Oznacza to tyle, że apka próbuje wysłać wiadomość na kolejkę, ale nie dostaje żadnej zwrotki. Trudno powiedzieć, co się dzieje - ja bym dorzucił log.dirs do propertiesów i sprawdził, czy tam nic nie ma.

Rozumiem ze .get() jest blokująca ale wlanie nie wiem dlaczego nie dostaje żadnej zwrotki.
W jaki sposób dodać ten confg? Do Properties od KafkaProducer?

_13th_Dragon napisał(a):

Dobry i prosty przykład:
https://www.tutorialkart.com/apache-kafka/producer-example-in-apache-kafka/

Próbowałem kropka w kropkę z tego tutoriala ale nadal to samo

yarel napisał(a):

Nie ustawiłeś "acks", więc jest domyślne. W zależności od wersji Kafki, może być różne. W szczególności może być "all", wówczas get() się powiedzie, jeśli wszystkie repliki potwierdzą przyjęcie wiadomości. Czy Twoja Kafka jest cała i zdrowa?

Możesz prosto zweryfikować powyższą hipotezę o klastrze, przez ustawienie acks, na jakąś głupią wartość pozwalającą zawęzić obszar poszukiwań, ale nie gwarantującą poprawności działania aplikacji (w zasadzie gwarantuje brak takowej ;-) ):

 properties.put(ProducerConfig.ACKS_CONFIG, "0");

Randomowa lektura uzupełniająca: https://betterprogramming.pub/kafka-acks-explained-c0515b3b707e

Próbowałem ale nic to nie daje. Wydaje się że z Kafką wszystko okej. UI mogę się zapiąć bez problemu, stworzyć topc i wysłać message. Jak przy pomocy kodu stworze sobie Consumera to połączyć i pobrac liste topików też mogę.

0

@janiu: skoro jesteś w stanie z poziomu kodu pobrać listę topiców, to:
a) czy topic "test" istnieje?Jeśli tak, to jakie ma ustawienia?
b) jeśli nie, to jak skonfigurowane jest automatyczne tworzenie topiców w Twojej instancji Kafki?

1 użytkowników online, w tym zalogowanych: 0, gości: 1