This article implements a delay queue with Spring Boot and Kafka, using the Kafka consumer's pause and resume functions together with a thread pool, scheduled tasks, Spring's event listener mechanism, and the factory pattern.

Table of Contents
1. Delay Queue Definition
2. Application Scenarios
3. Technical Implementation Options
    1. Redis
    2. Kafka
    3. RabbitMQ
    4. RocketMQ
4. Background of the Kafka Delay Queue
5. Kafka Delay Queue Implementation Approach
6. Kafka Delay Queue Architecture Diagram
7. Kafka Delay Task Code Implementation
    1. KafkaSyncConsumer: the Kafka consumer
    2. KafkaDelayQueue: the Kafka delay queue
    3. KafkaDelayQueueFactory: the Kafka delay queue factory
    4. KafkaPollListener: the Kafka delay queue event listener
    5. KafkaDelayConfig: the Kafka delay configuration
8. How to Use the Kafka Delay Queue
9. Testing
10. Summary
01 — Delay Queue Definition
02 — Application Scenarios
03 — Technical Implementation Options
04 — Background of the Kafka Delay Queue
1. Which of the four options above to pick depends on the MQ infrastructure a company already runs. If Kafka is the only broker available, there is no need to introduce RabbitMQ or RocketMQ just for delayed messages; bringing in a new component also brings new operational problems.
2. Many articles found online claim that Kafka's internal time wheel can be used for delayed operations. The time wheel, however, is only an internal utility that Kafka uses for its own purposes; it is not exposed to users and cannot be used as a delay queue directly.
3. The practical way to build a delay queue on Kafka is to use the consumer's pause and resume mechanism, as the sketch below illustrates.
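To make the idea concrete before diving into the project classes, here is a minimal, self-contained sketch of the pause/resume trick with a plain KafkaConsumer. The topic name, broker address, and the 60-second delay are illustrative only; the project code later in the article uses schedulers and Spring events instead, while this sketch keeps everything in a single thread.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/** Single-threaded sketch of the pause/resume delay technique (illustrative names and values). */
public class PauseResumeSketch {

    public static void main(String[] args) {
        long delayMs = 60_000L;  // illustrative 60-second delay
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-resume-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        Map<TopicPartition, Long> resumeAt = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-delay-topic"));
            while (true) {
                long now = System.currentTimeMillis();
                // resume every partition whose delay has elapsed
                resumeAt.entrySet().removeIf(e -> {
                    if (e.getValue() <= now) {
                        consumer.resume(Collections.singleton(e.getKey()));
                        return true;
                    }
                    return false;
                });
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (TopicPartition partition : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        long dueTime = record.timestamp() + delayMs;
                        if (dueTime > System.currentTimeMillis()) {
                            // not due yet: stop fetching this partition and rewind to this offset,
                            // so the same record is read again once the partition is resumed
                            consumer.pause(Collections.singleton(partition));
                            consumer.seek(partition, record.offset());
                            resumeAt.put(partition, dueTime);
                            break;  // skip the rest of this partition, look at the next one
                        }
                        // due: forward the record to the target topic and commit its offset here
                    }
                }
            }
        }
    }
}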
05 — Kafka Delay Queue Implementation Approach
06 — Kafka Delay Queue Architecture Diagram
07 — Kafka Delay Task Code Implementation
Source code layout: five classes, KafkaSyncConsumer, KafkaDelayQueue, KafkaDelayQueueFactory, KafkaPollListener, and KafkaDelayConfig.

1. KafkaSyncConsumer: the Kafka consumer

This class wraps KafkaConsumer to make it safe to share between threads. The native KafkaConsumer must not be used from several threads at once; doing so fails with: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Kafka synchronized consumer: every method is synchronized so the poll thread and the
 * delay (resume) threads can safely share a single consumer instance.
 *
 * @author WDYin
 * @date 2023/4/14
 **/
public class KafkaSyncConsumer<K, V> extends KafkaConsumer<K, V> {

    KafkaSyncConsumer(Properties properties) {
        super(properties);
    }

    @Override
    public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
        return super.poll(timeout);
    }

    @Override
    public synchronized Set<TopicPartition> paused() {
        return super.paused();
    }

    /** Pause the partition and seek back to the given offset so the record is re-read after resume. */
    synchronized void pauseAndSeek(TopicPartition partition, long offset) {
        super.pause(Collections.singletonList(partition));
        super.seek(partition, offset);
    }

    @Override
    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        super.commitSync(offsets);
    }

    /** Resume a previously paused partition once its delay has elapsed. */
    synchronized void resume(TopicPartition topicPartition) {
        super.resume(Collections.singleton(topicPartition));
    }

    @Override
    public synchronized void commitSync() {
        super.commitSync();
    }
}

2. KafkaDelayQueue: the Kafka delay queue

Defines a Kafka delay queue. It carries the delay queue configuration, the delay topic, the consumer group, the delay time, the target topic, the KafkaSyncConsumer, the ApplicationContext, the poll thread pool, and the delay thread pool. The listing below only shows the fields; a hedged sketch of the constructor, send(), and delayTask(), which the other classes call, follows it.

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Kafka delay queue: one instance per (delay topic, delay time) pair.
 *
 * @Author WDYin
 * @Date 2022/7/2
 **/
@Slf4j
@Getter
@Setter
class KafkaDelayQueue<K, V> {

    private String topic;
    private String group;
    private Integer delayTime;
    private String targetTopic;
    private KafkaDelayConfig kafkaDelayConfig;
    private KafkaSyncConsumer<K, V> kafkaSyncConsumer;
    private ApplicationContext applicationContext;
    private ThreadPoolTaskScheduler threadPoolPollTaskScheduler;
    private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler;
}
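The constructor used by the factory below (new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig)), the send() method the factory calls, and the delayTask(partition) method the listener calls are referenced elsewhere but not listed in the article. The following is a hedged sketch of what they might look like, assuming the poll loop hands each batch to KafkaPollListener through Spring's event mechanism (KafkaPollEvent itself is sketched after the listener below). It reuses the imports already present in the listing above.

// Assumed implementation, added inside KafkaDelayQueue; the original article does not show these bodies.
KafkaDelayQueue(KafkaSyncConsumer<K, V> kafkaSyncConsumer, KafkaDelayConfig kafkaDelayConfig) {
    this.kafkaSyncConsumer = kafkaSyncConsumer;
    this.kafkaDelayConfig = kafkaDelayConfig;
    // one scheduler drives the poll loop, the other resumes paused partitions when their delay is up
    this.threadPoolPollTaskScheduler = new ThreadPoolTaskScheduler();
    this.threadPoolPollTaskScheduler.setPoolSize(kafkaDelayConfig.getPollThreadPool());
    this.threadPoolPollTaskScheduler.initialize();
    this.threadPoolDelayTaskScheduler = new ThreadPoolTaskScheduler();
    this.threadPoolDelayTaskScheduler.setPoolSize(kafkaDelayConfig.getDelayThreadPool());
    this.threadPoolDelayTaskScheduler.initialize();
}

/** Subscribe to the delay topic and start the poll loop on the poll scheduler (assumed). */
void send() {
    kafkaSyncConsumer.subscribe(Collections.singletonList(topic));
    threadPoolPollTaskScheduler.scheduleWithFixedDelay(() -> {
        ConsumerRecords<K, V> records = kafkaSyncConsumer.poll(Duration.ofMillis(kafkaDelayConfig.getPollTimeout()));
        if (!records.isEmpty()) {
            // hand the polled batch to KafkaPollListener via Spring's event mechanism
            applicationContext.publishEvent(new KafkaPollEvent<>(records, delayTime, this));
        }
    }, Duration.ofMillis(kafkaDelayConfig.getPollInterval()));
}

/** Task run when a paused partition becomes due: resuming it lets the next poll re-read the message (assumed). */
Runnable delayTask(TopicPartition partition) {
    return () -> kafkaSyncConsumer.resume(partition);
}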

3. KafkaDelayQueueFactory: the Kafka delay queue factory

The factory creates and manages the delay queues; listener(...) registers one delay queue per combination of delay topic, delay time, and target topic.

import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Properties;

/**
 * Delay queue factory
 *
 * @author WDYin
 * @date 2023/4/17
 **/
@Data
public class KafkaDelayQueueFactory {

    private KafkaDelayConfig kafkaDelayConfig;
    private Properties properties;
    private ApplicationContext applicationContext;
    private Integer concurrency;

    public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) {
        Assert.notNull(properties, "properties cannot null");
        Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), "delayThreadPool cannot null");
        Assert.notNull(kafkaDelayConfig.getPollThreadPool(), "pollThreadPool cannot null");
        Assert.notNull(kafkaDelayConfig.getPollInterval(), "pollInterval cannot null");
        Assert.notNull(kafkaDelayConfig.getPollTimeout(), "timeout cannot null");
        this.properties = properties;
        this.kafkaDelayConfig = kafkaDelayConfig;
    }

    /** Register a delay queue: messages from topic are forwarded to targetTopic after delayTime milliseconds. */
    public void listener(String topic, String group, Integer delayTime, String targetTopic) {
        if (StringUtils.isEmpty(topic)) {
            throw new RuntimeException("topic cannot empty");
        }
        if (StringUtils.isEmpty(group)) {
            throw new RuntimeException("group cannot empty");
        }
        if (delayTime == null) {
            throw new RuntimeException("delayTime cannot empty");
        }
        if (StringUtils.isEmpty(targetTopic)) {
            throw new RuntimeException("targetTopic cannot empty");
        }
        KafkaSyncConsumer<String, String> kafkaSyncConsumer = createKafkaSyncConsumer(group);
        KafkaDelayQueue<String, String> kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
        kafkaDelayQueue.send();
    }

    private KafkaDelayQueue<String, String> createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumer<String, String> kafkaSyncConsumer) {
        KafkaDelayQueue<String, String> kafkaDelayQueue = new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig);
        Assert.notNull(applicationContext, "kafkaDelayQueue need applicationContext");
        kafkaDelayQueue.setApplicationContext(applicationContext);
        kafkaDelayQueue.setDelayTime(delayTime);
        kafkaDelayQueue.setTopic(topic);
        kafkaDelayQueue.setGroup(group);
        kafkaDelayQueue.setTargetTopic(targetTopic);
        return kafkaDelayQueue;
    }

    private KafkaSyncConsumer<String, String> createKafkaSyncConsumer(String group) {
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        return new KafkaSyncConsumer<>(properties);
    }
}

4. KafkaPollListener: the Kafka delay queue event listener

Handles the KafkaPollEvent published by the poll loop: it forwards due messages to the target topic, pauses partitions whose head message is not due yet, schedules their resumption, and commits the processed offsets in one batch. KafkaPollEvent itself is not part of the article's listings; a sketch of it follows the listener.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;

/**
 * Delay queue listener
 *
 * @Author : WDYin
 * @Date : 2021/5/7
 */
@Slf4j
public class KafkaPollListener<K, V> implements ApplicationListener<KafkaPollEvent<K, V>> {

    private KafkaTemplate kafkaTemplate;

    public KafkaPollListener(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void onApplicationEvent(KafkaPollEvent<K, V> event) {
        ConsumerRecords<K, V> records = (ConsumerRecords<K, V>) event.getSource();
        Integer delayTime = event.getDelayTime();
        KafkaDelayQueue<K, V> kafkaDelayQueue = event.getKafkaDelayQueue();
        KafkaSyncConsumer<K, V> kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer();
        //1. get the partitions that returned messages in this poll
        Set<TopicPartition> partitions = records.partitions();
        //2. collect the offsets to commit, so they can be committed in one batch
        Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
        //3. iterate over the partitions that have messages
        partitions.forEach((partition) -> {
            List<ConsumerRecord<K, V>> consumerRecords = records.records(partition);
            //4. iterate over the messages of this partition
            for (ConsumerRecord<K, V> record : consumerRecords) {
                //5. message creation time, truncated to the second
                long startTime = (record.timestamp() / 1000) * 1000;
                long endTime = startTime + delayTime;
                //6. if the message is not due yet, pause the partition and rewind to this offset
                long now = System.currentTimeMillis();
                if (endTime > now) {
                    kafkaSyncConsumer.pauseAndSeek(partition, record.offset());
                    //7. schedule the resumption of this partition for the moment the message becomes due
                    kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition), new Date(endTime));
                    //no need to look at the remaining messages of this partition, move on to the next one
                    break;
                }
                log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageTimestamp:{}, nowEpochSecond:{}",
                        Thread.currentThread().getName() + "#" + Thread.currentThread().getId(),
                        record.topic() + "-" + record.partition(), record.offset(), record.key(), record.value(),
                        LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()), LocalDateTime.now(),
                        startTime, Instant.now().getEpochSecond());
                //forward the message to the target topic
                kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(), record.value());
                //record the offset to commit
                commitMap.put(partition, new OffsetAndMetadata(record.offset() + 1));
            }
        });
        //8. commit in one batch for efficiency; commitSync takes a few hundred milliseconds
        if (!commitMap.isEmpty()) {
            kafkaSyncConsumer.commitSync(commitMap);
        }
    }
}
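The KafkaPollEvent class the listener handles is not shown in the article. Based on how the listener uses it (getSource() returns the polled ConsumerRecords, plus getDelayTime() and getKafkaDelayQueue()), a minimal sketch could look like this; the exact shape of the original class is an assumption.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.context.ApplicationEvent;

/** Spring event published by the poll loop; carries the polled batch, the delay, and the owning queue (assumed shape). */
public class KafkaPollEvent<K, V> extends ApplicationEvent {

    private final Integer delayTime;
    private final KafkaDelayQueue<K, V> kafkaDelayQueue;

    public KafkaPollEvent(ConsumerRecords<K, V> records, Integer delayTime, KafkaDelayQueue<K, V> kafkaDelayQueue) {
        super(records);  // getSource() returns the records
        this.delayTime = delayTime;
        this.kafkaDelayQueue = kafkaDelayQueue;
    }

    public Integer getDelayTime() {
        return delayTime;
    }

    public KafkaDelayQueue<K, V> getKafkaDelayQueue() {
        return kafkaDelayQueue;
    }
}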

5. KafkaDelayConfig: the Kafka delay configuration

Holds the polling and thread pool settings of the delay queues. A sketch of how this configuration and the factory can be wired as Spring beans follows the listing.

import lombok.Data;

/**
 * Delay queue configuration
 *
 * @author WDYin
 * @date 2023/4/16
 **/
@Data
public class KafkaDelayConfig {

    /** interval between two polls */
    private Integer pollInterval;
    /** timeout passed to poll() */
    private Integer pollTimeout;
    /** size of the poll thread pool */
    private Integer pollThreadPool;
    /** size of the delay (resume) thread pool */
    private Integer delayThreadPool;

    public KafkaDelayConfig() {
    }
}
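The article does not show how the factory and the listener are registered as Spring beans. A plausible wiring is sketched below; the bean names, property values, and the assumption that Spring Boot's auto-configured KafkaTemplate is available are all illustrative, not taken from the article.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

/** Illustrative wiring of the delay queue components as Spring beans (assumed values). */
@Configuration
public class KafkaDelayQueueConfiguration {

    @Bean
    public KafkaDelayQueueFactory kafkaDelayQueueFactory(ApplicationContext applicationContext) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // offsets are committed manually
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaDelayConfig delayConfig = new KafkaDelayConfig();
        delayConfig.setPollInterval(200);   // assumed value, in ms
        delayConfig.setPollTimeout(100);    // assumed value, in ms
        delayConfig.setPollThreadPool(1);   // assumed value
        delayConfig.setDelayThreadPool(1);  // assumed value

        KafkaDelayQueueFactory factory = new KafkaDelayQueueFactory(properties, delayConfig);
        factory.setApplicationContext(applicationContext);  // the factory passes this on to each delay queue
        return factory;
    }

    @Bean
    public KafkaPollListener<String, String> kafkaPollListener(KafkaTemplate<String, String> kafkaTemplate) {
        // registering the listener as a bean lets Spring deliver the KafkaPollEvent to it
        return new KafkaPollListener<>(kafkaTemplate);
    }
}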

08 — How to Use the Kafka Delay Queue

After copying the classes above into your own project, use KafkaDelayApplication, a small registration class for delay tasks. Note that one delay topic corresponds to exactly one delay time; for every new delay task, simply register another listener here. It works out of the box.

Usage flow:

1. The producer sends the message to the delay topic (your own code).
2. Kafka forwards the message from the delay topic to the target topic after the delay time (the code below).
3. Your own consumer consumes the target topic (your own code).

A sketch of steps 1 and 3 follows the registration class.

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;

/**
 * Registration of the delay tasks.
 *
 * @author WDYin
 * @date 2023/4/18
 **/
@Component
public class KafkaDelayApplication {

    @Resource
    private KafkaDelayQueueFactory kafkaDelayQueueFactory;

    /**
     * All delay tasks can be registered here.
     * Kafka forwards each message from the delay topic to the target topic after the delay time.
     */
    @PostConstruct
    public void init() {
        //delay of 30 seconds
        kafkaDelayQueueFactory.listener("delay-30-second-topic", "delay-30-second-group", 1 * 30 * 1000, "delay-30-second-target-topic");
        //delay of 60 seconds
        kafkaDelayQueueFactory.listener("delay-60-second-topic", "delay-60-second-group", 1 * 60 * 1000, "delay-60-second-target-topic");
        //delay of 30 minutes
        kafkaDelayQueueFactory.listener("delay-30-minute-topic", "delay-30-minute-group", 30 * 60 * 1000, "delay-30-minute-target-topic");
    }
}
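For steps 1 and 3 of the usage flow, which the article leaves to the reader, a minimal sketch could look like the following. The topic names match the registration above; the class name, method names, and the consumer group are assumptions, and a Spring Boot application with spring-kafka on the classpath is assumed.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/** Illustrative producer and target-topic consumer around the delay queue. */
@Component
public class DelayQueueUsageExample {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Step 1: the business code sends the message to the delay topic. */
    public void sendDelayedMessage(String payload) {
        kafkaTemplate.send("delay-60-second-topic", payload);
    }

    /** Step 3: a normal listener consumes the target topic once the delay has elapsed. */
    @KafkaListener(topics = "delay-60-second-target-topic", groupId = "delay-60-second-target-group")
    public void onDelayedMessage(String payload) {
        // business processing after the 60-second delay
    }
}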
09 — Testing
1. First, 1,000 messages were sent to the delay topic delay-60-second-topic, which has 10 partitions, so 100 messages per partition. The messages were produced at 2023-04-21 16:37:26, so the delayed consumption time should be 2023-04-21 16:38:26. (A sketch of such a test producer follows this list.)

2. The delay queue then consumes the messages: the logs show that the message timestamp and the time at which the delay queue consumed each message differ by exactly one minute.
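A test producer reproducing step 1 could look like this; the broker address, keys, and payloads are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/** Illustrative test producer for the 60-second delay topic. */
public class DelayTopicTestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // the keys spread the 1,000 messages across the topic's 10 partitions
                producer.send(new ProducerRecord<>("delay-60-second-topic", String.valueOf(i), "message-" + i));
            }
            producer.flush();
        }
    }
}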

10 — Summary
- This article implements a working delay queue on Kafka and verifies it with a real test; the code can be dropped into a project and used right away.
- The Kafka-based delay queue supports second-level delay tasks, not millisecond-level ones; millisecond-level delay tasks are rarely meaningful anyway.
- Note that each delay topic is tied to a single delay time; do not put tasks with different delay times into the same topic.
- The drawback of this approach is that, with large message volumes, the consumers must be able to keep up, otherwise extra latency is introduced, so the precision is not especially high. For hour-level delay tasks a deviation of a few seconds is normally acceptable, which covers most scenarios.
- The complete source code is available through the WeChat public account 老板来一杯java.
This article was originally published on the WeChat public account 老板来一杯java.