[Kafka] klucze a partycje

0

Czytałem, że Kafka dobiera partycje dla produkowanych rekordów po kluczach, tak by te same klucze były tylko w 1 partycji.

Utworzyłem temat z 4 partycjami.

Do 0 partycji wyprodukowałem rekord z kluczem 0.
Do 1 z kluczem 1.
Do 2 2.
Do 3 3.

Nastepni wyprodukowałem 4 rekordy z kluczem 0 do partycji 1.

Na koniec wyprodukowałem 4 rekordy z kluczem 0, 1, 2 i 3 bez wskazywania partycji.
Rekord z kluczem 0 poszedł do partycji 0. Rekord z kluczem 1 do partycji 2.

Czy to jest prawidłowe zachowanie? Staram się to zrozumieć.
title

1

To zależy jakiego Partitionera używasz, czy wskazujesz klucz i czy zmieniałeś liczbę partycji.

https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour

0
Charles_Ray napisał(a):

To zależy jakiego Partitionera używasz, czy wskazujesz klucz i czy zmieniałeś liczbę partycji.

wskazuję klucz cały czas a liczby partycji nie zmieniam

Charles_Ray napisał(a):

https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour

Czytam

When no partition number or key is present, pick a partition in a round-robin fashion.

round-robin to znaczy, że możliwie po równo będzie rozdzielał...
Sprawdziłem na czystym temacie i nie jest tak. Do 2 partycji wepchnął mi już 6 elementów, poczas, gdy w 1 partycji jest tylko 1 element. W kodzie mają tak:

int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

A co do problemu to właśnie to o czym piszą mi się właśnie to nie zgadza.

0

Już wiem.

W kodzie producenta Kafki mamy tak:

public class KafkaProducer<K, V> implements Producer<K, V> {

//...

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

// ...

        int partition = this.partition(record, serializedKey, serializedValue, cluster);

// ...

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

a domyslny Partitioner:

public class DefaultPartitioner implements Partitioner {
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return this.stickyPartitionCache.partition(topic, cluster);
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {
    }

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

Wynika z tego, że jeśli podam mu klucz to partycję dobiera sobie tylko na podstawie klucza, a nie na podstawie tego gdzie wcześniej dany klucz wędrował. Liczy sobie hashCode klucza i reszte zdzielenia przez liczbę partycji.

1 użytkowników online, w tym zalogowanych: 0, gości: 1