服务支持型网站,企业高端网站建设,东莞做网站制作,wordpress文章加载慢前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题模式 上文我们了解了 RabbitMQ 六种队列模式中的简单队列#xff0c;代码也是非常的简单#xff0c;比较…前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题模式 上文我们了解了 RabbitMQ 六种队列模式中的简单队列代码也是非常的简单比较容易理解。
但是简单队列有个缺点简单队列是一一对应的关系即点对点一个生产者对应一个消费者按照这个逻辑如果我们有一些比较耗时的任务也就意味着需要大量的时间才能处理完毕显然简单队列模式并不能满足我们的工作需求我们今天再来看看工作队列。
文章目录 文章目录前言文章目录1. 什么是工作队列2. 代码部分2.1 生产者2.2 消费者3. 循环分发3.1 启动生产者3.2 启动两个消费者3.3 公平分发4. 消息持久化4.1 问题背景4.2 参数配置5. 工作队列总结1. 什么是工作队列
工作队列用来将耗时的任务分发给多个消费者工作者
主要解决问题处理资源密集型任务并且还要等他完成。有了工作队列我们就可以将具体的工作放到后面去做将工作封装为一个消息发送到队列中一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程那么工作就可以在多个进程间共享。
工作队列也称为公平性队列模式怎么个说法呢
循环分发假如我们拥有两个消费者默认情况下RabbitMQ 将按顺序将每条消息发送给下一个消费者平均而言每个消费者将获得相同数量的消息这种分发消息的方式称为轮询。
看代码吧。
2. 代码部分
2.1 生产者
创建50个消息
public class Producer2 {/** 队列名称 */private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel newConnection.createChannel();/**3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);for (int i 1; i 50; i) {String msg 生产者消息_ i;System.out.println(生产者发送消息: msg);/**4.发送消息 */channel.basicPublish(, QUEUE_NAME, null, msg.getBytes());}channel.close();newConnection.close();}}2.2 消费者
public class Customer2_1 {/*** 队列名称*/private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException {System.out.println(001);/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.获取通道 */final Channel channel newConnection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);DefaultConsumer defaultConsumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString new String(body, UTF-8);System.out.println(消费者获取消息: msgString);try {Thread.sleep(1000);} catch (Exception e) {} finally {/** 手动回执消息 */channel.basicAck(envelope.getDeliveryTag(), false);}}};/** 3.监听队列 */channel.basicConsume(QUEUE_NAME, false, defaultConsumer);}}3. 循环分发
3.1 启动生产者 3.2 启动两个消费者 在生产者中我们发送了50条消息进入队列而上方消费者启动图里很明显的看到轮询的效果就是每个消费者会分到相同的队列任务。
3.3 公平分发
由于上方模拟的是非常简单的消息队列的消费假如有一些非常耗时的任务某个消费者在缓慢地进行处理而另一个消费者则空闲显然是非常消耗资源的。
再举一个例子一个1年的程序员跟一个3年的程序员分配相同的任务量明显3年的程序员处理起来更加得心应手很快就无所事事了但是3年的程序员拿着非常高的薪资显然3年的程序员应该承担更多的责任那怎么办呢
公平分发。
其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去而没有确认各个工作者未返回确认的消息数量类似于TCP/UDP中的UDP面向无连接。
因此我们可以使用 basicQos 方法并将参数 prefetchCount 设为1告诉 RabbitMQ 我每次值处理一条消息你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了而是寻找空闲的工作者进行分发。
关键性代码
/** 2.获取通道 */
final Channel channel newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);4. 消息持久化
4.1 问题背景
上边我们提到的公平分发是由消费者收取消息时确认解决的但是这里面又会出现被 kill 的情况。
当有多个消费者同时收取消息且每个消费者在接收消息的同时还要处理其它的事情且会消耗很长的时间。在此过程中可能会出现一些意外比如消息接收到一半的时候一个消费者死掉了。
这种情况要使用消息接收确认机制可以执行上次宕机的消费者没有完成的事情。
但是在默认情况下我们程序创建的消息队列以及存放在队列里面的消息都是非持久化的。当RabbitMQ死掉了或者重启了上次创建的队列、消息都不会保存。
怎么办呢
4.2 参数配置
参数配置一生产者创建队列声明时修改第二个参数为 true
/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);参数配置二生产者发送消息时修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
for (int i 1; i 50; i) {String msg 生产者消息_ i;System.out.println(生产者发送消息: msg);/**4.发送消息 */channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}5. 工作队列总结
1、循环分发消费者端在信道上打开消息应答机制并确保能返回接收消息的确认信息这样可以保证消费者发生故障也不会丢失消息。
2、消息持久化服务器端和客户端都要指定队列的持久化和消息的持久化这样可以保证RabbitMQ重启队列和消息也不会。
3、公平分发指定消费者接收的消息个数避免出现消息均匀推送出现的资源不合理利用的问题。
案例代码https://www.lanzous.com/i5ydu6d 我创建了一个java相关的公众号用来记录自己的学习之路感兴趣的小伙伴可以关注一下微信公众号哈niceyoo