当前位置: 首页 > news >正文

长沙市住房与城乡建设厅网站wordpress投票

长沙市住房与城乡建设厅网站,wordpress投票,注册网站发财的富豪,苏州网站建设选苏州梦易行前言最近需要使用到消息队列相关技术#xff0c;于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题#xff0c;归纳了一下#xff0c;大概有以下几种#xff1a;1. 临时异常#xff0c;如数据库网络闪断、http请求临时失效等#xff1b;2. 时序异常#xff0c;如A任… 前言  最近需要使用到消息队列相关技术于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题归纳了一下大概有以下几种  1. 临时异常如数据库网络闪断、http请求临时失效等  2. 时序异常如A任务依赖于B任务但可能由于调度或消费者分配的原因导致A任务先于B任务执行  3. 业务异常由于系统测试不充分上线后发现某几个或某几种消息无法正常处理  4. 系统异常业务中间件无法正常操作如网络中断、数据库宕机等  5. 非法异常一些伪造、攻击类型的消息。   针对这些异常我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。 方案    1. 消息均使用Exchange进行通讯方式可以是direct或topic不建议fanout。  2. 根据业务在Exchange下分配一个或多个Queue同时设置一个审计线程(Audit)监听所有Queue用于记录消息到MongoDB同时又不阻塞正常业务处理。  3. 生产者(Publisher)在发布消息时基于AMQP协议生成消息标识MessageId和时间戳Timestamp根据消息业务添加头信息Headers便于跟踪。    4. 消费者(Comsumer)消息处理失败时则把消息发送到重试交换机(Retry Exchange)并设置过期重试时间及更新重试次数如果超过重试次数则删除消息。  5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange)消息过期后自动转发到业务交换机(Exchange)。  6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。     注选择MongoDB作为存储介质的主要原因是其对头信息headers的动态查询支持较好同等的替代产品还可以是Elastic Search这些。 生产者(Publisher)  1. 设置断线自动恢复  var factory new ConnectionFactory{Uri new Uri(amqp://guest:guest192.168.132.137:5672),AutomaticRecoveryEnabled true  };   2. 定义Exchange模式为direct  channel.ExchangeDeclare(Exchange, direct);   3. 根据业务定义QueueA和QueueB  channel.QueueDeclare(QueueA, true, false, false);channel.QueueBind(QueueA, Exchange, RouteA);channel.QueueDeclare(QueueB, true, false, false);channel.QueueBind(QueueB, Exchange, RouteB);   4. 启动消息发送确认机制即需要收到RabbitMQ服务端的确认消息  channel.ConfirmSelect();   5. 设置消息持久化  var properties channel.CreateBasicProperties();properties.Persistent true;   6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers  properties.MessageId Guid.NewGuid().ToString(N);properties.Timestamp new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Headers new Dictionarystring, object  {{ key, value i}};   7. 发送消息偶数序列发送到QueueARouteA奇数序列发送到QueueBRouteB  channel.BasicPublish(Exchange, i % 2 0 ? RouteA : RouteB, properties, body);   8. 确定收到RabbitMQ服务端的确认消息  var isOk channel.WaitForConfirms();if (!isOk){throw new Exception(The message is not reached to the server!);}   完整代码效果QueueA和QueueB各一条消息QueueAudit两条消息    注Exchange下必须先声明Queue才能接收到消息上述代码并没有QueueAudit的声明需要手动声明或者先执行下面的消费者程序进行声明。 正常消费者(ComsumerA)  1. 设置预取消息避免公平轮训问题可以根据需要设置预取消息数这里是1  _channel.BasicQos(0, 1, false);     2. 声明Exchange和Queue  _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueA, true, false, false);_channel.QueueBind(QueueA, Exchange, RouteA);   3. 编写回调函数 注设置了RabbitMQ的断线恢复机制当RabbitMQ连接不可用时与MQ通讯的操作会抛出AlreadyClosedException的异常导致主线程退出哪怕连接恢复了程序也无法恢复因此需要捕获处理该异常。 异常消费者(ComsumerB)  1. 设置预取消息  _channel.BasicQos(0, 1, false);   2. 声明Exchange和Queue  _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueB, true, false, false);_channel.QueueBind(QueueB, Exchange, RouteB);   3.  设置死信交换机(Dead Letter Exchange)  var retryDic new Dictionarystring, object  {      {x-dead-letter-exchange, Exchange},{x-dead-letter-routing-key, RouteB}};_channel.ExchangeDeclare(Exchange_Retry, direct);_channel.QueueDeclare(QueueB_Retry, true, false, false, retryDic);_channel.QueueBind(QueueB_Retry, Exchange_Retry, RouteB_Retry);  4. 重试设置3次重试第一次1秒第二次10秒第三次30秒  _retryTime new Listint  {1 * 1000,10 * 1000,30 * 1000  };   5. 获取当前重试次数  var retryCount 0;if (ea.BasicProperties.Headers ! null ea.BasicProperties.Headers.ContainsKey(retryCount)){retryCount (int)ea.BasicProperties.Headers[retryCount];_logger.LogWarning($[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...);}   6. 发生异常判断是否可以重试  private bool CanRetry(int retryCount){return retryCount _retryTime.Count - 1;}   7. 可以重试则启动重试机制审计消费者(Audit Comsumer)  1. 声明Exchange和Queue  _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueAudit, true, false, false);_channel.QueueBind(QueueAudit, Exchange, RouteA);_channel.QueueBind(QueueAudit, Exchange, RouteB);   2. 排除死信Exchange转发过来的重复消息  if (ea.BasicProperties.Headers null || !ea.BasicProperties.Headers.ContainsKey(x-death)){...}   3. 生成消息实体  var message new Message{MessageId ea.BasicProperties.MessageId,Body ea.Body,Exchange ea.Exchange,Route ea.RoutingKey};  4. RabbitMQ会用bytes来存储字符串因此要把头中bytes转回字符串  if (ea.BasicProperties.Headers ! null){var headers new Dictionarystring, object();foreach (var header in ea.BasicProperties.Headers){if (header.Value is byte[] bytes){headers[header.Key] Encoding.UTF8.GetString(bytes);}else{headers[header.Key] header.Value;}}message.Headers headers;}  5. 把Unix格式的Timestamp转成UTC时间  if (ea.BasicProperties.Timestamp.UnixTime 0){message.TimestampUnix ea.BasicProperties.Timestamp.UnixTime;var offset DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);message.Timestamp offset.UtcDateTime;}   6. 消息存入MongoDB  _mongoDbContext.CollectionMessage().InsertOne(message, cancellationToken: cancellationToken);   MongoDB记录     重试记录   消息检索及重发(WebApi)  1. 通过消息Id检索消息     2. 通过头消息检索消息       3. 消息重发会重新生成MessageId     AckNackReject的关系  1. 消息处理成功执行AckRabbitMQ会把消息从队列中删除。  2. 消息处理失败执行Nack或者Reject  a) 当requeuetrue时消息会重新回到队列然后当前消费者会马上再取回这条消息  b) 当requeuefalse时如果Exchange有设置Dead Letter Exchange则消息会去到Dead Letter Exchange  c) 当requeuefalse时如果Exchange没设置Dead Letter Exchange则消息从队列中删除效果与Ack相同。   3. Nack与Reject的区别在于Nack可以批量操作Reject只能单条操作。  RabbitMQ自动恢复连接Connection恢复  1. 重连Reconnect  2. 恢复连接监听Listeners  3. 重新打开通道Channels  4. 恢复通道监听Listeners  5. 恢复basic.qospublisher confirms以及transaction设置   拓扑Topology恢复  1. 重新声明交换机Exchanges  2. 重新声明队列Queues  3. 恢复所有绑定Bindings  4. 恢复所有消费者Consumers 异常处理机制  1. 临时异常如数据库网络闪断、http请求临时失效等  通过短时间重试如1秒后的方式处理也可以考虑Nack/Reject来实现重试时效性更高。   2. 时序异常如A任务依赖于B任务但可能由于调度或消费者分配的原因导致A任务先于B任务执行  通过长时间重试如1分钟、30分钟、1小时、1天等等待B任务先执行完的方式处理。    3. 业务异常由于系统测试不充分上线后发现某几个或某几种消息无法正常处理  等系统修正后通过消息重发的方式处理。   4. 系统异常业务中间件无法正常操作如网络中断、数据库宕机等  等系统恢复后通过消息重发的方式处理。   5. 非法异常一些伪造、攻击类型的消息  多次重试失败后消息从队列中被删除也可以针对此业务做进一步处理。 源码地址https://github.com/ErikXu/RabbitMesage相关文章CAP带你轻松玩转ASP.NETCore消息队列.NetCore Cap 结合 RabbitMQ 实现消息订阅[译]RabbitMQ教程C#版 - 发布订阅原文地址: https://www.cnblogs.com/Erik_Xu/p/9515208.html.NET社区新闻深度好文欢迎访问公众号文章汇总 http://www.csharpkit.com
http://www.huolong8.cn/news/322598/

