一起来学kafka之整合SpringBoot深入使用(一)

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka整合SpringBoot中如何进行消息应答以及@SendTo 和 @KafkaListener的讲解~

好了, 废话不多说直接开整吧~

pom引用

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
 

application.yml配置

kafka:
bootstrap-servers: kafka配置地址
consumer:
group-id: atlas.consumer.group
enable-auto-commit: false
auto-commit-interval: 100ms
properties:
session.timeout.ms: 15000
max-poll-interval-ms: 15000
max-partition-fetch-bytes: 15728640

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
max-poll-records: 50

producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
compression-type: gzip
listener:
ack-mode: manual_immediate
concurrency: 5
type: batch
# client-id: atlasKafkaCli

消息应答

有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController

@Slf4j
@RestController
public class ReceiveCustomerController{

    private static final String topic = "hello3";
    private static final String topicCroup = "hello3Group";

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory){
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies");
        repliesContainer.getContainerProperties().setGroupId(topicCroup);
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer){
        return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory){
        return new KafkaTemplate(producerFactory);
    }

    @Autowired
    private ReplyingKafkaTemplate kafkaReplyTemplate;

    @GetMapping("/send/{msg}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMsg(@PathVariable String msgthrows Exception{
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1:
    }

    @KafkaListener(id = topicCroup, topics = topic)
    @SendTo
    public String listen(String msg){
        log.info("listen receive msg >>>  {}", msg); // listen receive msg >>>  1
        return "listen: I do it >>> " + msg;
    }
}

启动应用,测试一下,观察控制台的变化~

 // listen receive msg >>>  1
 // customer reply >>>>listen: I do it >>> 1:

@SendTo

Spring Kafka 中,@SendTo 注解可以用于指定消息被发送到的目标 Topic。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic,以供其他消费者进一步处理。

@SendTo 注解可以应用于 Kafka 消费者方法上,以指定消息的处理结果将被发送到哪个 Topic。下面通过一个例子来演示一下如何进行消息的转发~

@Slf4j
@RestController
public class SendToController{

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // 接收消息
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(id"input", topics = "inputTopic")
    @SendTo("outputTopic")
    public String processMessage(String message){
        // 处理消息并返回结果
        log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
        return "2";
    }

    @KafkaListener(id = "output", topics = "outputTopic")
    public String process1Message(String message){
        // 处理消息并返回结果
        String result = "Processed message: " + message;
        log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("inputTopic""1");
        return "hello";
    }
}

观察控制台的日志信息

inputTopic >>>> 1
        
outputTopic >>>> Processed message: 2

可以看到消息被转发到outputTopic并且被output消费者成功消费

@KafkaListener

@KafkaListener 是一个注解,用于标记一个方法作为 Kafka 消费者。在 Spring Boot 应用程序中,使用该注解可以方便地处理Kafka 消息。

@KafkaListener 注解可以添加到类级别或方法级别。在类级别添加注解,将指定默认的 Topic消费者组 ID。在方法级别添加注解,则可以使用不同的 Topic消费者组 ID

在前面的几个例子中,带大家已经体验过了,但都是监听一个topic,那么如何去监听多个topic呢? 其实很简单,下面通过一个例子来体验下

@Slf4j
@RestController
public class ListenerController{

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("topic1""1");
        kafkaTemplate.send("topic2""2");
        return "hello";
    }

    /**
     * 监听多个topic
     * @param message
     */
    @KafkaListener(topics = {"topic1""topic2"}, groupId = "group1")
    public void listen(String message){
        log.info("Received message >>> {}", message);
        // Received message >>> 1
        // Received message >>> 2
    }

}

如上,我们发送了两条不同的消息topic1topic2,通过指定topics = {"topic1", "topic2"}成功消费两条消息

下面一起看一下,@KafkaListener还支持哪些参数?

