微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

springboot kafka group.id多消费组配置

kafka aide_941 40℃

springboot kafka group.id多消费组配置

 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zwx19921215/article/details/83341522

很早之前就使用了springboot + kafka组合配置,但是之前使用的spring-kafka(1.1.7)版本较低,所以只能通过 spring.kafka.consumer.group-id=default_consumer_group 或者 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, “default_consumer_group”);的形式配置一个默认消组,当然理论上这也是没有问题的,但是如果你定义的topic数量过多且并发消费比较大,只有一个消费组的配置方式就会暴露出很多问题,其中主要的一个问题便是每个topic分区的offset偏移量问题(在大并发下会出现offset异常问题),因为他们都保存在同一个消费组中。

直到后来发布了spring-kafka 1.3.x的版本后,增加了groupId的属性,非常方便的帮助我们解决了实现每个topic自定义一个消费组的问题,我们再也不用共用一个消费组了。

接下来通过代码演示看是否如我们的期望一样:

pom依赖

 

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>1.5.10.RELEASE</version>
  5. <relativePath/> <!– lookup parent from repository –>
  6. </parent>
  7. <properties>
  8. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  9. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  10. <java.version>1.8</java.version>
  11. </properties>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <!– https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka –>
  18. <dependency>
  19. <groupId>org.springframework.kafka</groupId>
  20. <artifactId>spring-kafka</artifactId>
  21. <version>1.3.5.RELEASE</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-test</artifactId>
  26. <scope>test</scope>
  27. </dependency>
  28. <!–引入elasticsearch–>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  32. </dependency>
  33. </dependencies>
  34. <build>
  35. <plugins>
  36. <plugin>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-maven-plugin</artifactId>
  39. </plugin>
  40. </plugins>
  41. </build>

 

application.properties

 

  1. server.port=10087
  2. spring.application.name=example
  3. #topic
  4. spring.kafka.bootstrap-servers=10.0.2.22:9092
  5. kafka.test.topic=TEST_TOPIC
  6. #es
  7. spring.data.elasticsearch.cluster-name=elasticsearch
  8. spring.data.elasticsearch.cluster-nodes=10.0.2.23:9300
  9. #spring.data.elasticsearch.cluster-nodes=10.0.2.22:9300

生产者:

 

  1. /**
  2. * @author xiaofeng
  3. * @version V1.0
  4. * @title: TestKafkaSender.java
  5. * @package: com.example.demo.kafka.sender
  6. * @description: kafka生产者
  7. * @date 2018/4/2 0002 下午 3:31
  8. */
  9. @Component
  10. public class TestKafkaSender {
  11. @Autowired
  12. private KafkaTemplate kafkaTemplate;
  13. @Value(“${kafka.test.topic}”)
  14. String testTopic;
  15. public void sendTest(String msg){
  16. kafkaTemplate.send(testTopic, msg);
  17. }
  18. }

消费者1:

  1. /**
  2. * @author xiaofeng
  3. * @version V1.0
  4. * @title: TestKafkaConsumer2.java
  5. * @package: com.example.demo.kafka.consumer
  6. * @description: kafka消费者
  7. * @date 2018/4/2 0002 下午 3:31
  8. */
  9. @Component
  10. public class TestKafkaConsumer {
  11. Logger logger = LoggerFactory.getLogger(getClass());
  12. /**
  13. * topics: 配置消费topic,以数组的形式可以配置多个
  14. * groupId: 配置消费组为”xiaofeng1“
  15. *
  16. * @param message
  17. */
  18. @KafkaListener(topics = {“${kafka.test.topic}”},groupId = “xiaofeng1”)
  19. public void consumer(String message) {
  20. logger.info(“groupId = xiaofeng1, message = “ + message);
  21. }
  22. }

消费者2:

  1. /**
  2. * @author xiaofeng
  3. * @version V1.0
  4. * @title: TestKafkaConsumer2.java
  5. * @package: com.example.demo.kafka.consumer
  6. * @description: kafka消费者
  7. * @date 2018/4/2 0002 下午 3:31
  8. */
  9. @Component
  10. public class TestKafkaConsumer2 {
  11. Logger logger = LoggerFactory.getLogger(getClass());
  12. /**
  13. * topics: 配置消费topic,以数组的形式可以配置多个
  14. * groupId: 配置消费组为”xiaofeng2“
  15. *
  16. * @param message
  17. */
  18. @KafkaListener(topics = {“${kafka.test.topic}”}, groupId = “xiaofeng2”)
  19. public void consumer(String message) {
  20. logger.info(“groupId = xiaofeng2, message = “ + message);
  21. }
  22. }

 

测试类:

  1. @Autowired
  2. TestKafkaSender sender;
  3. @Test
  4. public void send() {
  5. for (int i = 0; i < Integer.MAX_VALUE; i++) {
  6. logger.info(“send message = “ + i);
  7. sender.sendTest(i + “”);
  8. try {
  9. Thread.sleep(1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }

运行效果:

 

转载请注明:SuperIT » springboot kafka group.id多消费组配置

喜欢 (0)or分享 (0)