STRAT

乘风破浪 | 直挂云帆

接下来我们了解一下RabbitMQ的高级特性以及独有特色的插件。

队列的选择

我们一直以来都是在使用Classic队列去实现一些相关的逻辑和功能,实际上在我们使用客户端创建队列的时候会出现三种可以选的队列classic经典队列,Quorum仲裁队列,Stream流式队列他们各自存在各自的优缺点,针对不同的业务可以使用不同的队列。

01
Classic经典队列

使用最多的队列,是一种先进先出的数据结构,面向单机服务,拥有较高的消息可靠性。

经典队列主要用在数据量比较小并且生产消息和消费消息的速度比较稳定的业务场景。

02
Quorum仲裁队列
为了弥补经典队列在分布式下的不足,也是目前主推的一种队列。
Quorum是基于类似zookeeper中的Raft一致性协议实现的一种新型的分布式消息队列,需要满足集群中多半节点同意确定后才会写入队列,保证消息在集群中不会丢失,整体意义上来说就是在Classic队列的基础上做减法,用来保证集群环境下的高可用
其中额外加了一个Poison Message handling参数,定义为毒消息,所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。毒消息中失败次数会记录在header中的x-delivery-count参数中同时可以通过参数Delivery limit来设置重试的次数,如果重试次数到了存在死信队列就加入,不存在就直接删除了。
Quorum队列更适合于队列长期存在并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。
01
Stream流式队列
该队列类型的消息是持久化到磁盘并且具备分布式备份的更适合于消费者多,读消息非常频繁的场景。但是目前并不完善有待提升,但是是趋势。
核心是以append-only只添加的日志来记录消息整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset来实现消息的多次分发。

其他队列

01
死信队列

概念:未能正常消费的消息进行的一种补救机制,他还是一个普通的队列,亦可以绑定消费者去消费。

x-dead-letter-exchange: 对应的死信交换机
x-dead-letter-routing-key: 死信交换机routingKey
x-message-ttl: 消息过期时间
durable: 持久化。

产生方式

  • 消息被消费者确认拒绝后(通过channel.basicReject或者channel.basicNack)

  • 消息过期

  • 消息数量超长

实现方式:

单独配置某个队列为死信队列。

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"xxxQueue,yyyQueue"}' --apply-to queues

在服务器使用策略模式方法批量设置某些队列为死信。

channel.exchangeDeclare("xxx", "direct");Map<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", "xxx");channel.queueDeclare("xxxqueue", false, false, false, args)
02
延迟队列
  1. 基于死信队列来实现延迟队列,设置一个队列将里面的消息设置好TTL时间,不绑定消费者,绑定一个死信队列,当消息过期后就会进入死信队列,此时死信队列我们绑定消费者,这样就可以间接性实现延时消息。
  2. 使用Rabbitmq的rabbitmq_delayed_message_exchange插件来实现。

03
懒队列

懒队列的设计目的就是为了内存堆积问题,先将消息持久化在磁盘里面,等需要使用的时候会将消息加载到内存里面进行执行。

懒队列会尝试尽可能早的把消息写到硬盘中。这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。

实现方式:

直接在参数里面进行配置。

 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);

通过策略去指定。

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

插件

联邦插件:实现数据之间的同步,并不需要你为了去连接需要同步的Rabbitmq服务而搭建一个集群,直接让多个交换器或者队列之间建联邦关系,联邦交换器或联邦队列接收上游(交换器或队列)的消息。联邦交换器将原本发送给上游交换器的信息路由到本地某个队列上,而联邦队列允许本地的消费者消费到上游队列的消息。

消息分片:提升Lazy Queue的功能,将队列模拟生成多个队列,消费者去消费的时候只需要关心原始队列,Sharding插件在进行消息分散存储时,虽然尽量是按照轮询的方式,均匀的保存消息,但不一定能保证一定均匀,相对于消息的顺序和延迟就没有保证。

文章有帮助的话,点赞在看转发吧。

一起讨论,谢谢支持哟

长按二维码关注我
每日分享干货

本篇文章来源于微信公众号: java从零到壹



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2023-11-13