@KafkaListener 注解支持许多参数,以满足不同的使用场景。以下是常用参数的列表:

  • topics:指定要消费的 Topic 的名称,可以是字符串或字符串数组。必填参数。
  • groupId:指定消费者组 ID。消费者组是一组共享相同 Topic 的消费者的集合。默认值为 "",表示使用默认的消费者组 ID
  • containerFactory:指定要使用的 KafkaListenerContainerFactory 实例的名称。如果没有指定,将使用默认的 KafkaListenerContainerFactory 实例。
  • concurrency:指定要创建的并发消费者的数量。默认值为 1
  • autoStartup:指定容器是否在应用启动时自动启动。默认值为 true
  • id:指定监听器的唯一标识符。默认值为 ""
  • errorHandler:指定在处理消息时出现异常时要使用的 ErrorHandler 实例。
  • properties:指定传递给消费者工厂的 Kafka 消费者配置属性的 Map
  • partitionOffsets: 是一个 Map 类型的参数,该参数用于指定要从 Topic每个分区的哪个偏移量开始消费消息

有几个参数很好理解,没啥好讲的,我们主要看一下containerFactory,errorHandler, partitionOffsets

containerFactory

前面我们使用的都是默认消息监听器,在 Spring Kafka 中,Kafka 消费者可以使用不同的消息监听器容器,例如 ConcurrentKafkaListenerContainerFactoryKafkaMessageListenerContainer 等。每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置。 如果你需要自定义 Kafka 消费者的配置选项,可以通过在 Spring Boot 配置文件中设置属性来实现。另外一种方法是通过创建 KafkaListenerContainerFactory bean 并配置其属性来实现。

下面通过一个例子来体验一下ConcurrentKafkaListenerContainerFactory~

@Configuration
@EnableKafka
public class KafkaConfig{
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
         //也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        //factory.setBatchListener(true);

        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

在上面的示例中,我们创建了一个 ConcurrentKafkaListenerContainerFactorybean,并配置了一些属性,例如:

  • 使用 DefaultKafkaConsumerFactory 作为消费者工厂
  • 设置并发消费者数量为 3
  • 设置轮询超时时间为 3000 毫秒

那么如何使用呢?很简单,只需之指定containerFactory就好了

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello1")
public String hello1() {
    // 发送消息
    kafkaTemplate.send("topic3""3");
    return "hello1";
}

@KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message){
    // 处理消息
    log.info("hello1 >>>>> {}", message);
}

我们可以通过containerFactory 应用于不同的消费者方法和主题,以满足不同的需求

errorHandler

在 ·Spring Kafka· 中,·ErrorHandler· 接口定义了在处理 Kafka 消息时发生错误时如何处理异常。如果不配置 ErrorHandler,则默认使用 LoggingErrorHandler 将异常记录到日志中。

如果你需要自定义异常处理逻辑,可以通过实现 ErrorHandler 接口并配置其 bean 来实现。

下面通过一个例子来体验一下:

@Slf4j
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler{
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e){
        // 处理异常,例如打印错误日志、发送错误消息等,自定义逻辑处理
        log.error("Error occurred while processing message:  {}", e.getMessage());
        return null;
    }
}

使用错误处理器:

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello2")
public String hello2() {
    // 发送消息
    kafkaTemplate.send("topic4""4");
    return "hello2";
}

@KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler")
public void processMessage1(String message){
    // 处理消息
    if(true)
        throw new RuntimeException("消息处理异常");
    log.info("hello2 >>>>> {}", message);
}

观察控制台:

Error occurred while processing message:  Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 消息处理异常
 
 
事例代码
 
config

@Configuration
@Component
@EnableKafka
public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

// @Value("${spring.kafka.client-id}")
// private String clientId;
//
// @Value("${spring.kafka.consumer.enable-auto-commit}")
// private Boolean autoCommit;
//
// @Value("${spring.kafka.consumer.auto-commit-interval}")
// private Integer autoCommitInterval;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;

@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;

@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;

@Value("${spring.kafka.producer.retries}")
private Integer retries;

@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;

@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;

@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;

@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;

// @Value("${spring.kafka.producer.compression-type}")
// private String compressionType;

/**
* 生产配置信息
*
* @return
*/
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
return props;
}

/**
* 生产者工厂
*
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}

/**
* 生产者模板
*
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

/**
* 消费者配置信息
*/
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
return props;
}

/**
* 消费者批量工程
*/
@Bean
public KafkaListenerContainerFactory<?> batchFactor() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
factory.setBatchListener(true);
return factory;
}

}
 
消费者示例代码
 
