Kafka ze Springiem

0

Używacie Springa z Kafką?

1. Produkowanie

Jak na razie to więcej mam problemów niż pożytku z używania Kafki opakowanej w Springa.

Popatrzcie na implementację KafkaTemplate. Gdy chcę użyć go do wysłania Springowego Message<?> to mam tylko jedną metodę:

public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!producerRecord.headers().iterator().hasNext()) {
            byte[] correlationId = (byte[])message.getHeaders().get("kafka_correlationId", byte[].class);
            if (correlationId != null) {
                producerRecord.headers().add("kafka_correlationId", correlationId);
            }
        }

        return this.doSend(producerRecord);
    }

nie mam możliwości zapodania topic tak jak w innych metodach KafkaTemplate do wysyłki:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ...
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ...
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
       ....
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) {
       ...
    }

Używanie 1 KafkaTemplate do wysyłania na wiele topic staje się bardzo utrudnione. Musiałbym za każdym razem robić kafkaTemplate.setDefaultTopic(...) i zsynchronizować to. Ewentualnie musiałbym mu zapodać swój converter przez kafkaTemplate.setMessageConverter(...). Jest jescze możliwość, by zapodać topic w headerach co sprawia, że muszę zmienić otrzymany message do wysyłki.

Lepiej jest więc zapodawać do KafkaTemplate od razu kafkowy ProducerRecord<K, V> record, a wtedy pojawia się pytanie do czego w ogóle to KafkaTemplate skoro taki record można wsadzić bezpośrednio do producenta Kafki.

2. Konsumowanie
No ja nie wiem czy to springowe jest bardziej przejrzyste:

    @KafkaListener(topics = KafkaTopics.UPPER_CASE)
    public void listenToUpperCase(
            @Payload String payload,
            @Headers Map<String, String> headers
    ) {
       ...
    }

w porównaniu do kafkowego

ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
Map<String, String> headers = mapHeaders(records.headers());
String payload = records.value();

3. Nagłówki.
Są niekonfigurowalne, masz używać takich jakie wymyślił Spring.

mimo że przecież jest na to klasa: org.springframework.kafka.support.KafkaHeaders

0
NamingException napisał(a):

Używacie Springa z Kafką?

1. Produkowanie

Jak na razie to więcej mam problemów niż pożytku z używania Kafki opakowanej w Springa.

Popatrzcie na implementację KafkaTemplate. Gdy chcę użyć go do wysłania Springowego Message<?> to mam tylko jedną metodę:
nie mam możliwości zapodania topic tak jak w innych metodach KafkaTemplate do wysyłki:

Przecież org.springframework.messaging.Message to wrapper z headerami. Jak chcesz element wysłać na topic="myTopic" to budujesz Message z payloadem element i headerem do topicu.

Używanie 1 KafkaTemplate do wysyłania na wiele topic staje się bardzo utrudnione. Musiałbym za każdym razem robić kafkaTemplate.setDefaultTopic(...) i zsynchronizować to. Ewentualnie musiałbym mu zapodać swój converter przez kafkaTemplate.setMessageConverter(...). Jest jescze możliwość, by zapodać topic w headerach co sprawia, że muszę zmienić otrzymany message do wysyłki.

Albo po prostu:

public void reroute(final Message<?> message, final String newTopic) {
	Message<?> newMessage = MessageBuilder.from(message)
		.withPayload(message.getPayload())
		.setHeader(TOPIC_HEADER, newTopic)
		.build();

	kafkaTemplate.send(message);
}

Lepiej jest więc zapodawać do KafkaTemplate od razu kafkowy ProducerRecord<K, V> record, a wtedy pojawia się pytanie do czego w ogóle to KafkaTemplate skoro taki record można wsadzić bezpośrednio do producenta Kafki.

Jest też opcja, żeby korzystać z KafkaTemplate.send(Message<?>); zgodnie z jej przeznaczeniem - tj. tworzyć je w adapterze do Kafki, który decyduje do jakich topiców wysyłać wiadomości.

