1.说明
Spring可以方便的集成使用 Kafka消息队列 ,
只需要引入依赖包spring-kafka,
注意版本兼容问题,
本文详细介绍SpringBoot集成Kafka的方法,
以及生产者和消费者的使用方法。
2.引入依赖
在 pom.xml 中引入Spring Kafka版本:
1 | <dependency> |
3.配置
新增applicaion.yml如下配置:
Spring Kafka 通用配置
1 | spring: |
通用配置:spring.kafka.*
下面的admin、producer、consumer、streams配置,
会覆盖通用配置 spring.kafka.* 中相同的属性。
生产者配置
1 | spring: |
生产者相关配置:spring.kafka.producer.*
消费者配置
1 | spring: |
消费者相关配置:spring.kafka.consumer.*
默认 value-deserializer 使用 org.apache.kafka.common.serialization.StringDeserializer ,
只支持文本消息。
使用org.springframework.kafka.support.serializer.JsonDeserializer可以让消息支持JSON。
完整applicaion.yml配置:
1 | server: |
4.开发代码
KafkaMQApplication.java启动类,
注意要新增 @EnableKafka 注解:
1 | package com.yuwen.spring.kafka; |
生产者发送消息
Spring Kafka 提供KafkaTemplate类发送消息,
在需要的地方注入即可,
新增ProviderService.java生产者服务类:
1 | package com.yuwen.spring.kafka.provider; |
注意指定 topic ,
以及要发送的消息内容message。
消费者接收消息
新增ConsumerService.java类,
注意使用 @KafkaListener 注解:
1 | package com.yuwen.spring.kafka.consumer; |
参数说明:
topics 与发送消息topic相同,可以指定多个
groupId 消费组唯一id
topicPartitions topic分区,可指定多个
一辈子很短,努力的做好两件事就好;
第一件事是热爱生活,好好的去爱身边的人;
第二件事是努力学习,在工作中取得不一样的成绩,实现自己的价值,而不是仅仅为了赚钱;