public class FlowConsumer {

private static Logger log = LoggerFactory.getLogger(FlowConsumer.class);

private static final String topicName = AtlasMQConstants.SendDifferentTopic.TOPIC;

@Autowired
private ShopInspectionGradeAbarbeitungService shopInspectionGradeAbarbeitungService;

@Autowired
private BaseWeixinService baseWeixinService;

@KafkaListener(topics = {topicName})
public void receiveMessageFormKafka(List<ConsumerRecord<?, ?>> recordArr, Acknowledgment ack) throws Exception {
try {
log.info("消费者等待2S钟消费start");
Thread.sleep(2000);
log.info("消费者等待2S钟消费end");
} catch (InterruptedException e) {
log.error("消费者等待消费异常:[{}]", e.getMessage());
}
Map<String, BaseWeixin> weiXinApi = baseWeixinService.queryWeiXinApi();
for (ConsumerRecord<?, ?> record : recordArr) {
log.info("监听消息:topic={},partition={},offset={},value={}", record.topic(), record.partition(), record.offset(), record.value());
Optional<?> optional = Optional.ofNullable(record.value());
if (optional.isPresent()) {
FlowDifferentMessageVi messageVi = JsonUtils.jsonTo(record.value().toString(), FlowDifferentMessageVi.class);
log.info("唯一编码:Unique coding={},message={}", messageVi.getMessageId(), messageVi.toString());
shopInspectionGradeAbarbeitungService.sendDifferentMessageByCondition(messageVi, weiXinApi);
}
ack.acknowledge();
}
}

}
 
生产者示例代码
 
public class FlowProducerServer {

private static Logger log = LoggerFactory.getLogger(FlowProducerServer.class);

private final KafkaConfig kafkaConfig;

private final UniqueIDMethodUtils uniqueIDMethodUtils;

private final AuthService authService;

private final String topicName = AtlasMQConstants.SendDifferentTopic.TOPIC;

private final Producer<String, Object> producer;

@Autowired
public FlowProducerServer(KafkaConfig kafkaConfig, UniqueIDMethodUtils uniqueIDMethodUtils, AuthService authService) {
this.kafkaConfig = kafkaConfig;
this.uniqueIDMethodUtils = uniqueIDMethodUtils;
this.authService = authService;
this.producer = new KafkaProducer<>(kafkaConfig.producerConfig());
}

public Result sendDifferentMessageByCondition(FlowSendDifferentMessVi vi) {
Auth auth;
try {
auth = authService.getAuthCodeVerifyCode(vi.getEncryptAuth());
} catch (Exception e) {
throw new BizException("获取授权码验证异常:" + e.getMessage());
}
if (StringUtils.checkNull(auth)) {
throw new BizException(ResultCode.DECRYPT_VERIFY_CODE_ERROR.getMsg());
}
if (StringUtils.checkNull(vi.getFlowName()) && StringUtils.checkNull(vi.getFlowId())) {
throw new BizException(ResultCode.FLOW_ID_AND_FLOW_NAME_IS_NULL_ERROR);
}
FlowDifferentMessageVi messageVi = new FlowDifferentMessageVi();
messageVi.setMessageId(uniqueIDMethodUtils.generateUniqueID());
messageVi.setSendNum(0);
messageVi.setServerFlag(topicName);
messageVi.setFlowSendDifferentMessVi(vi);
String toJson = JsonUtils.toJson(messageVi);
producer.send(new ProducerRecord<String, Object>(topicName, toJson), (recordMetadata, e) -> {
if (!StringUtils.checkNull(recordMetadata)) {
log.info("生产消息唯一标识[{}],发送次数[{}],checksum:[{}],offset:[{}],partition:[{}],topic:[{}]",
messageVi.getMessageId(), messageVi.getSendNum(), recordMetadata.checksum(), recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());
}
if (!StringUtils.checkNull(e)) {
log.error("生产消息发送失败,异常信息:[{}]", e.getMessage());
throw new BizException("消息异常信息:" + e.getMessage());
}
});
return Result.ok();
}

@PreDestroy
public void closeProducer() {
producer.close();
}

}

结束语

本节还遗留一个partitionOffsets这个我们放到下节给大家讲,涉及到分区 partition偏移量 offset的概念,不是很好理解,下节给大家好好理一下这个概念~

 



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2023-05-28