
Kafka consumer group summary


The Kafka consumer API comes in two flavors, a high-level API and a low-level API; the demos above all use the high-level API. With the high-level API you do not have to maintain consumption state, handle load balancing, or manage offsets yourself.
Some caveats of the high-level API:
1. If a consumer group has more consumer threads than partitions, some threads will never receive any messages. Kafka does not allow concurrent consumption within a single partition, so do not run more consumers than there are partitions.

2. If a consumer group has fewer consumer threads than partitions, some threads will receive messages from more than one partition, and no ordering is guaranteed across them; Kafka only guarantees ordering within a single partition.

3. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.

4. When there is no data to fetch, the high-level API blocks (a sketch of how to avoid blocking indefinitely follows this list).
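
A minimal sketch of working around point 4, assuming the same 0.8.x high-level API as the example further below: setting consumer.timeout.ms makes the stream iterator throw ConsumerTimeoutException after the given idle interval instead of blocking forever. The class name, ZooKeeper address, group id, and topic name here are placeholders.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class NonBlockingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "demo-group");              // placeholder consumer group id
        props.put("consumer.timeout.ms", "5000");         // give up after 5 s without data

        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("test", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();
        try {
            while (it.hasNext())                          // throws after 5 s with no messages
                System.out.println(new String(it.next().message()));
        } catch (ConsumerTimeoutException e) {
            System.out.println("No message for 5 s, stopping.");
        } finally {
            consumer.shutdown();
        }
    }
}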

A few points about consumer groups (high-level API):
1. Topics are subscribed to at the consumer-group level; the consumers in a group jointly consume a topic.
2. A consumer group consumes messages from the Kafka cluster through ZooKeeper, which manages this process. Whereas the low-level API leaves offset management to you, the high-level API hands it to ZooKeeper. It does not commit after every message, though: offsets are committed at a fixed interval (auto.commit.interval.ms, 1000 ms by default), so a restarted consumer may receive duplicate messages. Duplicates can also appear after a partition leader change. It is therefore best to wait a while (around 10 s) before shutting the consumer down, as sketched below.
3. One of the goals of the consumer-group design is to let an application use multiple threads to consume data from a topic concurrently.
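
For point 2, a minimal sketch of a more careful shutdown than the plain sleep-then-shutdown in the example below. The helper class name GracefulShutdown is made up; commitOffsets() on the old high-level ConsumerConnector flushes the current offsets to ZooKeeper before the fetchers stop, so a restart re-reads as few already-processed messages as possible.

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class GracefulShutdown {
    public static void shutdown(ConsumerConnector consumer, ExecutorService executor) {
        try {
            Thread.sleep(10000);            // give in-flight messages roughly 10 s to drain
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        consumer.commitOffsets();           // flush the current offsets to ZooKeeper now
        consumer.shutdown();                // stop the fetchers; stream iterators will end
        executor.shutdown();                // then stop the worker threads
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}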

Example (a worker class plus a driver; the enclosing ConsumerGroupExample class, its constructor, shutdown(), and the placeholder ZooKeeper address, group id, topic, and thread count in main() are filled in here so the original fragments compile):

// ConsumerTest.java
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

// Each thread consumes one KafkaStream
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

// ConsumerGroupExample.java
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    // Configure the ZooKeeper connection
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);      // ZooKeeper connection string
        props.put("group.id", a_groupId);                 // id of the consumer group
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    // Create a pool of consumer threads, one per stream
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public static void main(String[] args) {
        // placeholder ZooKeeper address, group id, topic name and thread count
        ConsumerGroupExample example = new ConsumerGroupExample("localhost:2181", "group1", "test");
        example.run(3);

        // Shut down after running for a while
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
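
As caveat 1 above implies, the thread count passed to run() should not exceed the number of partitions of the topic; any extra threads would get no stream and sit idle. Matching one thread per partition gives the maximum useful parallelism for a single consumer group.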
