Używacie Springa z Kafką?
1. Produkowanie
Jak na razie to więcej mam problemów niż pożytku z używania Kafki opakowanej w Springa.
Popatrzcie na implementację KafkaTemplate
. Gdy chcę użyć go do wysłania Springowego Message<?>
to mam tylko jedną metodę:
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) {
byte[] correlationId = (byte[])message.getHeaders().get("kafka_correlationId", byte[].class);
if (correlationId != null) {
producerRecord.headers().add("kafka_correlationId", correlationId);
}
}
return this.doSend(producerRecord);
}
nie mam możliwości zapodania topic
tak jak w innych metodach KafkaTemplate
do wysyłki:
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
...
}
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
...
}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
....
}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) {
...
}
Używanie 1 KafkaTemplate
do wysyłania na wiele topic
staje się bardzo utrudnione. Musiałbym za każdym razem robić kafkaTemplate.setDefaultTopic(...)
i zsynchronizować to. Ewentualnie musiałbym mu zapodać swój converter przez kafkaTemplate.setMessageConverter(...)
. Jest jescze możliwość, by zapodać topic w headerach co sprawia, że muszę zmienić otrzymany message do wysyłki.
Lepiej jest więc zapodawać do KafkaTemplate
od razu kafkowy ProducerRecord<K, V> record
, a wtedy pojawia się pytanie do czego w ogóle to KafkaTemplate
skoro taki record można wsadzić bezpośrednio do producenta Kafki.
2. Konsumowanie
No ja nie wiem czy to springowe jest bardziej przejrzyste:
@KafkaListener(topics = KafkaTopics.UPPER_CASE)
public void listenToUpperCase(
@Payload String payload,
@Headers Map<String, String> headers
) {
...
}
w porównaniu do kafkowego
ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
Map<String, String> headers = mapHeaders(records.headers());
String payload = records.value();
3. Nagłówki.
Są niekonfigurowalne, masz używać takich jakie wymyślił Spring.
mimo że przecież jest na to klasa: org.springframework.kafka.support.KafkaHeaders