《php网站开发》课程资料,北京建工集团有限公司官网,html5响应式网站,赣州人才网招聘一、概述
消息队列中间件#xff08;MQ#xff09;是不同系统之间消息传递#xff0c;异步通信的常见组件#xff0c;RabbitMQ、Kafka和RocketMQ是目前业界常见的3种消息中间件#xff0c;本文重点阐述了他们特性差异、架构设计和处理常见问题的方案。
二、特性比较
Ra…一、概述
消息队列中间件MQ是不同系统之间消息传递异步通信的常见组件RabbitMQ、Kafka和RocketMQ是目前业界常见的3种消息中间件本文重点阐述了他们特性差异、架构设计和处理常见问题的方案。
二、特性比较
RabbitMQ适合于中小规模的使用场景是目前业界使用最广泛的一种MQ其完全实现了AMQP的协议实现了非常丰富的消息可靠性的保障机制和其他MQ相比其在可靠性方面是最强的但也正是由于可靠性方面实现机制过于沉重导致其吞吐量并不高在生产环境经常会出现消息积压的问题。
Kafka适合于实时流处理的使用场景在大数据处理领域经常见到可以用来处理海量的日志数据和IoT海量数据采集由于其基于文件顺序读写的存储架构和基于zero-copy的IO处理策略使得他的吞吐量非常之高性能非常之好能够达到百万级别的数据处理吞吐量其可靠性保障主要是基于多副本这种策略所以可靠性方面明显不如RabbitMQ。
RabbitMQKafkaRocketMQ使用场景中小规模的使用场景实时流处理、海量日志数据处理性能均衡优势在分布式事务场景可靠性高AMPQ协议保障低基于多副本机制保障中等基于事务的保障吞吐量低万级别高基于顺序读写的存储架构百万级别中等十万级别时效性毫秒级别毫秒级别毫秒级别优点可靠性非常高吞吐量非常大性能非常好集群高可用性能和功能全面擅长分布式事务方向缺点吞吐量比较低消息积累会影响性能基于erlang开发不好定制数据可靠性保障较低会存在数据丢失客户端只支持Java官方文档支持较少
三、常见问题处理策略
1.可靠性保障
RabbitMQ
持久化机制。RabbitMQ通过消息持久化机制来确保消息的可靠传递。生产者可以选择将消息标记为持久化使得即使在消息队列服务器故障后消息也能被保存并传递给消费者。RabbitMQ生产者提供的可靠性机制包括发布确认Publish Confirm、事务机制Transaction生产者可以通过发布确认和事务机制获取消息是否成功被RabbitMQ接收和处理的确认RabbitMQ生产者提供的可靠性保障机制包括消息确认机制ACK消费者可以通过消息确认机制来保障消息的可靠消费。
void basicAck(long deliveryTag, boolean multiple)//确认消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue)//拒绝消息
void basicRecover(boolean requeue)//重发消息
Kafka
持久化。kafka的消息在发送前会被持久化存储到磁盘上即使在服务器重启后也不会丢失。但也需要对kafka的持久化消息设置失效时间保障存储空间的充足。多副本。Kafka采用多副本机制将消息复制到多个Broker节点上即使其中一个Broker节点故障仍然可以从其他副本节点读取和传递消息。
RocketMQ 和kafka类似。
总结RabbitMQ相比Kafka和RocketMQ他有跟丰富的可靠性保障机制包括保障生产者消息的可靠发送、数据的持久化还有消费者的可靠消费。
2.流控措施
流控措施主要是为了解决消息积压的问题如果生产者生成消息速率过快而消费者消费消息的速率过慢则会在MQ中形成消息挤压如果不及时处理就会造成MQ服务不可用或者OOM等问题。
RabbitMQ
调整消费者消息消费速率。主要是用来控制消费任务的条数。可以使用QoSQuality of Service机制设置每个消费者的预取计数限制每次从队列中获取的消息数量以控制消费者的处理速度。调整消费者消息消费流量。主要是用来控制消费消息的大小。通过设置basic.qos或basic.consume命令的参数来控制消费者的处理速度避免消息过多导致积压。
/**
* prefetchSize:服务器传送最大内容量以八位字节计算如果没有限制则为0
* prefetchCount:服务器每次传递的最大消息数如果没有限制则为0
* global:如果为true,则当前设置将会应用于整个Channel(频道)
**/
void basicQos(int prefetchSize, int prefetchCount, boolean global)
Kafka
调整分区数和副本数。kafka下游消费者的数量和其分区数是一致的所以Kafka通过分区和副本机制来实现消息的并行处理和负载均衡。可以根据消息的负载情况和消费者的处理能力通过增加分区数量、调整副本分配策略等方式来提高系统的处理能力。调整消息失效策略。kafka提供了消息的保存策略和清理策略可以根据时间和数据的使用情况来设置。
RocketMQ
动态调整消费者数量。RabbitMQ可以根据系统的负载情况和消息队列的堆积情况动态调整消费者的并发消费线程数以适应消息的处理需求。调整数据的拉取或推送的模式。RocketMQ还提供了消息拉取和推拉模式消费者可以根据自身的处理能力主动拉取消息避免消息积压过多。
总结流控措施的几种方式主要包括1扩大下游消费者的消费速率和流量2增大消费者的数量扩大消费能力3调整MQ的副本或分区数发挥下游消费者的最大消费能力4拉取或推送模式的权衡。
3.重复消费问题
重复性消费问题主要需要解决是幂等性问题对于重复下发的消息也能保障唯一性消费。
RabbitMQ
幂等性处理。在消费者端实现幂等性逻辑即无论消息被消费多少次最终的结果应该保持一致。这可以通过在消费端进行唯一标识的检查或者记录已经处理过的消息来实现。没下消费任务时都去查询该任务是否已被消费这种是重复下发后处理的方式。消息确认机制。消费者在处理完消息后发送确认消息ACK给RabbitMQ告知消息已经成功处理。RabbitMQ根据接收到的确认消息来判断是否需要重新投递消息给其他消费者这种是主动通知消息下发的方式。
Kafka
消息确认机制。消费者在处理完消息后提交已消费的偏移量Offset给KafkaKafka会记录已提交的偏移量以便在消费者重新启动时从正确的位置继续消费。消费者可以定期提交偏移量确保消息只被消费一次。
RocketMQ
使用消息唯一标识符Message ID。在消息发送时为每条消息附加一个唯一标识符。消费者在处理消息时可以通过判断消息唯一标识符来避免重复消费。可以将消息ID记录在数据库或缓存中用于去重检查。
总结在MQ中处理重复消费的问题主要的思路有1通过给消息加唯一性标识来过滤已经消费的消息对于像RocketMQ这种存在Messeage ID的处理起来就比较简单就只需要对Messeage ID去重即可对于像RabbitMQ和kafka这种可以将消息状态保存在数据库或缓存中进行唯一性去重2消息确认机制就是对于消息的消费会主动上报的每次消费完就会进行确认在RabbitMQ中是会恢复ACK标识在kafka中是会恢复offset标识。
4.消息顺序性
RabbitMQ
单个队列。rabbitmq 保证了同一个队列中的消息按照发布的顺序进入和出队。
Kafka
有序分区。kafka 保证了同一个分区topic partition中的消息按照发布的顺序存储和消费。
RocketMQ
有序分区。rokcetmq 保证了同一个队列topic queueId中的消息按照发布的顺序存储和消费。 参考资料
MQ黄金三剑客RabbitMQ、RocketMQ和Kafka深入解密常见问题及功能对比指南https://juejin.cn/post/7254267283249840184?utm_sourcegold_browser_extension 【RabbitMQ.Client笔记】Qos与消息应答https://www.cnblogs.com/fanfan-90/p/13589626.html 说明了通过Qos做限流通过手动ACK来进行消息确认《RabbitMQ系列》之RabbitMQ的优先级队列https://zhuanlan.zhihu.com/p/582787804实现了优先级队列 本文由博客一文多发平台 OpenWrite 发布