2. Konsumowanie
No ja nie wiem czy to springowe jest bardziej przejrzyste:

    @KafkaListener(topics = KafkaTopics.UPPER_CASE)
    public void listenToUpperCase(
            @Payload String payload,
            @Headers Map<String, String> headers
    ) {
       ...
    }

w porównaniu do kafkowego

ConsumerRecords<String, Message<String>> records = kafkaConsumer.poll(duration);
Map<String, String> headers = mapHeaders(records.headers());
String payload = records.value();

Problem jest taki, że one nie są równoważne. W przypadku kodu Springowego listenToUpperCase(String, Map<String, String>) będzie wołana za każdym razem gdy przyjdzie wiadomość, twój kod zbierze jedną wiadomość.

3. Nagłówki.
Są niekonfigurowalne, masz używać takich jakie wymyślił Spring.

mimo że przecież jest na to klasa: org.springframework.kafka.support.KafkaHeaders

Możesz sobie dodać nagłówki jakie chcesz, ale jeśli chcesz skorzystać z jakiejś konkretnej funkcjonalności spring-kafka to musisz się do nich przystosować.

0

ad 1. no i własnie mówię, że to jest do kitu. Serwis mówi masz tę wiadomość i wyślij mi ją tu i tu i musisz edytować tę wiadomość żeby zrobić to nagłówkami. A wystarczyłoby dodać 1 metodę do KafkaTemplate.
ad 3. nie o to mi chodzi, chodzi o to, że wpisują coś z palca i nie wiesz jakich headerów masz użyć żeby się przystosować. Gdyby zamaist z palca brali to ze swojej KafkaHeaders to byś wiedział, że u siebie masz brać headery z KafkaHeaders.

0
NamingException napisał(a):

ad 1. no i własnie mówię, że to jest do kitu. Serwis mówi masz tę wiadomość i wyślij mi ją tu i tu i musisz edytować tę wiadomość żeby zrobić to nagłówkami. A wystarczyłoby dodać 1 metodę do KafkaTemplate.

No nie, bo wcześniej pisałeś o jakichś dzikich synchronizacjach i zmianach domyślnego topicu.
Mam wrażenie, że nie do końca czujesz o co chodzi tutaj. Message to nie jest wiadomość kafkowa - a JMSowa i jako taka musi "pasować" do innych brokerów. Dorzucenie jakiejś dziwnej metody, która by powodowała ignorowanie topicu z headera byłoby nielogiczne.

ad 3. nie o to mi chodzi, chodzi o to, że wpisują coś z palca i nie wiesz jakich headerów masz użyć żeby się przystosować. Gdyby zamaist z palca brali to ze swojej KafkaHeaders to byś wiedział, że u siebie masz brać headery z KafkaHeaders.

Ok, teraz to ty mnie trolujesz udając głupiego czy jak?
KafkaHeaders jest po to, żebyś nie musiał znać tych wartości jakie pod spodem sobie leżą. Jeśli dorzucasz message.setHeader(KafkaHeaders.TOPIC, "some.topic") to wiadomość poleci do "some.topic" przecież. Nie musisz wiedzieć jakiego klucza sobie pod spodem używają.

0
wartek01 napisał(a):
NamingException napisał(a):

ad 1. no i własnie mówię, że to jest do kitu. Serwis mówi masz tę wiadomość i wyślij mi ją tu i tu i musisz edytować tę wiadomość żeby zrobić to nagłówkami. A wystarczyłoby dodać 1 metodę do KafkaTemplate.

No nie, bo wcześniej pisałeś o jakichś dzikich synchronizacjach i zmianach domyślnego topicu.

No, bo jak puszczę 2 wiadomości w jednym czasie tą samą metodą:

void send(String topic, Message<String> msg) {
    kafkaTemplate.setDefault(topic);
    kafkaTemplate.send(msg);
}

no to będzie niefajnie

