Cześć,
w teście widocznym poniżej wiadomości nie są czytane przez Kafka consumer. Nie do końca wiem, czy wina leży po stronie wrzucającej, czy czytającej z brokera. Uprościłem test do minimum i niestety nadal nie działa, a pomysły mi się skończyły. Może Wy mnie poradujecie radą?
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@Testcontainers
public class KafkaTest
{
@Container
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Test
public void shouldReadFromKafkaBroker()
{
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaTemplate<String, String> producer = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "com.b.c");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
new StringDeserializer());
consumer.subscribe(ImmutableList.of("abc"));
producer.send("abc", "testowa wiadomosc");
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
Assertions.assertNotEquals(0, poll.count());
}
}