相关文章:

  • 贵州建设厅造价信息网站wordpress主题包下载
  • dede网站迁移广州网络营销公司
  • 可以自己做网站服务器不玉屏网络推广公司
  • 阜宁建设网站的公司图片制作视频手机软件
  • 搭建好网站生情好域名后怎么做wordpress 外链 预览
  • 可信网站认证必须做吗彩库宝典官方app版下载
  • 哪种浏览器什么网站都可以进酒泉市住房和城乡建设局网站
  • 自助做网站傻瓜式自助建站工具招聘网络推广专员
  • 建设网站都需要注意什么wordpress登陆404
  • 做网站的客户需求报告答案电子商务网站建设论文总结
  • 昆明制作手机网站房产律师咨询免费24小时在线
  • dede网站前台没有图片电话网站域名到期
  • 泰州网站建设团队遵义酷虎网站开发
  • 有了域名怎么建设网站外包服务公司是干什么的
  • html网页框架代码实例下载班级优化大师并安装
  • 网站设计导航栏怎么做合肥高端网站设计
  • 北京中国建设部网站首页苏州高端网站设计机构
  • 怎样黑进别人的网站服务器可以做网站吗
  • 东莞网站建设服务商中企动力做的网站价格区间
  • 广告素材网站wordpress 评论登陆
  • 房屋和建设工程信息平台seo2短视频发布
  • 网站后台登陆不进去微商做网站网站
  • 山东教育网站开发公司程序开发软件有哪些
  • 做外贸如何通过网站精准找到老板联系方法唐山住房和城乡建设局网站
  • 微信长图的免费模板网站重庆在线官网
  • 遵义做网站多少钱小程序制作公司排名
  • 手机网站缩放wordpress 主题 音乐
  • 山西做网站的公司哪个好山东建设银行官方网站
  • 建设企业网站模板下载企业网站网站建设公司
  • 网站建设与维护 书代做网页设计