Największym plusem dodania headerów do kafki miała być możliwość wyfiltrowania wiadomości bez konieczności deserializacji payloadu.
Chciałbym z tego skorzystać w Springu.
Do deserializacji dochodzi przed filtrowaniem z RecordFilterStrategy
:
org.springframework.kafka.listener.KafkaMessageListenerContainer
odpala records = this.consumer.poll(this.pollTimeout);
które odpala:
return this.fetcher.fetchedRecords();
w org.apache.kafka.clients.consumer.KafkaConsumer
które odpala:
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
w org.apache.kafka.clients.consumer.internals.Fetcher
czy coś robię źle?
konfiguracja:
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
RecordInterceptor<String, Object> recordInterceptor,
RecordFilterStrategy<String, Object> recordFilterStrategy,
@Value("${spring.kafka.consumer.ack-discarded:true}") boolean ackDiscarded
) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setAckDiscarded(ackDiscarded);
factory.setRecordFilterStrategy(recordFilterStrategy);
return factory;
}
@Bean
RecordFilterStrategy<String, Object> recordFilterStrategy() {
return record -> {
return true;
};
}
konsumpcja:
@KafkaListener(topics = KafkaTopics.MY_TOPIC, groupId = "001", containerFactory = "kafkaListenerContainerFactory")
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class KafkaMessageListener {
private final Set<MessageHandler<MyEvent>> myEventHandlers;
private final AppConfig appConfig;
@KafkaHandler
public void listenToUpperCase(@Payload MyEventpayload, @Headers MessageHeaders headers) {
myEventHandlers.parallelStream()
.filter(h -> h.accept(headers))
.forEach(h -> {
log.info(createLogMsg("Accepted", payload.toString(), headers));
h.handle(payload, headers);
});
}
}
Jedyne dzikie rozwiązanie jakie mi przychodzi do głowy to opakować deserializer w deserializer, który będzie filtrował.