Brak wiadomości w topiku

0

Cześć,
w teście widocznym poniżej wiadomości nie są czytane przez Kafka consumer. Nie do końca wiem, czy wina leży po stronie wrzucającej, czy czytającej z brokera. Uprościłem test do minimum i niestety nadal nie działa, a pomysły mi się skończyły. Może Wy mnie poradujecie radą?

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@Testcontainers
public class KafkaTest
{
    @Container
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Test
    public void shouldReadFromKafkaBroker()
    {
        final Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaTemplate<String, String> producer = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));

        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "com.b.c");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());

        consumer.subscribe(ImmutableList.of("abc"));
        producer.send("abc", "testowa wiadomosc");

        ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
        Assertions.assertNotEquals(0, poll.count());
    }
}
1

Rozumiem że w logach widać, że sama Kafka startuje, tak?

Ja co prawda używałem Kafki w testcontainers "ręcznie" (w sensie bez @Container), ale pamietam że czasem trzeba było spory delay zrobić, żeby consumer załapał, np 5-10 sekund. Spróbuj wydłużyć delay do 30 sekund i zobaczyć.

0
Pinek napisał(a):

Rozumiem że w logach widać, że sama Kafka startuje, tak?

Ja co prawda używałem Kafki w testcontainers "ręcznie" (w sensie bez @Container), ale pamietam że czasem trzeba było spory delay zrobić, żeby consumer załapał, np 5-10 sekund. Spróbuj wydłużyć delay do 30 sekund i zobaczyć.

Co do logów to sam nie wiem. Niby w porządku, ale nie jestem do końca przekonany. Przykładowo tutaj nie widać bootstrap.servers:

[2022-11-29 18:45:20,091] INFO ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = []

tutaj jest jakieś nadpisanie. Nie do końca wiem jak duża jest to zasługa tego, że jest to właśnie test containers.

[2022-11-29 18:45:20,719] INFO Processing override for entityPath: brokers/1 with config: Map(advertised.listeners -> PLAINTEXT://localhost:49198,BROKER://69ac5fbadce9:9092) (kafka.server.DynamicConfigManager)
[2022-11-29 18:45:20,721] INFO KafkaConfig values: 
	advertised.host.name = null
	advertised.listeners = PLAINTEXT://localhost:49198,BROKER://69ac5fbadce9:9092
	advertised.port = null
[2022-11-29 18:45:39,752] INFO [Partition abc-0 broker=1] abc-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
0

Te DynamicConfig to tak, tak właśnie w testcontainers Kafka podmienia dynamicznie parametry. Tak myślę - a spróbuj delay 20 sekundowy dać jeszcze na samym początku testu (żeby na spokojnie Kafka wstała i podmieniła swoje propertiesy, a dopiero potem skonfigurować consumer i producer).

1 użytkowników online, w tym zalogowanych: 0, gości: 1