网站单个页面紧张搜索引擎蜘蛛,oa手机端,专做商品折扣的网站,天津门头设计制作价格引言
根据前面的知识#xff08;深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践#xff09;我们知道#xff0c;如果要保证消息的可靠性#xff0c;需要对消息进行持久化处理#xff0c;然而消息持久化除了需要代码的设置之外#xff0c;还有一个…引言
根据前面的知识深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践我们知道如果要保证消息的可靠性需要对消息进行持久化处理然而消息持久化除了需要代码的设置之外还有一个重要步骤是至关重要的那就是保证你的消息顺利进入Broker代理服务器如图所示 正常情况下如果消息经过交换器进入队列就可以完成消息的持久化但如果消息在没有到达broker之前出现意外那就造成消息丢失有没有办法可以解决这个问题
RabbitMQ有两种方式来解决这个问题
通过AMQP提供的事务机制实现使用发送者确认模式实现
一、事务使用
事务的实现主要是对信道Channel的设置主要的方法有三个
channel.txSelect()声明启动事务模式channel.txComment()提交事务channel.txRollback()回滚事务
从上面的可以看出事务都是以tx开头的tx应该是transaction extend事务扩展模块的缩写如果有准确的解释欢迎在博客下留言。
我们来看具体的代码实现
// 创建连接
ConnectionFactory factory new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn factory.newConnection();
// 创建信道
Channel channel conn.createChannel();
// 声明队列
channel.queueDeclare(_queueName, true, false, false, null);
String message String.format(时间 %s, new Date().getTime());
try {channel.txSelect(); // 声明事务// 发送消息channel.basicPublish(, _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(UTF-8));channel.txCommit(); // 提交事务
} catch (Exception e) {channel.txRollback();
} finally {channel.close();conn.close();
}
注意用户需把config.xx配置成自己Rabbit的信息。
从上面的代码我们可以看出在发送消息之前的代码和之前介绍的都是一样的只是在发送消息之前需要声明channel为事务模式提交或者回滚事务即可。
了解了事务的实现之后那么事务究竟是怎么执行的让我们来使用wireshark抓个包看看如图所示 输入ip.addrrabbitip amqp查看客户端和rabbit之间的通讯可以看到交互流程
客户端发送给服务器Tx.Select(开启事务模式)服务器端返回Tx.Select-Ok开启事务模式ok推送消息客户端发送给事务提交Tx.Commit服务器端返回Tx.Commit-Ok
以上就完成了事务的交互流程如果其中任意一个环节出现问题就会抛出IoException移除这样用户就可以拦截异常进行事务回滚或决定要不要重复消息。
那么既然已经有事务了没什么还要使用发送方确认模式呢原因是因为事务的性能是非常差的。事务性能测试
事务模式结果如下
事务模式发送1w条数据执行花费时间14197s事务模式发送1w条数据执行花费时间13597s事务模式发送1w条数据执行花费时间14216s
非事务模式结果如下
非事务模式发送1w条数据执行花费时间101s非事务模式发送1w条数据执行花费时间77s非事务模式发送1w条数据执行花费时间106s
从上面可以看出非事务模式的性能是事务模式的性能高149倍我的电脑测试是这样的结果不同的电脑配置略有差异但结论是一样的事务模式的性能要差很多那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢那就是接下来要讲的Confirm发送方确认模式。
扩展知识
我们知道消费者可以使用消息自动或手动发送来确认消费消息那如果我们在消费者模式中使用事务当然如果使用了手动确认消息完全用不到事务的会发生什么呢
消费者模式使用事务
假设消费者模式中使用了事务并且在消息确认之后进行了事务回滚那么RabbitMQ会产生什么样的变化
结果分为两种情况
autoAckfalse手动应对的时候是支持事务的也就是说即使你已经手动确认了消息已经收到了但在确认消息会等事务的返回解决之后在做决定是确认消息还是重新放回队列如果你手动确认现在之后又回滚了事务那么已事务回滚为主此条消息会重新放回队列autoAcktrue如果自定确认为true的情况是不支持事务的也就是说你即使在收到消息之后在回滚事务也是于事无补的队列已经把消息移除了
二、Confirm发送方确认模式
Confirm发送方确认模式使用和事务类似也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式
方式一channel.waitForConfirms()普通发送方确认模式
方式二channel.waitForConfirmsOrDie()批量确认模式
方式三channel.addConfirmListener()异步监听发送方确认模式
方式一普通Confirm模式
// 创建连接
ConnectionFactory factory new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn factory.newConnection();
// 创建信道
Channel channel conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
String message String.format(时间 %s, new Date().getTime());
channel.basicPublish(, config.QueueName, null, message.getBytes(UTF-8));
if (channel.waitForConfirms()) {System.out.println(消息发送成功 );
}
看代码可以知道我们只需要在推送消息之前channel.confirmSelect()声明开启发送方确认模式再使用channel.waitForConfirms()等待消息被服务器确认即可。
方式二批量Confirm模式
// 创建连接
ConnectionFactory factory new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn factory.newConnection();
// 创建信道
Channel channel conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i 0; i 10; i) {String message String.format(时间 %s, new Date().getTime());channel.basicPublish(, config.QueueName, null, message.getBytes(UTF-8));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布只要有一个未确认就会IOException
System.out.println(全部执行完成);
以上代码可以看出来channel.waitForConfirmsOrDie()使用同步方式等所有的消息发送之后才会执行后面代码只要有一个消息未被确认就会抛出IOException异常。
方式三异步Confirm模式
// 创建连接
ConnectionFactory factory new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn factory.newConnection();
// 创建信道
Channel channel conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i 0; i 10; i) {String message String.format(时间 %s, new Date().getTime());channel.basicPublish(, config.QueueName, null, message.getBytes(UTF-8));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(未确认消息标识 deliveryTag);}Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format(已确认消息标识%d多个消息%b, deliveryTag, multiple));}
});
异步模式的优点就是执行效率高不需要等待消息执行完只需要监听消息即可以上异步返回的信息如下 可以看出代码是异步执行的消息确认有可能是批量确认的是否批量确认在于返回的multiple的参数此参数为bool值如果true表示批量执行了deliveryTag这个值以前的所有消息如果为false的话表示单条确认。
Confirm性能测试
测试前提与事务一样我们发送1w条消息。
方式一Confirm普通模式
执行花费时间2253s执行花费时间2018s执行花费时间2043s
方式二Confirm批量模式
执行花费时间1576s执行花费时间1400s执行花费时间1374s
方式三Confirm异步监听方式
执行花费时间1498s执行花费时间1368s执行花费时间1363s
总结
综合总体测试情况来看Confirm批量确定和Confirm异步模式性能相差不大Confirm模式要比事务快10倍左右。
长按二维码关注我的技术公众号