做网站用asp div代码,网站开发需要解决的问题,网页标准化对网站开发维护者的好处,网站qq未启用RabbitMQ 是当前最流行的消息中间件(Message Broker)之一#xff0c;支持多种消息协议(如 AMQP、MQTT)。同时它也是一个轻量级的非常易于部署的开源软件#xff0c;可以运行在当前大多数操作系统及云端环境中#xff0c;也能够部署在分布式的集群环境里以达到高可用、可伸缩…RabbitMQ 是当前最流行的消息中间件(Message Broker)之一支持多种消息协议(如 AMQP、MQTT)。同时它也是一个轻量级的非常易于部署的开源软件可以运行在当前大多数操作系统及云端环境中也能够部署在分布式的集群环境里以达到高可用、可伸缩的需求。此外RabbitMQ 还为目前主流的编程语言提供了丰富的开发工具。一、软件安装可以进入 官方下载界面 阅读针对自己操作系统版本的安装手册根据需求选择适合的安装方式。Windows 系统可以直接在该页面中获取二进制安装包(还需要安装 Erlang 环境)Linux 系统也可以根据发行版的不同添加特定的软件镜像源。我这里是 Ubuntu 19.04没有特别的需求所以直接从系统默认的软件镜像源里下载安装命令如下$ sudo apt-get install rabbitmq-server安装完成以后运行 systemctl status rabbitmq-server 命令查看 RabbitMQ 服务的运行状态$ systemctl status rabbitmq-server● rabbitmq-server.service - RabbitMQ Messaging ServerLoaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)Active: active (running) since Fri 2019-07-26 01:03:27 CST; 2min 55s agoMain PID: 770 (beam.smp)Status: InitializedTasks: 85 (limit: 2302)Memory: 85.8MCGroup: /system.slice/rabbitmq-server.service├─ 741 /bin/sh /usr/sbin/rabbitmq-server├─ 770 /usr/lib/erlang/erts-10.2.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/ebin -noshell -noinput -s rabbit boot -sname rabbitserver1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root /var/log/rabbitmq -rabbit lager_default_file /var/log/rabbitmq/rabbitserver1.log -rabbit lager_upgrade_file /var/log/rabbitmq/rabbitserver1_upgrade.log -rabbit enabled_plugins_file /etc/rabbitmq/enabled_plugins -rabbit plugins_dir /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins -rabbit plugins_expand_dir /var/lib/rabbitmq/mnesia/rabbitserver1-plugins-expand -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir /var/lib/rabbitmq/mnesia/rabbitserver1 -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672├─1243 erl_child_setup 65536├─1286 inet_gethost 4└─1287 inet_gethost 47月 26 01:02:44 server1 systemd[1]: Starting RabbitMQ Messaging Server...7月 26 01:03:27 server1 systemd[1]: rabbitmq-server.service: Supervising process 770 which is not our child. Well most likely not notice when it exits.7月 26 01:03:27 server1 systemd[1]: Started RabbitMQ Messaging Server.Web AdminRabbitMQ 还提供了可以远程访问的 Web 管理与监控工具默认以插件的形式安装到系统中需要使用 rabbitmq-plugins 命令开启。具体命令如下$ sudo rabbitmq-plugins enable rabbitmq_managementRabbitMQ 默认创建了一个用户名密码分别为 guest/guest 的用户只是该用户只允许本地登录。(我这里是远程。。。)如果需要远程访问 Web 控制台可以通过 rabbitmqctl 命令创建一个新的管理账户$ sudo rabbitmqctl add_user 此时新创建的账户仍无法登录还需要为其分配用户角色以及对 vhost 的管理权限命令如下$ sudo rabbitmqctl set_user_tags administrator$ sudo rabbitmqctl set_permissions -p / .* .* .*权限设置完毕后即可用之前指定的用户名密码远程登录 Web 管理系统界面如下图Web AdminWeb 形式的后台界面为管理工作与监控需求提供了便捷的接口同时大部分管理操作也可直接通过 rabbitmqctl 命令完成具体可参考该命令的帮助信息$ sudo rabbitmqctlUsage:rabbitmqctl [-n ] [-l] [-q] []...Commands:add_user add_vhost authenticate_user await_online_nodes [-t ]cancel_sync_queue [-p ] queuechange_cluster_node_type change_password ...二、架构解析RabbitMQ 是一种高性能、稳定、可伸缩(集群部署)的消息中间件由 Erlang 语言编写。Erlang 是一种函数式编程语言专注于分布式、高容错的软件类实时系统等应用场景。它通过轻量级的进程设计以及进程之间的消息通信提供了一个高层次的不需要共享状态的并发模型。RabbitMQ 集群通过 Erlang VM 原生的 IPC (inter-process communication) 机制完成跨节点的消息通信。松耦合架构对于传统的应用架构比如一个 Web 应用的登录程序往往需要对后端的数据库表格进行多项实时的写入操作。而当用户的访问量大增时此时的表格更新操作很容易成为瓶颈并影响到整体的响应速度。相对于登录程序直接更新表格数据的紧耦合架构可以将前端的请求数据推送到基于消息的中间件或者某个中心化的消息队列应用再通过中间件分发消息到多个消费者(Consumer)应用由消费者独立、异步地完成最终的数据库更新操作。image.png基于消息的中间件对于创建数据导向的灵活的应用架构有非常大的优势。RabbitMQ 支持的松耦合设计可以使应用不再受类似于数据库写操作的性能限制。同时这种架构也非常易于横向扩展可以在添加作用于相同数据的应用实例时不影响现有的核心功能。tightly coupled applicationloosely coupled application三、消息应用示例代码下文中将使用 Python 语言及其 RabbitMQ 客户端 Pika 创建 5 个基本的消息应用结构由简单到复杂源代码均参考自官网 RabbitMQ Tutorials 。安装 pika 库pip install pikaHello World该应用的结构示意图如下helloworld由 P (Producer) 发送一条消息到队列(Queue)再由队列转发消息到 C (Consumer) 。发送端代码 send.py 如下#!/usr/bin/env pythonimport pika# 初始化与 RabbitMQ 服务器的连接connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()# 队列声明channel.queue_declare(queuehello)# 发送消息channel.basic_publish(exchange, routing_keyhello, bodyHello World!)print( [x] Sent Hello World!)connection.close()接收端 reveive.py 代码如下#!/usr/bin/env pythonimport pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuehello)# 接收到消息后触发的回调函数def callback(ch, method, properties, body):print( [x] Received %r % body)# 消费者声明与消息监听channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue)print( [*] Waiting for messages. To exit press CTRLC)channel.start_consuming()测试首先运行 4 次发送程序$ python send.py[x] Sent Hello World$ python send.py[x] Sent Hello World$ python send.py[x] Sent Hello World$ python send.py[x] Sent Hello World从 Web 管理界面中可以看到此时队列中缓存了 4 条消息。overview运行接收端程序$ python receive.py[x] Waiting for messages. To exit press CTRLC[x] Received bHello World[x] Received bHello World[x] Received bHello World[x] Received bHello World发送端连续 4 次发送的消息被接收端收取队列中缓存的消息被清空。同时接收端保持运行状态等待新的消息被转发给自己。overview 2消息队列一直处于等待生产者发送消息和将收到或缓存的消息转发给消费者的状态。如未有消费者及时接收和处理被转发的消息则这部分消息缓存在队列中等待进一步操作。Work Queue结构示意图Work Queue本例中将创建一个 Work Queue 用来将消耗时间长的任务以轮询的方式分发给多个消费者处理。生产者源代码 new_task.py #!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuetask_queue, durableTrue)message .join(sys.argv[1:]) or Hello World!channel.basic_publish(exchange,routing_keytask_queue,bodymessage,propertiespika.BasicProperties(delivery_mode2, # make message persistent))print( [x] Sent %r % message)connection.close()消费者源代码 worker.py #!/usr/bin/env pythonimport pikaimport timeconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuetask_queue, durableTrue)print( [*] Waiting for messages. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] Received %r % body)time.sleep(body.count(b.))print( [x] Done)ch.basic_ack(delivery_tagmethod.delivery_tag) # Message acknowledgmentchannel.basic_qos(prefetch_count1)channel.basic_consume(queuetask_queue, on_message_callbackcallback)channel.start_consuming()Message acknowledgment消费者在处理接收到的任务或消息时有可能会消耗比较多的时间在此过程中如消费者端出现软硬件故障则会出现消息丢失的情况。RabbitMQ 支持 Message acknowledgment 。即消费者在接收和处理完一个特定的消息后会向 RabbitMQ 返回一个应答(ack)说明该消息可以从队列中移除。如果消费者在返回应答之前丢失与队列的连接则 RabbitMQ 判定对应的消息未由消费者完全处理会将该消息保留在队列中并重新分发给其他在线的消费者。Message durability消息应答的机制可以确保即使消费者宕机的情况下任务仍不会丢失。但是当 RabbitMQ 服务本身出现故障时队列以及队列中缓存的消息仍旧会被清理掉。为了保证 RabbitMQ 中队列以及消息的持久化首先需要在生产者和消费者代码中同时声明队列为 durable channel.queue_declare(queuetask_queue, durableTrue)此外还需要将生产者代码中的 delivery_mode 属性设置为 2 确保消息的持久化propertiespika.BasicProperties(delivery_mode2,)测试打开两个命令行终端分别运行 worker.py 程序# Shell 1$ python worker.py[x] Waiting for messages. To exit press CTRLC# Shell 2$ python worker.py[x] Waiting for messages. To exit press CTRLC打开另一个终端窗口运行 new_task.py 程序发送 4 条消息# Shell 3$ python new_task.py First Message[x] Sent First Message$ python new_task.py Second Message[x] Sent Second Message$ python new_task.py Third Message[x] Sent Third Message$ python new_task.py Forth Message[x] Sent Forth Message最终两个消费者分别接收到队列分发的两条消息# Shell 1$ python worker.py[x] Waiting for messages. To exit press CTRLC[x] Received bFirst Message[x] Done[x] Received bThird Message[x] Done# Shell 2$ python worker.py[x] Waiting for messages. To exit press CTRLC[x] Received bSecond Message[x] Done[x] Received bForth Message[x] DoneFair dispatch当 RabbitMQ 以轮询的方式(即平均分配)将队列中的消息转发给多个消费者时如果这些消费者接收到的任务繁重程度差异很大则会导致某些消费者端任务的积压。为了避免这种情况发生可以使用 basic_qos 方法设置 prefetch 的值如 worker.py 程序中的以下代码channel.basic_qos(prefetch_count1) 。该行代码可以确保同一个消费者在任意时间点最多只接受 1 个任务分配给自己。即如果某个消费者当前有未处理完的消息则不再接收新的消息直到当前的任务处理完。Publish/Subscribe结构示意图Publish-SubscribeExchange在之前的示例中用到了消息队列模型中的以下几个组件producer 生产者即发送消息的应用queue 队列即存储消息的缓存consumer 消费者即接收消息的应用实际上在 RabbitMQ 的消息模型中生产者从来不会将消息直接发送到队列中而是将消息发送给一个名为 exchange 的组件。exchange 的一端用来接收生产者发送的消息一端用来将消息推送到队列中。它通过 exchange type 中的定义判断特定的消息是该推送给某个对应的队列还是将其广播给多个队列又或者直接丢弃。RabbitMQ 主要提供了 4 种 exchange 类型direct、topic、headers 和 fanout。本例中使用 fanout即 exchange 会将接收到的消息以广播的形式发送给所有关联的队列再由队列传递给消费者处理。源代码(emit_log.py)如下#!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangelogs, exchange_typefanout)message .join(sys.argv[1:]) or info: Hello World!channel.basic_publish(exchangelogs, routing_key, bodymessage)print( [x] Sent %r % message)connection.close()receive_logs.py#!/usr/bin/env pythonimport pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangelogs, exchange_typefanout)result channel.queue_declare(queue, exclusiveTrue)queue_name result.method.queuechannel.queue_bind(exchangelogs, queuequeue_name)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r % body)channel.basic_consume(queuequeue_name, on_message_callbackcallback, auto_ackTrue)channel.start_consuming()receive_logs.py 文件中有一行 result channel.queue_declare(queue, exclusiveTrue) 代码用来声明一个临时队列( queue 没有指定名称因此会由 RabbitMQ 设置随机的名称)同时 exclusiveTrue 设置该队列在消费者断开连接后自行删除。测试同时打开两个命令行窗口分别运行 receive_logs.py 文件# Shell 1$ python receive_logs.py[*] Waiting for logs. To exit press CTRLC# Shell 2$ python receive_logs.py[*] Waiting for logs. To exit press CTRLC再打开第三个终端执行 emit_log.py 命令 4 次# Shell 3$ python emit_log.py[x] Sent info: Hello World!$ python emit_log.py[x] Sent info: Hello World!$ python emit_log.py[x] Sent info: Hello World!$ python emit_log.py[x] Sent info: Hello World!此时之前运行的两个 receive 程序同时收到发送的 4 条消息$ python receive_logs.py[*] Waiting for logs. To exit press CTRLC[x] binfo: Hello World![x] binfo: Hello World![x] binfo: Hello World![x] binfo: Hello World!$ python receive_logs.py[*] Waiting for logs. To exit press CTRLC[x] binfo: Hello World![x] binfo: Hello World![x] binfo: Hello World![x] binfo: Hello World!Routing结构示意图Routing与上一个例子中以广播的形式转发消息不同本例中允许消费者通过队列有选择地订阅生产者发送的部分消息。Binding 和 Direct exchange在 RabbitMQ 中binding 代表 exchange 与队列的对应关系即队列会根据 binding 的设置对 exchange 转发的消息有选择性地接收。因此 binding 的最终效果也依赖于 exchange 的类型。比如之前用到的 fanout 类型由于是广播的形式(转发给所有关联的队列)并不需要选择的动作则 binding 的值被忽略。但是对于 direct 类型的 exchange 则可以通过 binding 对消息进行筛选。在 direct exchange 下只有当队列的 binding_key 与消息的 routing_key 一致时队列才会收到 exchange 转发的消息。emit_log_direct.py#!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangedirect_logs, exchange_typedirect)severity sys.argv[1] if len(sys.argv) 1 else infomessage .join(sys.argv[2:]) or Hello World!channel.basic_publish(exchangedirect_logs, routing_keyseverity, bodymessage)print( [x] Sent %r:%r % (severity, message))connection.close()receive_logs_direct.py#!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangedirect_logs, exchange_typedirect)result channel.queue_declare(queue, exclusiveTrue)queue_name result.method.queueseverities sys.argv[1:]if not severities:sys.stderr.write(Usage: %s [info] [warning] [error]\n % sys.argv[0])sys.exit(1)for severity in severities:channel.queue_bind(exchangedirect_logs, queuequeue_name, routing_keyseverity)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(queuequeue_name, on_message_callbackcallback, auto_ackTrue)channel.start_consuming()测试首先运行 receive_logs_direct.py 程序并指定参数为 error(即只接收标记为“error”的消息)# Shell 1$ python receive_logs_direct.py error[*] Waiting for logs. To exit press CTRLC打开另一终端同样运行 receive_logs_direct.py 程序并指定参数为 info warning(即接收标记为 info 或 warning 的消息)# Shell 2$ python receive_logs_direct.py info warning[*] Waiting for logs. To exit press CTRLC打开第三个终端并运行 emit_log_direct.py 程序发送 4 条日志消息# Shell 3$ python emit_log_direct.py error This is an error[x] Sent error:This is an error$ python emit_log_direct.py info Hi, I am an info[x] Sent info:Hi, I am an info$ python emit_log_direct.py warning Yeah, its a warning[x] Sent warning:Yeah, its a warning$ python emit_log_direct.py error Hi, its an error again[x] Sent error:Hi, its an error again此时 Shell 1 中只接收到了标记为 error 的消息$ python receive_logs_direct.py error[*] Waiting for logs. To exit press CTRLC[x] error:bThis is an error[x] error:bHi, its an error again而 Shell 2 中接收到了标记为 info 和 warning 的消息$ python receive_logs_direct.py info warning[*] Waiting for logs. To exit press CTRLC[x] info:bHi, I am an info[x] warning:bYeah, its a warningTopics结构示意图Topicsdirect 类型的 exchange 虽然可以根据消息的 routing_key 以及队列的 binding_key 有选择性的推送消息到队列但是并不适合更复杂的场景。而 topic 类型的 exchange 与 direct 类型逻辑上大致相同只是 topic 类型的 exchange 并没有一个明确的 routing_key而是由几个点号(.)分隔的单词(如 lazy.orange.cat)进行定义。与之对应的 binding_key 也需要遵循同样的形式只不过 binding_key 额外支持两个特殊含义的字符星号(*)可以表示某一个任意的单词井号(#)可以表示任意 0 个或多个单词因此对于上图(Topics)中的情形routing_key 为 quick.orange.rabbit 的消息会被转发给 Q1 和 Q2 队列quick.orange.fox 则只会转发给 Q1 队列lazy.orange.male.rabbit 被转发给 Q2 队列。emit_log_topic#!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangetopic_logs, exchange_typetopic)routing_key sys.argv[1] if len(sys.argv) 2 else anonymous.infomessage .join(sys.argv[2:]) or Hello World!channel.basic_publish(exchangetopic_logs, routing_keyrouting_key, bodymessage)print( [x] Sent %r:%r % (routing_key, message))connection.close()receive_logs_topic.py#!/usr/bin/env pythonimport pikaimport sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangetopic_logs, exchange_typetopic)result channel.queue_declare(, exclusiveTrue)queue_name result.method.queuebinding_keys sys.argv[1:]if not binding_keys:sys.stderr.write(Usage: %s [binding_key]...\n % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchangetopic_logs, queuequeue_name, routing_keybinding_key)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(queuequeue_name, on_message_callbackcallback, auto_ackTrue)channel.start_consuming()测试先运行接收端程序(Shell 1 和 Shell 2)再运行发送端(Shell 3)效果如下# Shell 3$ python emit_log_topic.py kern.warning A kernel warning message[x] Sent kern.warning:A kernel warning message$ python emit_log_topic.py network.critical A critical network error[x] Sent network.critical:A critical network error$ python emit_log_topic.py kern.critical A critical kernel error[x] Sent kern.critical:A critical kernel error# Shell 1$ python receive_logs_topic.py kern.*[*] Waiting for logs. To exit press CTRLC[x] kern.warning:bA kernel warning message[x] kern.critical:bA critical kernel error# Shell 2$ python receive_logs_topic.py *.critical[*] Waiting for logs. To exit press CTRLC[x] network.critical:bA critical network error[x] kern.critical:bA critical kernel error参考资料