手机微信网站怎么做的好处,搭建网上商城,关键词分为哪三类,网站免费软件下载文章目录 1、RabbitMQ是什么1.1、RabbitMQ---使用场景一般场景解耦削峰异步 2、Dokcer安装RabbitMQ2.1安装Dokcer2.2安装rabbitmq 3、RabbitMQ入门案例 - Simple 简单模式4、RabbitMQ的核心组成部分4.1 RabbitMQ整体架构4.2RabbitMQ的运行流程 5、RabbitMQ的模式5.1 发布订阅模… 文章目录 1、RabbitMQ是什么1.1、RabbitMQ---使用场景一般场景解耦削峰异步 2、Dokcer安装RabbitMQ2.1安装Dokcer2.2安装rabbitmq 3、RabbitMQ入门案例 - Simple 简单模式4、RabbitMQ的核心组成部分4.1 RabbitMQ整体架构4.2RabbitMQ的运行流程 5、RabbitMQ的模式5.1 发布订阅模式--fanout5.2路由模式-direct模式5.3路由模式-Topic模式5.4轮询模式 - Work模式5.4.1Work模式 - 轮询模式Round-Robin5.4.1Work模式 - 公平分发模式Round-Robin 1、RabbitMQ是什么
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写支持多种客户端语言。用于在分布式系统中存储消息转发消息具有高可用高可扩性易用性等特征。
1.1、RabbitMQ—使用场景
一般场景
像一般的下订单业务如下图 将订单信息写入数据库成功后发送注册邮件再发送注册短信。以上三个任务全部完成后返回给客户端 像这样耗时就很大 所有服务操作的耗时总和而且若是这一整条执行链某个环节出了问题触发回滚得不偿失 public void makeOrder(){// 1 :保存订单 orderService.saveOrder();// 2 发送短信服务messageService.sendSMS(order);//1-2 s// 3 发送email服务emailService.sendEmail(order);//1-2 s// 4 发送APP服务appService.sendApp(order);
}那么当我们开辟一个线程池去异步处理的话也存在缺点最大的原因就是自己去实现起来因素过多实现复杂 存在问题 1耦合度高 2需要自己写线程池自己维护成本太高 3出现了消息可能会丢失需要你自己做消息补偿 4如何保证消息的可靠性你自己写 5如果服务器承载不了你需要自己去写高可用
所以MQ就诞生了 只管下单下单后直接就给用户提示下单成功别的事交给mq去派发让别的服务去mq拿消息处理
用户响应耗时 下单主要耗时50ms 别的次要处理服务全放到消息队列当中等待处理
public void makeOrder(){// 1 :保存订单 orderService.saveOrder(); rabbitTemplate.convertSend(ex,2,消息内容);
}解耦 发送方将消息发送到消息队列中接收方从队列中获取消息进行处理。这种松耦合的通信模式可以提高系统的可扩展性和灵活性。 这样使得下单服务并不受发短信、发邮件、等等服务的影响前提是下单不依赖任何一个服务的返回值
削峰 当系统面临突然的请求高峰时消息队列可以起到缓冲的作用。请求先进入消息队列排队然后逐个被处理使得系统能够逐渐消化高峰期的请求压力避免过载和故障。 也就是如果某个时刻有大量的请求此时都会到mq里面去而不会瞬间开启很多线程去异步执行从而达到销峰的效果使得即便大量的用户请求来了那系统处理请求还是非常平滑的
异步 消息队列支持异步处理即发送方发送消息后并不需要等待接收方立即处理完成而是继续执行其他任务。接收方在合适的时间从队列中获取消息进行处理。这种异步处理可以提高系统的性能和响应速度尤其适用于处理耗时的操作。 好处 1完全解耦用MQ建立桥接 2有独立的线程池和运行模型 3出现了消息可能会丢失MQ有持久化功能 4如何保证消息的可靠性死信队列和消息转移的等 5如果服务器承载不了你需要自己去写高可用HA镜像模型高可用。 按照以上约定用户的响应时间相当于是订单信息写入数据库的时间也就是50毫秒。注册邮件发送短信写入消息队列后直接返回因此写入消息队列的速度很快基本可以忽略因此用户的响应时间可能是50毫秒。因此架构改变后系统的吞吐量提高到每秒20 QPS。比串行提高了3倍比并行提高了两倍
2、Dokcer安装RabbitMQ
2.1安装Dokcer
yum 包更新到最新 yum update安装软件包yum-util提供yum-config-manager功能另外两个是devicemapper驱动依赖的 yum install -y yum-utils device-mapper-persistent-data lvm2设置yum源为阿里云 yum-config-manager --add-repohttp://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo安装docker yum install docker-ce-y安装后查看docker版本 docker-v安装加速镜像
从阿里云获取镜像加速器 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json -EOF
{registry-mirrors: [https://spukdfwp.mirror.aliyuncs.com]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docke2.2安装rabbitmq
路径https://www.rabbitmq.com/download.html 点击上图中标红线的 community Docker image跳转到如下地址https://registry.hub.docker.com/_/rabbitmq/
当前可以看到安装镜像的时候可以设置用户名密码ip。就不用安装完进入容器内部设置 3. 官网给的安装案例
$ docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USERuser -e RABBITMQ_DEFAULT_PASSpassword rabbitmq:3-management4.命令讲解
docker run -id --hostname my-rabbit --namemyrabbit -p 15672:15672 rabbitmq:3-management--hostname指定容器主机名称
--name:指定容器名称
-p将mq端口号映射到本地
-e 设置5.修改命令创建并安装
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USERadmin -e RABBITMQ_DEFAULT_PASSadmin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3-management6.阿里云开放上方命令 设置的端口号
-p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:18837.安装成功
[rootiZbp1av1izm1qqcdfa0nndZ ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3-management 6c3c2a225947 7 months ago 253MB
[rootiZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management docker-entrypoint.s… 6 minutes ago Up 6 minutes 4369/tcp, 0.0.0.0:1883-1883/tcp, :::1883-1883/tcp, 5671/tcp, 0.0.0.0:5672-5672/tcp, :::5672-5672/tcp, 15671/tcp, 0.0.0.0:15672-15672/tcp, :::15672-15672/tcp, 0.0.0.0:25672-25672/tcp, :::25672-25672/tcp, 0.0.0.0:61613-61613/tcp, :::61613-61613/tcp, 15691-15692/tcp myrabbit
[rootiZbp1av1izm1qqcdfa0nndZ ~]#8.停掉手动安装的rabbimq
systemctl stop rabbitmq-server9.启动docker的rabbitmq容器
##查看容器
[rootiZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management docker-entrypoint.s… 9 minutes ago Up 9 minutes 4369/tcp, 0.0.0.0:1883-1883/tcp, :::1883-1883/tcp, 5671/tcp, 0.0.0.0:5672-5672/tcp, :::5672-5672/tcp, 15671/tcp, 0.0.0.0:15672-15672/tcp, :::15672-15672/tcp, 0.0.0.0:25672-25672/tcp, :::25672-25672/tcp, 0.0.0.0:61613-61613/tcp, :::61613-61613/tcp, 15691-15692/tcp myrabbit
##启动容器 docker start 容器idCONTAINER ID
[rootiZbp1av1izm1qqcdfa0nndZ ~]# docker start 1de1f1e10cb0
1de1f1e10cb0
[rootiZbp1av1izm1qqcdfa0nndZ ~]#10.通过服务器虚拟机ip端口号15672访问RabbitMQ主页http://192.168.157.128:15672
默认登录账号和密码都是admin
并且在admin账号下可以通过增加用户给用户不同角色也就对应不同的操作权限 详情如下
3、RabbitMQ入门案例 - Simple 简单模式
1.实现步骤
1jdk1.8
2构建一个maven工程
3导入rabbitmq的maven依赖
4启动rabbitmq-server服务
5定义生产者
6定义消费者
7观察消息的在rabbitmq-server服务中的过程2.构建一个maven工程 3.导入rabbitmq的maven依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version
/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-amqp/artifactIdversion2.2.5.RELEASE/version
/dependency
dependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit/artifactIdversion2.2.5.RELEASE/version
/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
4.启动rabbitmq-server服务
systemctl start rabbitmq-server
或者
docker start myrabbit5、定义生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(192.168.157.128);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(生产者);// 4: 从连接中获取通道channelchannel connection.createChannel();// 5: 申明队列queue存储消息/** 如果队列不存在则会创建* Rabbitmq不允许创建两个相同的队列名称否则会报错。** params1 queue 队列的名称* params2 durable 队列是否持久化* params3 exclusive 是否排他即是否私有的如果为true,会对当前队列加锁其他的通道不能访问并且连接自动关闭* params4 autoDelete 是否自动删除当最后一个消费者断开连接之后是否自动删除消息。* params5 arguments 可以设置队列附加参数设置队列的有效期消息的最大长度队列的消息生命周期等等。* */channel.queueDeclare(queue1, false, false, true, null);// 6 准备发送消息的内容String message 你好学相伴;// 7: 发送消息给中间件rabbitmq-server// params1: 交换机exchange// params2: 队列名称/routing// params3: 属性配置// params4: 发送消息的内容channel.basicPublish(, queue1, null, message.getBytes());System.out.println(消息发送成功!);} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}// 8: 关闭连接if (connection ! null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}1执行发送这个时候可以在web控制台查看到这个队列queue的信息 2我们可以进行对队列的消息进行预览和测试如下 3:进行预览和获取消息进行测试 NACK 只是做消息预览不会吧消息从队列移除 ACK相当于手动的把消息处理了这个时候就会把消息从队列剔除导致消息丢失 6、定义消费者
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {// 所有的中间件技术都是基于tcp/ip协议基础上构建新型协议规范只不过rabbitmq遵循的是amqp// ip port// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(192.168.157.128);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(消费者);// 4: 从连接中获取通道channelchannel connection.createChannel();channel.basicConsume(queue1, true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(收到的消息是 new String(message.getBody(), UTF-8));}}, new CancelCallback() {public void handle(String s) throws IOException {System.out.println(接收失败了。。。);}});System.out.println(开始接收消息);System.in.read();}catch (Exception e){e.printStackTrace();System.out.println(发送消息出现异常...);}finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
消费者和生产者的区别在于消费者是从mq中取消息而生产者是从mq中存消息
4、RabbitMQ的核心组成部分 核心概念
Server又称Broker ,接受客户端的连接实现AMQP实体服务。 安装rabbitmq-serverConnection连接应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手Channel网络信道几乎所有的操作都在Channel中进行Channel是进行消息读写的通道客户端可以建立对各Channel每个Channel代表一个会话任务。Message:消息服务与应用程序之间传送的数据由Properties和body组成Properties可是对消息进行修饰比如消息的优先级延迟等高级特性Body则就是消息体的内容。Virtual Host 虚拟地址用于进行逻辑隔离最上层的消息路由一个虚拟主机理由可以有若干个Exhange和Queueu同一个虚拟主机里面不能有相同名字的ExchangeExchange交换机接受消息根据路由键发送消息到绑定的队列。(不具备消息存储的能力)BindingsExchange和Queue之间的虚拟连接binding中可以保护多个routing key.Routing key是一个路由规则虚拟机可以用它来确定如何路由一个特定消息。Queue队列也成为Message Queue,消息队列保存消息并将它们转发给消费者。
4.1 RabbitMQ整体架构 4.2RabbitMQ的运行流程 所以发送消息的时候没有设置交换机rabbitmq发送消息一定会有默认一个交换机并且消息不是直接到队列当中的而是由交换机根据路由键发送消息到绑定的队列 5、RabbitMQ的模式
5.1 发布订阅模式–fanout
特点Fanout—发布与订阅模式是一种广播机制它是没有路由key的模式。 也就是只要生产者发送一条消息经过交换机加入队列中左右的消费者都能拿到消息 这里就直接用web界面演示 新建一个fanout模式的交换机让交换机代替生产者去发消息 创建3个消息队列q1、q2、q3 将队列绑定到交换机上 由交换机代替生产者发送消息 然后三个队列都会有一个交换机发来的消息 q1队列消息正常被消费者拾取其他队列一样 q1队列消息正常被消费者拾取之后队列消息-1
ACK后 页面在自动会更新队列消息条目默认5秒5.2路由模式-direct模式
Direct模式是fanout模式上的一种叠加增加了路由RoutingKey的模式。 这样就可以给指定设置了路由key的队列发送消息并且一个队列可以有多个路由key当发送消息指定了路由key则只有设置了相对应的路由key的队列才能接收到消息 5.3路由模式-Topic模式
Topic模式是direct模式上的一种叠加增加了模糊路由RoutingKey的模式。 * 代表一级(必须有一级 # 代表0级或者多级 注意最好用代码的形式来进行绑定 在实际开发中我们既可以在RabbitMq的web界面进行交换机的创建队列的创建绑定路由key等等操作。 还可以在生产者代码里面通过channel.XXX的方式设置交换机设置队列设置路由key等等效果是一样的 例如下面代码生产者(消费者也可以声明) 代码实现【交换机和队列】的声明和绑定
package com.xxx.rabbitmq.all;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** RabbitMQ入门案例 - 完整的声明方式创建* 代码实现创建交换机和队列并绑定关系* 生产者*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(121.196.153.197);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(生产者);// 4: 从连接中获取通道channelchannel connection.createChannel();// 5 准备发送消息的内容String message 你好交换机;// 6准备交换机。取名规范 : 类型_业务模块_交换机String exchangeName direct_order_exchange;// 7: 定义路由keyString routeKeyOrder order;String routeKeyCourse course;// 8: 指定交换机的类型String exchangeType direct;// 9: 声明交换机/注册交换机// params1: 交换机名称// params2: 交换机类型// params3: 是否持久化 所谓的持久化就是值交换机不会随着服务器的重启造成丢失如果是true代表不丢失false重启丢失channel.exchangeDeclare(exchangeName, exchangeType, true);// 10: 声明队列/注册队列// params1: 队列名称// params2: 是否持久化// params3: 是不是排他性是否是独占独立// params4: 是不是自动删除 随着最后一个消费者消息完毕消息以后是否把队列自动删除// params5: 是不是有参数 参数携带可能会引发headers模式channel.queueDeclare(queue5, true, false, false, null);channel.queueDeclare(queue6, true, false, false, null);channel.queueDeclare(queue7, true, false, false, null);// 11: 绑定队列// params1: 队列名称// params2: 交换机名称// params3: routeKeychannel.queueBind(queue5, exchangeName, routeKeyOrder);channel.queueBind(queue6, exchangeName, routeKeyOrder);channel.queueBind(queue7, exchangeName, routeKeyCourse);// 12: 发送消息给中间件rabbitmq-server// params1: 交换机exchange// params2: 队列名称// params3: 属性配置// params4: 发送消息的内容channel.basicPublish(exchangeName, routeKeyOrder, null, message.getBytes());System.out.println(消息发送成功!);} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 13: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
消费者去队列拿消息
package com.xxx.rabbitmq.all;import com.rabbitmq.client.*;import java.io.IOException;/*** RabbitMQ入门案例 - 完整的声明方式创建* 消费者*/
public class Consumer {private static Runnable runnable new Runnable() {public void run() {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(121.196.153.197);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);//获取队列的名称final String queueName Thread.currentThread().getName();Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection();// 4: 从连接中获取通道channelchannel connection.createChannel();// 5: 申明队列queue存储消息/** 如果队列不存在则会创建* Rabbitmq不允许创建两个相同的队列名称否则会报错。** params1 queue 队列的名称* params2 durable 队列是否持久化* params3 exclusive 是否排他即是否私有的如果为true,会对当前队列加锁其他的通道不能访问并且连接自动关闭* params4 autoDelete 是否自动删除当最后一个消费者断开连接之后是否自动删除消息。* params5 arguments 可以设置队列附加参数设置队列的有效期消息的最大长度队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了可以不需要定义//channel.queueDeclare(queue1, false, false, false, null);// 6 定义接受消息的回调Channel finalChannel channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName 收到消息是 new String(delivery.getBody(), UTF-8));}}, new CancelCallback() {Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName 开始接受消息);System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, queue5).start();new Thread(runnable, queue6).start();new Thread(runnable, queue7).start();}}
5.4轮询模式 - Work模式
5.4.1Work模式 - 轮询模式Round-Robin
轮询模式的分发一个消费者一条按均分配 生产者
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** author: 学相伴-飞哥* description: Producer 简单队列生产者* Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(192.168.157.128);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(生产者);// 4: 从连接中获取通道channelchannel connection.createChannel();// 6 准备发送消息的内容//end topic模式for (int i 1; i 20; i) {//消息的内容String msg 学相伴: i;// 7: 发送消息给中间件rabbitmq-server// params1: 交换机exchange// params2: 队列名称/routingkey// params3: 属性配置// params4: 发送消息的内容channel.basicPublish(, queue1, null, msg.getBytes());}System.out.println(消息发送成功!);} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
消费者work1
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** author: 学相伴-飞哥* description: Consumer* Date : 2021/3/2*/
public class Work1 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(192.168.157.128);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(消费者-Work1);// 4: 从连接中获取通道channelchannel connection.createChannel();// 5: 申明队列queue存储消息/** 如果队列不存在则会创建* Rabbitmq不允许创建两个相同的队列名称否则会报错。** params1 queue 队列的名称* params2 durable 队列是否持久化* params3 exclusive 是否排他即是否私有的如果为true,会对当前队列加锁其他的通道不能访问并且连接自动关闭* params4 autoDelete 是否自动删除当最后一个消费者断开连接之后是否自动删除消息。* params5 arguments 可以设置队列附加参数设置队列的有效期消息的最大长度队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了可以不需要定义
// channel.queueDeclare(queue1, false, false, false, null);// 同一时刻服务器只会推送一条消息给消费者// 6 定义接受消息的回调Channel finalChannel channel;finalChannel.basicQos(1);finalChannel.basicConsume(queue1, true, new DeliverCallback() {Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println(Work1-收到消息是 new String(delivery.getBody(), UTF-8));Thread.sleep(2000);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {Overridepublic void handle(String s) throws IOException {}});System.out.println(Work1-开始接受消息);System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
work2
package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** author: 学相伴-飞哥* description: Consumer* Date : 2021/3/2*/
public class Work2 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost(192.168.157.128);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);Connection connection null;Channel channel null;try {// 3: 从连接工厂中获取连接connection connectionFactory.newConnection(消费者-Work2);// 4: 从连接中获取通道channelchannel connection.createChannel();// 5: 申明队列queue存储消息/** 如果队列不存在则会创建* Rabbitmq不允许创建两个相同的队列名称否则会报错。** params1 queue 队列的名称* params2 durable 队列是否持久化* params3 exclusive 是否排他即是否私有的如果为true,会对当前队列加锁其他的通道不能访问并且连接自动关闭* params4 autoDelete 是否自动删除当最后一个消费者断开连接之后是否自动删除消息。* params5 arguments 可以设置队列附加参数设置队列的有效期消息的最大长度队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了可以不需要定义//channel.queueDeclare(queue1, false, true, false, null);// 同一时刻服务器只会推送一条消息给消费者//channel.basicQos(1);// 6 定义接受消息的回调Channel finalChannel channel;finalChannel.basicQos(1);finalChannel.basicConsume(queue1, true, new DeliverCallback() {Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println(Work2-收到消息是 new String(delivery.getBody(), UTF-8));Thread.sleep(200);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {Overridepublic void handle(String s) throws IOException {}});System.out.println(Work2-开始接受消息);System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println(发送消息出现异常...);} finally {// 7: 释放连接关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection ! null connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
} work1和work2的消息处理能力不同但是最后处理的消息条数相同是“按均分配”。 往队列发送20条消息
结果就是轮询给消费者拿消息即便有的消费者消费很快也只能按照新顺序拿消息也只能按照轮询一个一个拿 也就是说不会因为某个消费者所在的服务器满而导致少消费一定是公平消费 5.4.1Work模式 - 公平分发模式Round-Robin
根据消费者的消费能力进行公平分发处理快的处理的多处理慢的处理的少按劳分配
相比较轮询模式公平分发的不同在于修改应答方式为手动 qos设置为1就代表消费者拿到cpu的执行权就每次从只拿走一条消息一条一条的拿。若不设置就是默认轮询拿一条 所以根据队列堆积的消息条数以及内存和磁盘空间来合理设置qos 这个时候性能好的消费者就会消费得多而性能差的消费者就消费得少能者多劳
更新中------ 参考来自狂神