Jak wyfiltrować recordy Kafki przed deserializacją?

0

Największym plusem dodania headerów do kafki miała być możliwość wyfiltrowania wiadomości bez konieczności deserializacji payloadu.

Chciałbym z tego skorzystać w Springu.

Do deserializacji dochodzi przed filtrowaniem z RecordFilterStrategy:
org.springframework.kafka.listener.KafkaMessageListenerContainer odpala records = this.consumer.poll(this.pollTimeout);
które odpala:
return this.fetcher.fetchedRecords(); w org.apache.kafka.clients.consumer.KafkaConsumer
które odpala:
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); w org.apache.kafka.clients.consumer.internals.Fetcher

czy coś robię źle?

konfiguracja:

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            RecordInterceptor<String, Object> recordInterceptor,
            RecordFilterStrategy<String, Object> recordFilterStrategy,
            @Value("${spring.kafka.consumer.ack-discarded:true}") boolean ackDiscarded
    ) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setAckDiscarded(ackDiscarded);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        return factory;
    }

    @Bean
    RecordFilterStrategy<String, Object> recordFilterStrategy() {
        return record -> {
            return true;
        };
    }

konsumpcja:

@KafkaListener(topics = KafkaTopics.MY_TOPIC, groupId = "001", containerFactory = "kafkaListenerContainerFactory")
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class KafkaMessageListener {

    private final Set<MessageHandler<MyEvent>> myEventHandlers;
    private final AppConfig appConfig;

    @KafkaHandler
    public void listenToUpperCase(@Payload MyEventpayload, @Headers MessageHeaders headers) {
        myEventHandlers.parallelStream()
                .filter(h -> h.accept(headers))
                .forEach(h -> {
                    log.info(createLogMsg("Accepted", payload.toString(), headers));
                    h.handle(payload, headers);
                });
    }
}

Jedyne dzikie rozwiązanie jakie mi przychodzi do głowy to opakować deserializer w deserializer, który będzie filtrował.

0

@NamingException: ale coś ci nie działa czy jak? Ogólnie to ten kod będzie odrzucał wszystkie rekordy z automatu.

0
wartek01 napisał(a):

@NamingException: ale coś ci nie działa czy jak? Ogólnie to ten kod będzie odrzucał wszystkie rekordy z automatu.

Zanim odrzuci to zdeserializuje payload. Chcę nie deserializować payloadu wiadomości, które odrzucam bez konieczności zaglądania w payload.

2

