当前位置: 首页 > news >正文

果乐宝的网站建设昆山营销型网站建设方法

果乐宝的网站建设,昆山营销型网站建设方法,网站页头,怎么用网站的二级目录做排名前言生产者发送消息到了队列#xff0c;队列推送数据给了消费者#xff0c;这里存在一些问题需要思考下生产者如何确保消息一定投递到了队列中RabbitMQ 丢失了消息(下文暂不涉及这块)队列如何确保消费者收到了消息呢生产者可靠发送执行流程当生产者将消息发送出去后#xff…前言生产者发送消息到了队列队列推送数据给了消费者这里存在一些问题需要思考下生产者如何确保消息一定投递到了队列中RabbitMQ 丢失了消息(下文暂不涉及这块)队列如何确保消费者收到了消息呢生产者可靠发送执行流程当生产者将消息发送出去后如果不进行特殊设置默认情况下发送消息操作后是没有返回任何消息给生产者的这时生产者是不知道消息有没有真的到达了RabbitMQ消息可能在到达RabbitMQ前已经丢了。对于这种情况有两种解决方案来应对事务机制该机制是AMQP协议层面提供的解决方案发送方确认机制这是RabbitMQ提供的解决方案事务机制可以将当前使用的channel设置成事务模式。借助如下与事务机制相关的三个方法来完成事务操作。channel.TxSelect() 启动事务模式channel.TxCommit() 提交事务channel.TxRollback() 回滚事务TXCommit事务模式下时序图中发布消息前后增加了一点动作设置信道为事务模式与提交事务。如果TXCommit()调用成功则说明事务提交成功消息一定到达了RabbitMQ中。TXRollback当事务提交执行之前由于RabbitMQ异常崩溃或其他原因抛出异常则可以捕获异常再执行TXRollback()完成事务回滚。生产者代码在原有Basic代码基础上增加事务操作此处先注释掉与事务有关的代码(16、34和40行)并且在发送消息后设置异常模拟var connFactory new ConnectionFactory {HostName xxx.xxx.xxx.xxx,Port 5672,UserName rabbitmqdemo,Password rabbitmqdemotest,VirtualHost rabbitmqdemo}; using (var conn connFactory.CreateConnection()) {using (var channel conn.CreateModel()){var queueName helloworld_tx;channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);//channel.TxSelect();        while (true){Console.WriteLine(消息内容(exit退出):);var message Console.ReadLine();if (message.Trim().ToLower() exit){break;}try{var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: , routingKey: queueName, basicProperties: null, body: body);if (message.Trim().ToLower() 9527){throw new Exception(触发异常);}//channel.TxCommit();                Console.WriteLine(消息内容发送完毕: message);}catch (Exception ex){Console.WriteLine(消息发送出现异常: ex.Message);//channel.TxRollback();            }}} }消费端代码保持不变(仅改动下队列名称)因生产者已经将消息发送到了队列中后触发了异常实则已经和消息能不能到达队列无关了在消费端也就正常消费了。取消16、34和40行的注释后在事务模式下当再次触发异常后能够看见消费端并没有收到消息(这不是说消费端那边出现异常了而是消息没有到达队列中)。发送消息与其他操作在同一事务下提供了一致性。局限性事务模式下只有收到了Broker的TX.Commit-OK才算提交成功否则便可以捕获异常执行事务回滚所以能够解决生产者和RabbitMQ之间消息确认的问题。但是使用事务机制下有一个缺点在一条消息发送之后会使得发送方阻塞以等待RabbitMQ的回应之后才能发送下一条消息严重降低RabbitMQ的性能与吞吐量一般不使用。确认机制生产者将channel设置为确认模式(confirm)在该信道上发送的消息都会分配一个唯一Id(从1开始)当消息进入到队列中后RabbitMQ会发送一个ack(Acknowledgement)命令(Basic.Ack)给生产者这样确保生产者知道这条消息是真的到了。相比于事务机制下的阻塞发送方确认机制下是异步的发布一条消息后可以等待信道确认的同时发送下一条消息当信道收到确认时生产者可以通过回调方法来处理该消息。当RabbitMQ自身内部错误导致消息丢失会发送nack(Negative Acknowledgement)命令(Basic.Nack)生产者可以在回调方法中处理该命令诸如重试记录等操作所有消息在确认模式下都会被ack或是nack一次。核心方法channel.ConfirmSelect 设置信道为确认模式(Confirm)收到Confirm.Select-OK命令表示同意将当前信道设置为Confirm模式。channel.WaitForConfirms 信道等待Broker回执确认信息三种模式生产者实现confirm有三种方式取决于confirm的时机同步确认模式单条确认模式发送完消息后等待确认批量确认模式发送完一批消息后等待确认异步确认模式发送完消息后提供一个回调方法Broker回执了一条或多条后由生产者回调执行异步等待确认单条确认模式生产者通过调用channel.ConfirmSelect()将信道设置为确认模式(Confirm)。发送消息到RabbitMQ当消息进入到队列后发送Basic.Ack给生产者信道等待获取确认信息执行回调逻辑。var connFactory new ConnectionFactory {HostName xxx.xxx.xxx.xxx,Port 5672,UserName rabbitmqdemo,Password rabbitmqdemotest,VirtualHost rabbitmqdemo}; using (var conn connFactory.CreateConnection()) {using (var channel conn.CreateModel()){var queueName helloworld_producerack_singleconfirm;channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect(); while (true){Console.WriteLine(消息内容(exit退出):);var message Console.ReadLine();if (message.Trim().ToLower() exit){break;}var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: , routingKey: queueName, basicProperties: null, body: body);Console.WriteLine(消息内容发送完毕: message);Console.WriteLine(channel.WaitForConfirms(new TimeSpan(0, 0, 1)) ? 消息发送成功 : 消息发送失败);}} }消息发送完毕同步等待获取到Broker的回执确认消息这样来判断是否ack或是nack了如此可确定生产者消息是否真的到达了Broker。批量确认模式信道开启confirm模式与单条发送后确认不同批量发送多条之后再调用waitForConfirms确认这样发送多条之后才会等待一次确认消息极大提升confirm效率但问题在于出现返回basic.nack或者超时的情况时生产者需要将这批消息全部重发则会带来明显的重复消息数量并且当消息经常丢失时批量confirm性能应该是不升反降的。var connFactory new ConnectionFactory {HostName xxx.xxx.xxx.xxx,Port 5672,UserName rabbitmqdemo,Password rabbitmqdemotest,VirtualHost rabbitmqdemo}; using (var conn connFactory.CreateConnection()) {using (var channel conn.CreateModel()){var queueName helloworld_producerack_batchconfirm;channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect();while (true){for (int i 0; i 3; i){Console.WriteLine(消息内容(exit退出):);var message Console.ReadLine();if (message.Trim().ToLower() exit){break;}var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: , routingKey: queueName, basicProperties: null, body: body);Console.WriteLine(消息内容发送完毕: message);}Console.WriteLine(channel.WaitForConfirms() ? 所有消息内容发送完毕 : 存在消息发送失败);}} }此处对于批量我设置三个为一组三个执行完毕后获得Broker的回执确认消息。异步确认模式异步模式下额外增加回调方法来处理Channel对象提供的BasicAcksBasicNacks两个回调事件//  // Summary:  //     Signalled when a Basic.Nack command arrives from the broker.  event EventHandlerBasicNackEventArgs BasicNacks;//  // Summary:  //     Signalled when a Basic.Ack command arrives from the broker.  event EventHandlerBasicAckEventArgs BasicAcks;BasicAckEventArg和BasicNackEventArgs都包含deliveryTag当前Chanel在发送消息时给的消息序号。var connFactory new ConnectionFactory {HostName xxx.xxx.xxx.xxx,Port 5672,UserName rabbitmqdemo,Password rabbitmqdemotest,VirtualHost rabbitmqdemo}; using (var conn connFactory.CreateConnection()) {using (var channel conn.CreateModel()){var queueName helloworld_producerack_asyncconfirm;channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect();channel.BasicAcks new EventHandlerBasicAckEventArgs((o, basic) {Console.WriteLine($调用ack回调方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple });});channel.BasicNacks new EventHandlerBasicNackEventArgs((o, basic) {Console.WriteLine($调用Nacks回调方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple });});while (true){Console.WriteLine(消息内容(exit退出):);var message Console.ReadLine();if (message.Trim().ToLower() exit){break;}var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: , routingKey: queueName, basicProperties: null, body: body);Console.WriteLine(消息内容发送完毕: message);}} }生产者发送消息生产者收到回执确认消息调用回调方法在此基础上我们可以为channel维护一个unconfirm的消息序号集合用来控制nack回调下的失败重试。具体实现为当发布一条消息时从channel获取当前的唯一的Id存入集合中。当回调一次BasicAcks事件方法时unconfirm集合删掉相应的一条或多条记录。// ...channel.ConfirmSelect();var _pendingDeliveryTags new LinkedListulong(); channel.BasicAcks new EventHandlerBasicAckEventArgs((o, basic) {Console.WriteLine($调用ack回调方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple });if (_pendingDeliveryTags.Count 0){return;}if (!basic.Multiple){_pendingDeliveryTags.Remove(basic.DeliveryTag);return;}while (_pendingDeliveryTags.First.Value basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();}if (_pendingDeliveryTags.First.Value basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();} }); channel.BasicNacks new EventHandlerBasicNackEventArgs((o, basic) {Console.WriteLine($调用Nacks回调方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }); if (_pendingDeliveryTags.Count 0){ return;} if (!basic.Multiple){_pendingDeliveryTags.Remove(basic.DeliveryTag); return;} while (_pendingDeliveryTags.First.Value basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();} if (_pendingDeliveryTags.First.Value basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();}//Todo 执行消息重发});for (int i 0; i 5000; i) {var message 第 (i 1) 条消息;var body Encoding.UTF8.GetBytes(message);var nextSeqNo channel.NextPublishSeqNo;channel.BasicPublish(exchange: , routingKey: queueName, basicProperties: null, body: body);_pendingDeliveryTags.AddLast(nextSeqNo);Console.WriteLine(消息内容发送完毕: message); }异步模式下当生产者持续向RabbitMQ发送消息可能生产者端并不会收到100个ack消息 也许收到1个或者2个或N个ack消息这些ack消息的multiple都为trueack消息中的deliveryTag的值的含义也不是一样的当multiple为truedeliveryTag代表的值是一批的id像如下图中的4905则代表是4904和4905两个唯一Id值。而当multiple为false时则deliveryTag就仅代表一个唯一的Id值。从性能方面比对这几种方式批量和异步确认更胜一筹。可从代码复杂方面来讲事务机制或是单条确认模式都简单异步确认模式反而是更复杂。而当遇到批量消息需要重发场景时批量确认模式却又会存在性能不升反降。因此也无绝对的优劣适合的才是最好的。消费者可靠接收生产者将消息可靠传递到了RabbitMQRabbitMQ将消息推送给消费者消费者端如何确保消息可靠接收了呢? RabbitMQ提供了消息确认机制消费者在订阅队列时可以指定autoAck参数。AutoAck参数当autoAck为false时RabbitMQ会等待消费者显示的回复确认信号后再从内存(或磁盘)中移除消息(先标记再删除)。当autoAck为true时RabbitMQ会把消息发送出去的消息自动确认再从内存(或磁盘)中删除消费者是否接收到了消息不管。RabbitMQ的Web管理平台上可以看到当前队列是否AutoAck。当AutoAck设置为false时面板中展示Ack required为勾选。同时队列中的消息分成了两部分一部分是等待投递给消费者的消息数 Ready一部分是已经投递给消费者但是未收到消费者确认信号的消息数 Unacked。如果 RabbitMQ 服务器端一直没有收到消费者的确认信号并且消费此消息的消费者已经断开连接则服务器端会安排该消息重新进入队列等待投递给下一个消费者也可能还是原来的那个消费者。采用消息确认机制后只要设置 autoAck 参数为 false消费者就有足够的时间处理消息任务不用担心处理消息过程中消费者进程挂掉后消息丢失的问题因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。RabbitMQ 不会为未确认的消息设置过期时间判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久核心方法BasicAck 用于确认当前消BasicNack 用于拒绝当前消息可以执行批量拒绝BasicReject 用于拒绝当前消息仅拒绝一条消息手动确认生产者端将100条消息存入队列中可以通过Web面板看到已有Total/Ready都是100条。消费者端设置消费能力Qos为5同时channel.BasicConsume 设置autoAck为falsevar connFactory new ConnectionFactory {HostName xxx.xxx.xxx.xxx,Port 5672,UserName rabbitmqdemo,Password rabbitmqdemotest,VirtualHost rabbitmqdemo}; using (var conn connFactory.CreateConnection()) {using (var channel conn.CreateModel()){var queueName helloworld_consumerack;channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {Thread.Sleep(10000);var message ea.Body;Console.WriteLine(接收到信息为: Encoding.UTF8.GetString(message.ToArray()));((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);Console.ReadKey();} }运行代码在web面板中可以开始看到待确认数为5即RabbitMQ按照消费者最大负载能力推送消息同时等待消费者手动确认。这样能够可靠的确定消费者着实接收到了消息。当消费者因处理业务发生异常下或是因网络不稳定、服务器异常等消费者没有反馈Ack这样消息一直会处在队列中这样就确保了有其他消费者(也许还是自身)有机会执行重试确保业务正常仅仅当RabbitMQ收到消费者的Ack反馈后才会删除。自动确认RabbitMQ消息确认机制ACK默认是自动确认的自动确认会在消息发送给消费者后立即确认但存在丢失消息的可能。生产者发送100条消息到RabbitMQ中等待消费。消费者端设置自动确认Ack接收消息中设置了当读取到第50条消息时抛出异常。var consumer new EventingBasicConsumer(channel); consumer.Received (model, ea) {Thread.Sleep(1000);var message ea.Body;Console.WriteLine(接收到信息为: Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains(50)){Console.WriteLine(异常);throw new Exception(模拟异常);}((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false); };channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);当运行程序消息全部会推送到消费者中(basicQos在AutoAck下无效)出现异常时RabbitMQ中也不会留有消息这样一来消息就丢失了。这种不安全的模式使用场景受限消息易丢失。同时由于自动确认下设置Qos无效当消息数量过多时RabbitMQ发送到消费者端的消息太快数量太多又可能造成消费者端消息挤压而耗尽内存导致进程终止。2022-08-23,望技术有成后能回来看见自己的脚步
http://www.huolong8.cn/news/196136/