Ok, teraz to ty mnie trolujesz udając głupiego czy jak?
KafkaHeaders jest po to, żebyś nie musiał znać tych wartości jakie pod spodem sobie leżą. Jeśli dorzucasz message.setHeader(KafkaHeaders.TOPIC, "some.topic") to wiadomość poleci do "some.topic" przecież. Nie musisz wiedzieć jakiego klucza sobie pod spodem używają.

Czy masz problemy z rozumieniem tekstu czytanego czy jak?

W screenie który załączyłem Spring ma konwerter, w którym nazwy headerów wpisał sobie z palca zamaist użyć swojego KafkaHeaders.

0
wartek01 napisał(a):

Dorzucenie jakiejś dziwnej metody, która by powodowała ignorowanie topicu z headera byłoby nielogiczne.

Widać, że nie znasz tej implementacji, a się wypowiadasz, bo ten defaultowy topic przez konwerter jest używany tylko jeśli nie ma go headerze. Zamiast:

	@SuppressWarnings("unchecked")
	@Override
	public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
		ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
		if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
			byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
			if (correlationId != null) {
				producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
			}
		}
		return doSend((ProducerRecord<K, V>) producerRecord);
	}

powinno być:

	@SuppressWarnings("unchecked")
	@Override
	public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
		return send(defaultTopic, message);
	}
	@SuppressWarnings("unchecked")
	@Override
	public ListenableFuture<SendResult<K, V>> send(String defaultTopic, Message<?> message) {
		ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, defaultTopic);
		if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
			byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
			if (correlationId != null) {
				producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
			}
		}
		return doSend((ProducerRecord<K, V>) producerRecord);
	}
1

Przyznam szczerze że nie nie rozumiem o co chodzi z tym:

void send(String topic, Message<String> msg) {
    kafkaTemplate.setDefault(topic);
    kafkaTemplate.send(msg);
}

Przeciez możesz sobie tworzyć pierdyliard kafkaTemplatów, a tak właściwie to powinno się pewnie tworzyć jeden pod konkretny topic. Co prawda na Kafce się nie znam, ale tak nakazuje sądzić logika, podobnie jest przecież z RestTemplate np. Nie robisz jednego RestTemplate i do googlowej ReCaptchy i do API z kursem walut od banku centralnego Europy.
Możesz przecież możec użyć Qualifier i wstrzykanąć KafkaTemplate orderKafkaTemplate.

0
NamingException napisał(a):
void send(String topic, Message<String> msg) {
    kafkaTemplate.setDefault(topic);
    kafkaTemplate.send(msg);
}

no to będzie niefajnie

No to po co puszczasz to tak skoro wystarczy skorzystać z MessageBuildera:

public void reroute(final Message<?> message, final String newTopic) {
    Message<?> newMessage = MessageBuilder.from(message)
        .setHeader(KafkaHeaders.TOPIC, newTopic)
        .build();

    kafkaTemplate.send(newMessage);
}

W screenie który załączyłem Spring ma konwerter, w którym nazwy headerów wpisał sobie z palca zamaist użyć swojego KafkaHeaders.

Ale ty zdajesz sobie sprawę, że np. KafkaHeaders.TOPIC jest taki sam jak klucz do topicu w podanym kodzie?
Ciebie nie powinno obchodzić jak jest napisany kod, ty powinieneś się martwić czy jak podasz KafkaHeaders.TOPIC to zadziała. A zadziała.

Widać, że nie znasz tej implementacji, a się wypowiadasz, bo ten defaultowy topic przez konwerter jest używany tylko jeśli nie ma go headerze.

Podstawowe pytanie - skoro chcesz robić sobie jakiś routing to czemu nie wskazujesz na jaki topic ma iść dany element, tylko chcesz zmieniać ten domyślny? To jest jakieś zadanie na studia gdzie zakazano ci korzystać z innych metod?

1 użytkowników online, w tym zalogowanych: 0, gości: 1