Cały czas nie rozumiem, ale się nie wtrącałem do tej pory - po co ty na siłę do tej kafki springa chcesz używać? Wszędzie Ci przeszkadza, nawet się nie dziwie, mi też raczej przeszkadza. Olej, używaj normalnego klienta, albo jakiegoś innego (używałem np. klienta z kafka reactor https://projectreactor.io/docs/kafka/release/reference/#_reactive_api_for_kafka).

0

@NamingException: nie zdeserializuje ponieważ RecordFilterStrategy z automatu wszystko ( record -> { return true; } ) odrzuci. Jeśli ci wiadomości dochodzą do listenToUpperCase to pewnie jest jakiś problem z konfiguracją springową.

EDIT: ok, doczytałem OCB w Springu. Spring jest pod tym względem dosyć naiwny i po prostu deserializuje wszystko jak leci. Chyba najłatwiej by było napisać własny deserializer który np. zwróci null'a, a potem RecordFilterStrategy czyścić takie rekordy.

0

do listenera nie dolatuje, tylko przed filtrowaniem deserializje.

0

Wygląda chyba na to, że to nie tyle task dla Springa tylko dla Kafki. Wiecie jak pobrać headery konsumowane rekordu bez serializacji value (payload)? Chyba się nie da inaczej niż przez dziki serializer, w którym wepchniemy logikę.

0

@NamingException: Wygląda, że jest tak jak piszesz . Randomowy link z githuba: https://github.com/a0x8o/kafka/blob/eea4ae07c2d55c6b3f4d1f6adc695424e91faace/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1384 pokazuje, że jest to w KafkaClient

Z drugiej strony, Consumer jest interfejsem i możesz sobie własną implementację zrobić taką jaka Ci pasuje.
https://github.com/a0x8o/kafka/blob/eea4ae07c2d55c6b3f4d1f6adc695424e91faace/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java#L38

  1. Rozumiem, że chcesz uniknąć deserializacji przed filtrowaniem, ale w jakim celu/jaki problem to rozwiązuje?
  2. Może problem da się inaczej rozwiązać, tak by nie było potrzeby filtrowania?
0
yarel napisał(a):

@NamingException: Wygląda, że jest tak jak piszesz . Randomowy link z githuba: https://github.com/a0x8o/kafka/blob/eea4ae07c2d55c6b3f4d1f6adc695424e91faace/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1384 pokazuje, że jest to w KafkaClient

  1. Rozumiem, że chcesz uniknąć deserializacji przed filtrowaniem, ale w jakim celu/jaki problem to rozwiązuje?

Szkoda zasobów po prostu.

A może używać ByteArrayDeserializer i sobie payload deserializować na własną rękę w ciele handlera... Może to taki zamysł był tych headerów.

1

Jeśli naprawdę czegoś takiego potrzeba to tak jak już napisałem:

public class KafkaTest {
    public class Wrapper<T> {
        private Class<T> clazz;
        private volatile T value;
        private byte[] bytes;

        public Wrapper(final byte[] bytes, final Class<T> clazz) {
            this.clazz = clazz;
            this.bytes = bytes;
        }

        public T getValue() {
            if (value == null) {
                synchronized (this) {
                    if (value == null) {
                        this.value = new GenericDeserializer(clazz)
                                .deserialize(null, bytes);
                    }
                }
            }
            
            return value;
        }

        public byte[] getBytes() {
            return bytes;
        }
    }

    public class WrapperDeserializer implements Deserializer<Wrapper<Concrete>> {

        @Override
        public Wrapper<Concrete> deserialize(String topic, byte[] data) {
            return new Wrapper<>(data, Concrete.class);
        }
    }
    
    public class Concrete implements Serializable {
        // fields
    }
}

Problem może zacząć się w momencie, kiedy ConcreteDeserializer ma jakieś dodatkowe zależności (głównie Avro i schema registry), ale do JSONa jak najbardziej wystarczy.

1

Wbrew temu co napisał @NamingException nie ma nic brzydkiego w inicjalizowaniu GenericDeserializer we wrapperze.
Jak będziesz chciał użyć innegoi to sobie dorzucisz factory do konstruktora.

Natomiast wali po oczach zrypana synchronizacja. Albo trzeba dodać volatile, albo przesunąć nieco synchronized -
albo najlepiej użyć gotowej klasy np. Lazy z vavr.
No i chyba najważniejsze pytanie: czy w ogóle ma tu synchronizacja sens?

0

Moim zdaniem jednak najprościej będzie zrobić tak, że ustawić ByteArrayDeserializer on zwraca niezdeserializowany payload i deserializować sobie w listenerze ręcznie:

@KafkaListener(topics = KafkaTopics.MY_TOPIC, groupId = "001", containerFactory = "kafkaListenerContainerFactory")
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class KafkaMessageListener {

    private final Set<MessageHandler<MyEvent>> myEventHandlers;
    private final AppConfig appConfig;

    @KafkaHandler
    public void listenToUpperCase(@Payload byte[] rawPayload, @Headers MessageHeaders headers) {
        myEventHandlers.parallelStream()
                .filter(h -> h.accept(headers))
                .forEach(h -> {
                    MyEventPayload payload = myDeserializer.deserialize(rawPayload);
                    log.info(createLogMsg("Accepted", payload.toString(), headers));
                    h.handle(payload, headers);
                });
    }
}

W obu przypadkach: zarówny w tym powyżej jak i propozycji Wrapera utracimy możliwość wielu KafkaHandler po typach payloadu, no bo każdy będzie byte[] albo Wrapper

1 użytkowników online, w tym zalogowanych: 0, gości: 1