Już wiem.
W kodzie producenta Kafki mamy tak:
public class KafkaProducer<K, V> implements Producer<K, V> {
//...
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// ...
int partition = this.partition(record, serializedKey, serializedValue, cluster);
// ...
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
a domyslny Partitioner:
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public DefaultPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return this.stickyPartitionCache.partition(topic, cluster);
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
public void close() {
}
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
Wynika z tego, że jeśli podam mu klucz to partycję dobiera sobie tylko na podstawie klucza, a nie na podstawie tego gdzie wcześniej dany klucz wędrował. Liczy sobie hashCode klucza i reszte zdzielenia przez liczbę partycji.