相关文章:

  • 一个人可以建设几个网站自己做网站赚钱案例
  • 杭州正规的网站建设公司昆山专业网站建设公司哪家好
  • dnf制裁做任务网站中关村网站建设
  • 什么是门户网站phpstudy建wordpress
  • 如何查询网站已经提交备案百度网盘下载安装
  • 网站登不了企业建设网银网站如何被手机端收录
  • 龙岗区网站建设哪个公司好长安营销服务协同管理平台网站
  • 运城做网站方式方法宁波关键词优化时间
  • 广州网站建设求职简历找专业公司做网站
  • 上海城市建设档案馆网站wordpress 评论者链接
  • 网站建设项目化教程制作网站的原因
  • 查询网站是否安全做自己的免费网站
  • 建设网站只能是公司网站内容智能
  • 网站logo用什么来做山东最新消息
  • 美橙建站十四年趣图在线生成网站源码
  • 开发中英文切换网站如何做蚌埠百度推广电话
  • seo外链招聘.net网站做优化
  • 创建网站的公司wordpress 4.8.1漏洞
  • 建筑网站知名度wordpress 删除缓存
  • 网站开发什么课程垂直类网站怎么做
  • 电子商务网站建设过程范文wordpress文章字段
  • 别人做的网站打不开网络推广竞价是什么
  • 凡科建站网站怎么保存发给别人医院网站建设 中标
  • 精湛的网站建设排行榜wordpress企业主
  • 网站设计公司合肥c 网站开发的优点
  • 企业网页与网站区别cms模板
  • 福州综合网站建设湖北住房和城乡建设厅官方网站
  • 自定义表单网站网站备案承若怎么写
  • 内网电脑做网站服务器免费建站网站建设
  • 优化 导航网站怎么做虚拟币网站