Learning Kafka Together: Going Deeper with Spring Boot Integration (Part 1)
Preface
I'm currently writing a Kafka tutorial series. It will span quite a few posts, so if you like it, please follow along ❤️ ~
This installment covers how to do message replies when integrating Kafka with Spring Boot, along with a walkthrough of @SendTo and @KafkaListener~
Alright, enough talk, let's get straight to it~
pom dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml configuration
spring:
  kafka:
    bootstrap-servers: <your Kafka broker address>
    consumer:
      group-id: atlas.consumer.group
      enable-auto-commit: false
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
        max.poll.interval.ms: 15000
        max.partition.fetch.bytes: 15728640
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      max-poll-records: 50
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      compression-type: gzip
    listener:
      ack-mode: manual_immediate
      concurrency: 5
      type: batch
      # client-id: atlasKafkaCli

Message replies
Sometimes we need to know whether a consumer has actually finished processing a message; we want it to send something back. How do we do that? We can use the ReplyingKafkaTemplate that Spring Kafka provides. Let's try it with an example: create a ReceiveCustomerController
@Slf4j
@RestController
public class ReceiveCustomerController {

    private static final String topic = "hello3";
    private static final String topicGroup = "hello3Group";

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        // Container that listens on the reply topic
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer(topic + "_replies");
        repliesContainer.getContainerProperties().setGroupId(topicGroup);
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> producerFactory,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> kafkaReplyTemplate;

    @GetMapping("/send/{msg}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMsg(@PathVariable String msg) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("customer reply >>>> {}", consumerRecord.value()); // customer reply >>>> listen: I do it >>> 1
    }

    @KafkaListener(id = topicGroup, topics = topic)
    @SendTo
    public String listen(String msg) {
        log.info("listen receive msg >>> {}", msg); // listen receive msg >>> 1
        return "listen: I do it >>> " + msg;
    }
}
Start the application, call the endpoint, and watch the console~
// listen receive msg >>> 1
// customer reply >>>> listen: I do it >>> 1
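One practical note, as a minimal sketch of my own rather than part of the original example: replyFuture.get() blocks indefinitely, so inside sendMsg you would usually bound the wait instead. The 10-second timeout below is an arbitrary choice for illustration.

// Minimal hardening sketch: bound the wait for the reply.
// Requires java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException.
try {
    ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    log.info("customer reply >>>> {}", consumerRecord.value());
} catch (TimeoutException e) {
    log.warn("no reply received within 10 seconds for message: {}", msg);
}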
@SendTo
In Spring Kafka, the @SendTo annotation specifies the target topic a message should be forwarded to. Once a consumer has successfully processed a message, it can publish the result to that target topic for other consumers to pick up.
@SendTo is placed on a Kafka consumer method to declare where the method's return value should be sent. The following example demonstrates message forwarding~
@Slf4j
@RestController
public class SendToController {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // Receive the message, process it, and forward the result to outputTopic
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(id = "input", topics = "inputTopic")
    @SendTo("outputTopic")
    public String processMessage(String message) {
        log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
        return "2";
    }

    @KafkaListener(id = "output", topics = "outputTopic")
    public String process1Message(String message) {
        // Process the forwarded message and log the result
        String result = "Processed message: " + message;
        log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // Send the initial message
        kafkaTemplate.send("inputTopic", "1");
        return "hello";
    }
}
Watch the console logs:
inputTopic >>>> 1
outputTopic >>>> Processed message: 2
As you can see, the message was forwarded to outputTopic and successfully consumed by the output consumer.
@KafkaListener
@KafkaListener is an annotation that marks a method as a Kafka consumer. In a Spring Boot application it makes handling Kafka messages straightforward.
@KafkaListener can be applied at the class level or the method level. At the class level it sets the default topic and consumer group ID for the whole class, with the actual handler methods marked by @KafkaHandler, as sketched below; at the method level, each method can use its own topic and consumer group ID.
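To make the class-level form concrete, here is a minimal sketch (the topic, group, and class names are made up for illustration): the class-level @KafkaListener supplies the defaults, and the individual @KafkaHandler methods are selected by payload type.

@Slf4j
@Component
@KafkaListener(id = "classLevelGroup", topics = "classLevelTopic") // defaults for the whole class
public class ClassLevelListener {

    @KafkaHandler
    public void handleString(String message) {
        log.info("string payload >>> {}", message);
    }

    @KafkaHandler(isDefault = true) // fallback for any other payload type
    public void handleOther(Object message) {
        log.info("other payload >>> {}", message);
    }
}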
You've already seen @KafkaListener in the earlier examples, but each of them listened to a single topic. How do we listen to multiple topics? It's actually very simple; let's try an example
@Slf4j
@RestController
public class ListenerController {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // Send one message to each topic
        kafkaTemplate.send("topic1", "1");
        kafkaTemplate.send("topic2", "2");
        return "hello";
    }

    /**
     * Listen to multiple topics
     * @param message the received payload
     */
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
    public void listen(String message) {
        log.info("Received message >>> {}", message);
        // Received message >>> 1
        // Received message >>> 2
    }
}
As shown above, we sent two messages, one to topic1 and one to topic2, and consumed both by specifying topics = {"topic1", "topic2"}.
Now let's see which other parameters @KafkaListener supports.
@KafkaListener accepts many parameters to cover different use cases. The most common ones are:
- topics: the name(s) of the topic(s) to consume, as a string or string array. Required.
- groupId: the consumer group ID. A consumer group is a set of consumers sharing the same topics. Defaults to "", meaning the default consumer group ID is used.
- containerFactory: the name of the KafkaListenerContainerFactory bean to use. If not specified, the default factory is used.
- concurrency: the number of concurrent consumers to create. Defaults to 1.
- autoStartup: whether the container starts automatically on application startup. Defaults to true.
- id: a unique identifier for the listener. Defaults to "".
- errorHandler: the error handler bean to invoke when message processing throws an exception.
- properties: Kafka consumer configuration properties passed through to the consumer factory.
- partitionOffsets: specifies, for each partition of the topic, the offset from which to start consuming.
A few of these are self-explanatory and need no discussion; a quick sketch below shows some of them used together, and then we'll take a closer look at containerFactory, errorHandler, and partitionOffsets.
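First, the promised sketch (all topic, group, and listener names are illustrative placeholders; the method lives in a @Component with an @Slf4j log) combining several of the simpler parameters on one listener:

// Minimal sketch combining several common @KafkaListener parameters.
@KafkaListener(
        id = "demoListener",          // unique listener id
        topics = "demoTopic",         // topic(s) to consume
        groupId = "demoGroup",        // consumer group
        concurrency = "2",            // two concurrent consumers
        autoStartup = "true"          // start with the application
)
public void demoListen(String message) {
    log.info("demo >>> {}", message);
}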
containerFactory
So far we have used the default message listener container. In Spring Kafka, consumers can run in different listener containers, for example those built by ConcurrentKafkaListenerContainerFactory, or a plain KafkaMessageListenerContainer; each offers different capabilities and configuration options, so you can pick and configure whichever fits your needs. If you need to customize the consumer's options, you can either set properties in the Spring Boot configuration file, or create a KafkaListenerContainerFactory bean and configure it yourself.
Let's try ConcurrentKafkaListenerContainerFactory with an example~
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        // You can also enable batch consumption; the batch size is controlled by the
        // Kafka consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        // factory.setBatchListener(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
In the example above we created a ConcurrentKafkaListenerContainerFactory bean and configured a few properties:
- DefaultKafkaConsumerFactory as the consumer factory;
- a concurrency of 3 consumers;
- a poll timeout of 3000 milliseconds.
How do we use it? Simple: just reference it via containerFactory.
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello1")
public String hello1() {
    // Send a message
    kafkaTemplate.send("topic3", "3");
    return "hello1";
}

@KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message) {
    // Process the message
    log.info("hello1 >>>>> {}", message);
}
We can point different consumer methods and topics at different containerFactory beans to meet different needs; for instance, enabling setBatchListener(true) in a factory changes the listener's method signature, as sketched below.
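A minimal batch-consumption sketch, assuming a factory bean named batchFactory that has setBatchListener(true) enabled (both the bean name and the topic are assumptions for illustration): the listener method then receives the whole poll as a List.

// Minimal sketch: a batch listener receives the entire poll as a list of records.
// Assumes a "batchFactory" bean configured with factory.setBatchListener(true).
@KafkaListener(topics = "topic3", containerFactory = "batchFactory")
public void processBatch(List<ConsumerRecord<String, String>> records) {
    for (ConsumerRecord<String, String> record : records) {
        log.info("batch record >>> partition={}, offset={}, value={}",
                record.partition(), record.offset(), record.value());
    }
}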
errorHandler
In Spring Kafka, the errorHandler attribute of @KafkaListener names a KafkaListenerErrorHandler bean that defines what to do when processing a Kafka message throws an exception. If you configure no error handler, the container's default error handling simply logs the exception.
If you need custom handling logic, implement the KafkaListenerErrorHandler interface and register it as a bean.
Let's try it with an example:
@Slf4j
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        // Custom handling: log the error, send an alert, route to a dead-letter topic, etc.
        log.error("Error occurred while processing message: {}", e.getMessage());
        return null;
    }
}
Using the error handler (registered as a @Component, its bean name defaults to myErrorHandler):
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello2")
public String hello2() {
    // Send a message
    kafkaTemplate.send("topic4", "4");
    return "hello2";
}

@KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler")
public void processMessage1(String message) {
    // Simulate a processing failure
    if (true) {
        throw new RuntimeException("message processing failed");
    }
    log.info("hello2 >>>>> {}", message);
}
Watch the console:
Error occurred while processing message: Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: message processing failed
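A related detail worth knowing, shown here as a minimal sketch with hypothetical names: if the failing listener method also carries @SendTo, the value returned from handleError is sent as the reply, so a handler can supply a fallback result instead of returning null.

// Hypothetical fallback handler: when the listener method has thrown and also
// carries @SendTo, the value returned here becomes the reply payload.
@Component("fallbackErrorHandler")
public class FallbackErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        return "fallback: " + message.getPayload();
    }
}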
Sample code
config
@Configuration
@Component
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // @Value("${spring.kafka.client-id}")
    // private String clientId;
    //
    // @Value("${spring.kafka.consumer.enable-auto-commit}")
    // private Boolean autoCommit;
    //
    // @Value("${spring.kafka.consumer.auto-commit-interval}")
    // private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    // @Value("${spring.kafka.producer.compression-type}")
    // private String compressionType;

    /**
     * Producer configuration
     */
    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return props;
    }

    /**
     * Producer factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    /**
     * Producer template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration
     */
    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return props;
    }

    /**
     * Batch consumer factory
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        factory.setBatchListener(true);
        return factory;
    }
}

Consumer sample code
public class FlowConsumer {

    private static Logger log = LoggerFactory.getLogger(FlowConsumer.class);

    private static final String topicName = AtlasMQConstants.SendDifferentTopic.TOPIC;

    @Autowired
    private ShopInspectionGradeAbarbeitungService shopInspectionGradeAbarbeitungService;

    @Autowired
    private BaseWeixinService baseWeixinService;

    @KafkaListener(topics = {topicName})
    public void receiveMessageFromKafka(List<ConsumerRecord<?, ?>> recordArr, Acknowledgment ack) throws Exception {
        try {
            log.info("Consumer waiting 2s before consuming: start");
            Thread.sleep(2000);
            log.info("Consumer waiting 2s before consuming: end");
        } catch (InterruptedException e) {
            log.error("Consumer wait interrupted: [{}]", e.getMessage());
        }
        Map<String, BaseWeixin> weiXinApi = baseWeixinService.queryWeiXinApi();
        for (ConsumerRecord<?, ?> record : recordArr) {
            log.info("Received message: topic={}, partition={}, offset={}, value={}",
                    record.topic(), record.partition(), record.offset(), record.value());
            Optional<?> optional = Optional.ofNullable(record.value());
            if (optional.isPresent()) {
                FlowDifferentMessageVi messageVi = JsonUtils.jsonTo(record.value().toString(), FlowDifferentMessageVi.class);
                log.info("Unique coding={}, message={}", messageVi.getMessageId(), messageVi.toString());
                shopInspectionGradeAbarbeitungService.sendDifferentMessageByCondition(messageVi, weiXinApi);
            }
            // Manually acknowledge, matching ack-mode: manual_immediate in application.yml
            ack.acknowledge();
        }
    }
}

Producer sample code
public class FlowProducerServer {

    private static Logger log = LoggerFactory.getLogger(FlowProducerServer.class);

    private final KafkaConfig kafkaConfig;
    private final UniqueIDMethodUtils uniqueIDMethodUtils;
    private final AuthService authService;
    private final String topicName = AtlasMQConstants.SendDifferentTopic.TOPIC;
    private final Producer<String, Object> producer;

    @Autowired
    public FlowProducerServer(KafkaConfig kafkaConfig, UniqueIDMethodUtils uniqueIDMethodUtils, AuthService authService) {
        this.kafkaConfig = kafkaConfig;
        this.uniqueIDMethodUtils = uniqueIDMethodUtils;
        this.authService = authService;
        this.producer = new KafkaProducer<>(kafkaConfig.producerConfig());
    }

    public Result sendDifferentMessageByCondition(FlowSendDifferentMessVi vi) {
        Auth auth;
        try {
            auth = authService.getAuthCodeVerifyCode(vi.getEncryptAuth());
        } catch (Exception e) {
            throw new BizException("Auth code verification failed: " + e.getMessage());
        }
        if (StringUtils.checkNull(auth)) {
            throw new BizException(ResultCode.DECRYPT_VERIFY_CODE_ERROR.getMsg());
        }
        if (StringUtils.checkNull(vi.getFlowName()) && StringUtils.checkNull(vi.getFlowId())) {
            throw new BizException(ResultCode.FLOW_ID_AND_FLOW_NAME_IS_NULL_ERROR);
        }
        FlowDifferentMessageVi messageVi = new FlowDifferentMessageVi();
        messageVi.setMessageId(uniqueIDMethodUtils.generateUniqueID());
        messageVi.setSendNum(0);
        messageVi.setServerFlag(topicName);
        messageVi.setFlowSendDifferentMessVi(vi);
        String toJson = JsonUtils.toJson(messageVi);
        producer.send(new ProducerRecord<String, Object>(topicName, toJson), (recordMetadata, e) -> {
            if (!StringUtils.checkNull(recordMetadata)) {
                log.info("Produced message id=[{}], sendNum=[{}], checksum=[{}], offset=[{}], partition=[{}], topic=[{}]",
                        messageVi.getMessageId(), messageVi.getSendNum(), recordMetadata.checksum(),
                        recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());
            }
            if (!StringUtils.checkNull(e)) {
                log.error("Producing message failed: [{}]", e.getMessage());
                throw new BizException("Message exception: " + e.getMessage());
            }
        });
        return Result.ok();
    }

    @PreDestroy
    public void closeProducer() {
        producer.close();
    }
}

Closing remarks
One topic from this installment remains: partitionOffsets. We'll save it for the next post, since it involves the concepts of partitions and offsets, which take some unpacking; we'll sort those out properly next time~