简单网站,海尔网站建设的缺点,营销网络,北京知名的网站建设公司排名今天开始RabbitMQ教程的第二讲#xff0c;废话不多说#xff0c;直接进入话题。 (使用.NET 客户端 进行事例演示) 在第一个教程中#xff0c;我们编写了一个从命名队列中发送和接收消息的程序。在本教程中#xff0c;我们将创建一个工作队列#xff0c;这个队列将用于在…今天开始RabbitMQ教程的第二讲废话不多说直接进入话题。 (使用.NET 客户端 进行事例演示) 在第一个教程中我们编写了一个从命名队列中发送和接收消息的程序。在本教程中我们将创建一个工作队列这个队列将用于在多个工人之间分配耗时的任务。 工作队列【又名任务队列】背后主要的思想是避免立刻执行耗时的工作任务并且一直要等到它结束为止。相反我们规划任务并晚些执行。我们封装一个任务作为消息发送到一个命名的消息队列中后台运行的工作线程将获取任务并且最终执行该任务。当你运行很多的任务的时候他们会 共享工作线程和队列。 这个概念在Web应用程序中是尤其有用的异步执行可以在短时间内处理一个复杂Http请求。1、准备工作 在本系列教程的前一个教程中我们发送了一个包含“Hello World”的消息现在我们发送一个代表复杂任务的字符串。我们不会创建一个真实的任务比如对图像文件进行处理或PDF文件的渲染因此让我们假装我们很忙-通过采用Thread.Sleep()功能来实现复杂和繁忙。我们将根据字符串中的点的数量作为它的复杂性每一个点将占一秒钟的“工作”。例如一个假的任务描述Hello…有三个点我们就需要三秒。 我们将稍微修改一下我们以前的例子中Send 程序的代码允许从命令行发送任意消息。这个程序将把任务发送到我们的消息队列中所以我们叫它NewTask 像教程一我们需要生成两个项目。 dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore var message GetMessage(args); var body Encoding.UTF8.GetBytes(message); var properties channel.CreateBasicProperties(); properties.Persistent true; channel.BasicPublish(exchange: , routingKey: task_queue, basicProperties: properties, body: body);
信息数据我们可以从命令行的参数获得
private static string GetMessage(string[] args)
{ return ((args.Length 0) ? string.Join( , args) : Hello World!);
} 我们的旧Receive.cs代码也需要一些修改需要为消息体中每个点都需要消耗一秒钟的工作先要计算出消息体内有几个点号然后在乘以1000就是这个复杂消息所消耗的时间同时表示这是一个复杂任务。RabbitMQ将处理和发送理消息并且执行这个任务让我们拷贝以下代码黏贴到Worker的项目中并进行相应的修改 var consumer new EventingBasicConsumer(channel); consumer.Received (model, ea) { var body ea.Body; var message Encoding.UTF8.GetString(body); Console.WriteLine( [x] Received {0}, message); int dots message.Split(.).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine( [x] Done); }; channel.BasicConsume(queue: task_queue, noAck: true, consumer: consumer); 我们自己假设的任务的模拟执行时间就是 int dots message.Split(.).Length - 1;Thread.Sleep(dots * 1000);
2、轮询调度 我们使用任务队列的好处之一就是使任务可以并行化增加系统的并行处理能力。如果我们正在建立一个积压的工作我们可以紧紧增加更多的Worker实例就可以完成大量工作的处理修改和维护就很容易。 首先让我们同时运行两个Worker实例。他们都会从队列中得到消息但具体如何让我想想。 你需要打开三个控制台的应用程序。两个控制台程序将运行Wroker程序。这些控制台程序将是我们的两个消费者C1和C2。
# shell 1 cd Worker dotnet run # [*] Waiting for messages. To exit press CTRLC # shell 2 cd Worker dotnet run # [*] Waiting for messages. To exit press CTRLC 在第三个控制台应用程序中我们将发布新的任务。只要你已经启动了消费者程序你可以看到一些发布的信息
# shell 3 cd NewTask dotnet run First message. dotnet run Second message.. dotnet run Third message... dotnet run Fourth message.... dotnet run Fifth message..... 让我们看看交付了什么东西在Workers:
# shell 1 # [*] Waiting for messages. To exit press CTRLC # [x] Received First message. # [x] Received Third message... # [x] Received Fifth message..... # shell 2 # [*] Waiting for messages. To exit press CTRLC # [x] Received Second message.. # [x] Received Fourth message....
默认情况下RabbitMQ将会发送每一条消息给序列中每一个消费者。每个消费者都会得到相同数量的信息。这种分发消息的方式叫做轮询。我们尝试这三个或更多的Workers。 3、消息确认 处理一个任务可能需要几秒钟。如果有一个消费者开始了一个长期的任务并且只做了一部分就发生了异常你可能想知道到底发生了什么。我们目前的代码一旦RabbitMQ发送一个消息给客户立即从内存中移除。在这种情况下如果你关掉了一个Worker我们将失去它正在处理的信息。我们也将丢失发送给该特定员工但尚未处理的所有信息。 但我们不想失去任何任务。如果一个Worker出现了问题我们希望把这个任务交给另一个Woker。 为了确保消息不会丢失RabbitMQ支持消息确认机制。ACKnowledgement确认消息是从【消息使用者】发送回来告诉RabbitMQ结果的一种特殊消息确认消息告诉RabbitMQ指定的接受者已经收到、处理并且RabbitMQ你可以自由删除它。 如果一个【消费者Consumer】死亡其通道关闭连接被关闭或TCP连接丢失不会发送ACKRabbitMQ将会知道这个消息并没有完全处理将它重新排队。如果有其他用户同时在线它就会快速地传递到另一个【消费者】。这样你就可以肯定没有消息丢失即使【Worker】偶尔死了或者出现问题。 在没有任何消息超时当【消费者】死亡的时候RabbitMQ会重新发送消息。只要是正常的即使处理消息需要很长很长的时间也会重发消息给【消费者】。 消息确认的机制默认是打开的。在以前的例子中我们明确地把它们关闭设置noAck“没有手动确认”参数为true。是时候删除这个标志了并且从Worker发送一个适当确认消息一旦我们完成了工作任务。 var consumer new EventingBasicConsumer(channel); consumer.Received (model, ea) { var body ea.Body; var message Encoding.UTF8.GetString(body); Console.WriteLine( [x] Received {0}, message); int dots message.Split(.).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine( [x] Done); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: task_queue, noAck: false, consumer: consumer); 使用这个代码我们可以肯定的是即使你使用Ctrl C关掉一个正在处理消息的Worker也不会丢失任何东西。【Worker】被杀死后未被确认的消息很快就会被退回。4、忘记确认 忘记调用BasicAck这是一个常见的错误。虽然这是一个简单的错误但后果是严重的。消息会被退回时你的客户退出这可能看起来像是随机的但是RabbitMQ将会使用更多的内存保存这些任何延迟确认消息。 为了调试这种错误你可以使用rabbitmqctl打印messages_unacknowledged字段值sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 如果是在Window环境下删除掉sudo字符就可以rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged5、持久性的消息 我们已经学会了如何确保即使【消费者】死亡任务也不会丢失。但是如果RabbitMQ服务器停止了我们的任务仍然会丢失的。 当RabbitMQ退出或死机会清空队列和消息除非你告诉它即使宕机也不能丢失任何东西。要确保消息不会丢失有两件事情我们是必需要做的我们需要将队列和消息都标记为持久的。 首先我们需要确保我们RabbitMQ从来都不会损失我们的的队列。为了做到这一点我们需要声明我们的队列为持久化的 channel.QueueDeclare(queue: hello,durable: true,exclusive: false,autoDelete: false,arguments: null); 虽然这个命令本身是正确的它不会起作用在我们目前的设置中。这是因为我们已经定义了一个叫hello的队列它不是持久化的。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列在任何程序代码中都试图返回一个错误。但有一个快速的解决方法-让我们声明一个名称不同的队列例如task_queue
channel.QueueDeclare(queue: task_queue,durable: true,exclusive: false,autoDelete: false,arguments: null); 这行代码QueueDeclare表示队列的声明创建并打开队列这个段代码需要应用到【生产者】和【消费者】中。 在这一点上我们相信task_queue队列不会丢失任何东西即使RabbitMQ重启了。现在我们要通过设置IbasicProperties.SetPersistent属性值为true来标记我们的消息持久化的。
var properties channel.CreateBasicProperties(); properties.Persistent true; 关于消息持久性的注意 将消息标记为持久性并不能完全保证消息不会丢失。虽然该设置告诉RabbitMQ时时刻刻把保存消息到磁盘上但是这个时间间隔还是有的当RabbitMQ已经接受信息但并没有保存它此时还有可能丢失。另外RabbitMQ不会为每个消息调用fsync2--它可能只是保存到缓存并没有真正写入到磁盘。虽然他的持久性保证不强但它我们简单的任务队列已经足够用了。如果您需要更强的保证那么您可以使用Publisher Comfirms。6、公平调度 你可能已经注意到调度仍然没有像我们期望的那样的工作。例如在两个Workers的情况下当所有的奇数消息是沉重的甚至消息是轻的一个Worker忙个不停而另一个Worker几乎没事可做。哎RabbitMQ对上述情况一无所知仍将消息均匀发送。 发生这种情况是因为当有消息进入队列的时候RabbitMQ才仅仅调度了消息。它根本不看【消费者】未确认消息的数量它只是盲目的把第N个消息发送给第N个【消费者】。 为了避免上述情况的发生我们可以使用prefetchcount 1的设置来调用BasicQos方法。这个方法告诉RabbitMQ在同一时间不要发送多余一个消息的数据给某个【Worker】。或者换句话说当某个消息处理完毕并且已经收到了消息确认之后才可以继续发送消息给那个【Worker】。相反它将把消息分配给给下一个不忙的【Worker】。
channel.BasicQos(0, 1, false); 注意队列大小 如果所有的工人都很忙你的队列可以填满。你要留意这一点也许会增加更多的【Worker】或者有其他的策略。7、把所有的代码放在一起NewTask.cs类最终的代码是
using System;
using RabbitMQ.Client;
using System.Text; class NewTask
{ public static void Main(string[] args) { var factory new ConnectionFactory() { HostName localhost }; using(var connection factory.CreateConnection()) using(var channel connection.CreateModel()) { channel.QueueDeclare(queue: task_queue, durable: true, exclusive: false, autoDelete: false, arguments: null); var message GetMessage(args); var body Encoding.UTF8.GetBytes(message); var properties channel.CreateBasicProperties(); properties.Persistent true; channel.BasicPublish(exchange: , routingKey: task_queue, basicProperties: properties, body: body); Console.WriteLine( [x] Sent {0}, message); } Console.WriteLine( Press [enter] to exit.); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length 0) ? string.Join( , args) : Hello World!); }
}
Worker.cs完整源码如下 using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading; class Worker
{ public static void Main() { var factory new ConnectionFactory() { HostName localhost }; using(var connection factory.CreateConnection()) using(var channel connection.CreateModel()) { channel.QueueDeclare(queue: task_queue, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine( [*] Waiting for messages.); var consumer new EventingBasicConsumer(channel); consumer.Received (model, ea) { var body ea.Body; var message Encoding.UTF8.GetString(body); Console.WriteLine( [x] Received {0}, message); int dots message.Split(.).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine( [x] Done); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: task_queue, noAck: false, consumer: consumer); Console.WriteLine( Press [enter] to exit.); Console.ReadLine(); } }
}
使用消息确认和BasicQos方法可以建立一个工作队列。持久化的选项可以让我们的任务队列保持存活即使RabbitMQ重启。 好了写完了翻译的不好大家见谅。 原文地址如下http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
相关文章
RabbitMQ系列教程之一我们从最简单的事情开始Hello World如何优雅的使用RabbitMQ.NET 使用 RabbitMQ 图文简介RabbitMQ 高可用集群搭建及电商平台使用经验总结搭建高可用的rabbitmq集群 Mirror Queue 使用C#驱动连接RabbitMQ消息队列应用体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景 原文地址http://www.cnblogs.com/PatrickLiu/p/6943830.html .NET社区新闻深度好文微信中搜索dotNET跨平台或扫描二维码关注