kafka partition（分区）与 group

1、原理图

2、原理描述

3、查看topic-group的offsert

int hashCode = Math.abs(“ttt”.hashCode());

int partition = hashCode % 50;

4.参数

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 `import` `kafka.producer.Partitioner;` `import` `kafka.utils.VerifiableProperties;` `public` `class` `JasonPartitioner ``implements` `Partitioner {` `    ``public` `JasonPartitioner(VerifiableProperties verifiableProperties) {}` `    ``@Override` `    ``public` `int` `partition(Object key, ``int` `numPartitions) {` `        ``try` `{` `            ``int` `partitionNum = Integer.parseInt((String) key);` `            ``return` `Math.abs(Integer.parseInt((String) key) % numPartitions);` `        ``} ``catch` `(Exception e) {` `            ``return` `Math.abs(key.hashCode() % numPartitions);` `        ``}` `    ``}` `}`

 1 2 3 4 5 6 7 8 9 10 `public` `void` `sendMessage() ``throws` `InterruptedException{` `　　``for``(``int` `i = ``1``; i <= ``5``; i++){` `　　      List messageList = ``new` `ArrayList>();` `　　      ``for``(``int` `j = ``0``; j < ``4``; j++）{` `　　          messageList.add(``new` `KeyedMessage(``"topic2"``, j+``""``, ``"The "` `+ i + ``" message for key "` `+ j));` `　　      }` `　　      producer.send(messageList);` `    ``}` `　　producer.close();` `}`

4、consumer group   （本节所有描述都是基于Consumer hight level API而非low level API）。