海会主机做的网站都能干什么的,企业画册宣传设计,网站开发计划书封面,垂直 社交网站 建设快有一周没有写博客了。前面几天正在做项目。正好#xff0c;项目中需要MQ#xff08;消息队列#xff09;#xff0c;这里我就补充一下我对mq的理解。其实在学习java中的时候#xff0c;自己也仿照RabbitMQ自己实现了一个单机的mq#xff0c;但是mq其中一个特点也就是项目中需要MQ消息队列这里我就补充一下我对mq的理解。其实在学习java中的时候自己也仿照RabbitMQ自己实现了一个单机的mq但是mq其中一个特点也就是分布式我在项目中没有涉及。这里我用go语言将RabbitMQ的操作进行一次整理 文章目录 MQ概念操作RabbitMQ安装连接生产者消费者例子生成者消费者 注意常见的问题匹配规则 MQ概念
MQ是消息队列Message Queue的缩写是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信而无需专用连接来链接它们。
市面上有许多成熟的消息队列非常多例如
RabbitMQ⾮常知名, 功能强⼤, ⼴泛使⽤的消息队列(今天要说明的)Kafka高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用大量用于大数据领域RocketMQ具有高性能、高可靠、高实时、分布式特点ActiveMQ 高可用、高性能、可伸缩的企业级面向消息服务的系统
而RabbitMQ为什么叫rabbit其实这就不得不说一个时代在16世纪左右有那么个事件吃人事件简单而言就是比人贵。而为了让羊有更多的食物。就需要和人以及动物rabbit抢地盘但是兔子一生就是一窝兔子行动非常迅速而且繁殖起来也非常疯狂所以就把Rabbit用作这个分布式软件的命名就是真么简单。
它能做什么
可靠性RabbitMQ 提供了各种功能让你权衡性能与可靠性其中包括持久性交付确认和高可用性。灵活的路由消息在到达队列之前通过交换机的路由。RabbitMQ 为典型的路由逻辑提供了几个内置的交换机类型。对于更复杂的路由则可以绑定几种交换机一起使用甚至可以自己实现交换机类型并且把它作为一个插件的来使用。集群在本地网络上的几个 RabbitMQ 服务器可以聚集在一起作为一个独立的逻辑代理来使用。联合对于服务器来说它比集群需要更多的松散和非可靠链接。为此 RabbitMQ 提供了联合模型。高度可用队列在群集中队列可以被镜像到几个机器中确保您的消息即使在出现硬件故障的安全。多协议RabbitMQ 支持上各种消息传递协议的消息传送.许多客户端有你能想到的几乎任何语言 RabbitMQ 客户端。管理用户界面RabbitMQ 附带一个简单使用管理用户界面允许您监视和控制您的消息代理的各个方面。追踪如果您的消息系统行为异常RabbitMQ 提供跟踪支持让你找出问题是什么。插件系统RabbitMQ 附带各种插件扩展并且你也可以写你自己插件.商业支持:提供商业支持、 培训和咨询。大型社区:有一个庞大的社区 RabbitMQ有各种各样的客户端、 插件、 指南等。
其基本能力
跨进程通信 指两个或多个进程之间进行数据交换的过程。 跨进程通信的方式有很多比如管道、消息队列、共享内存、信号量等异步处理 指在执行某个操作时不需要等待其完成可以继续执行其他操作的一种处理方式削峰填谷 指在同一刻传入的数据过大使得服务无法处理需要等待一些时间将峰值通过时间一批一批的处理
这个就是基础的架构
N–M 这个代表的是多对多的关系这个关系比较重要意味着必须要用多线程处理在图中 我表示的是多个交换机可以通过绑定去去控制多个队列N–1 这个代表多对1的关系。这个也比较重要也意味着我们需要多线程处理在图中表示多个客户端去访问一个服务器。
可以看出MQ其实主要由两部分构成一部分客户端一部分由服务器构成。而客户端的又分为生产客户端提交消息和消费客户端消费消息。服务器的构建就有些复杂需要构图 生产者Producer其实就是将消息上传到服务器消费者Consumer其实就是从服务器中拿出消息发布Publish就是生产者将消息上传给服务器的过程订阅Subscribe就是消费者将消息从服务器拿出的过程中间服务器BrokerServer核心服务器负责给生产者和消费者提供发布和订阅以及构建交换机和绑定以及队列的API 虚拟机VirtualHost 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost交换机Exchange⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue队列Queue真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息绑定Bind Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.不理解的可以看上面的图消息Message生产者和消费者之间传递的内容内存存储Memory方便使用快速调用硬盘存储Disk重启数据不丢失 操作RabbitMQ
安装
说起安装我就不得不说一下官方文件的安装简直令人炸毛。个人建议诺是自己有云就去看云官方文件如何实现的。如果安装在本地那就没有那么麻烦了看官网就行。我这边使用的是阿里云所以这个里放一个阿里云的文档
连接
首选就是在项目中拿到相应的操作远程的mq
go get github.com/streadway/amqp补充
端口号15672是管理rabbit的接口端口号5672是我们调用并使用的端口 如果发现端口调用不了就必须要在防火墙中将这个端口号打开强调一下我这里用的阿里云
生产者
创建一个生产者的步骤
连接 Connection创建 Channel创建或连接一个交换器创建或连接一个队列交换器绑定队列不绑定的话队列在运行完毕后会消失投递消息关闭 Channel关闭 Connection
创建连接
connection, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)创建通道
// channel
channel, err : connection.Channel()创建交换机
err channel.ExchangeDeclare(e1, direct, true, false, false, true, nil)参数依次说明
name 交换机名称kind 交换机类型durable 持久化标识autoDelete 是否自动删除internal 是否是内置交换机noWait 是否等待服务器确认args 其它配置
参数说明要点
autoDelete :
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下处于不再使用的时候才会自动删除如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
internal :
内置交换器是一种特殊的交换器这种交换器不能直接接收生产者发送的消息只能作为类似于队列的方式绑定到另一个交换器来接收这个交换器中路由的消息内置交换器同样可以绑定队列和路由消息只是其接收消息的来源与普通交换器不同。
noWait
当 noWait 为 true 时声明时无需等待服务器的确认。
该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive )他主要是假定交换已存在并尝试连接到不存在的交换将导致 RabbitMQ 引发异常可用于检测交换器的存在。
创建队列
q, err : channel.QueueDeclare(q1, true, false, false, true, nil)参数说明
name 队列名称 durable 持久化autoDelete 自动删除exclusive 排他noWait 是否等待服务器确认args Table
参数说明要点
exclusive 排他
排他队列只对首次创建它的连接可见排他队列是基于连接 Connection 可见的并且该连接内的所有信道 Channel都可以访问这个排他队列在这个连接断开之后该队列自动删除由此可见这个队列可以说是绑到连接上的对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的这种排他优先于持久化即使设置了队列持久化在连接断开后该队列也会自动删除。
非排他队列不依附于连接而存在同一服务器上的多个连接都可以访问这个队列。
autoDelete 设置是否自动删除。
为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列之后所有与这个队列连接的消费者都断开时才会自动删除。
不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时这个队列自动删除”因为生产者客户端创建这个队列或者没有消费者客户端与这个队列连接时都不会自动删除这个队列。
创建队列还有一个差不多的方法( QueueDeclarePassive )他主要是假定队列已存在并尝试连接到不存在的队列将导致 RabbitMQ 引发异常可用于检测队列的存在。
绑定交换器和队列
err channel.QueueBind(q1, q1Key, e1, true, nil)参数解析
name 队列名称key BindingKey 根据交换机类型来设定exchange 交换机名称noWait 是否等待服务器确认args Table
绑定交换器可选
err channel.ExchangeBind(dest, q1Key, src, false, nil)参数解析
destination 目的交换器key RoutingKey 路由键source 源交换器noWait 是否等待服务器确认args Table 其它参数
生产者发送消息至交换器 source 中交换器 source 根据路由键找到与其匹配的另一个交换器 destination 井把消息转发到 destination 中进而存储在 destination 绑定的队列 queue 中某种程度上来说 destination 交换器可以看作一个队列。
投递消息
err channel.Publish(e1, q1Key, true, false, amqp.Publishing{Timestamp: time.Now(),DeliveryMode: amqp.Persistent, //Msg set as persistentContentType: text/plain,Body: []byte(Hello Golang and AMQP(Rabbitmq)!),
})exchange 交换器名称key RouterKeymandatory 是否为无法路由的消息进行返回处理immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃msg 消息体
mandatory 消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式设置为 true 表示将消息返回到生产者否则直接丢弃消息。
immediate 参数告诉服务器至少将该消息路由到一个队列中否则将消息返回给生产者。 imrnediate 参数告诉服务器如果该消息关联的队列上有消费者则立刻投递如果所有匹配的队列上都没有消费者则直接将消息返还给生产者不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。
其中 amqp.Publishing 的 DeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。
消费者
Rabbitmq 消费方式共有 2 种分别是推模式和拉模式
推模式
deliveries, err : channel.Consume(q1, any, false, false, false, true, nil)推模式是通过持续订阅的方式来消费信息 Consume 将信道( Channel )设置为接收模式直到取消队列的订阅为止。在接收模式期间 RabbitMQ 会不断地推送消息给消费者。推送消息的个数还是会受到 channel.Qos 的限制。
参数说明
queue 队列名称consumer 消息者名称autoAck 是否确认消费exclusive 排他noLocalnoWait boolargs Table
noLocal
设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个Connection 中的消费者
autoAck 可以设置为 true 或者false。
如果设为true则消费者一接收到就从queue中去除了如果消费者处理消息中发生意外该消息就丢失了。如果设为 false 则消费者在处理完消息后调用 msg.Ack(false) 后消息才从 queue 中去除。即便当前消费者处理该消息发生意外只要没有执行 msg.Ack(false) 那该消息就仍然在 queue 中不会丢失。
如果autoAck设置为 false 则表示需要手动进行 ack 消费
v, ok : -deliveries
if ok {// 手动ack确认// 注意 这里只要调用了ack就是手动确认模式// v.Ack的参数 multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认// 并不是表示设置为false就不进行当前ack确认if err : v.Ack(true); err ! nil {fmt.Println(err.Error())}
} else {fmt.Println(Channel close)
}拉模式 相对来说比较简单是由消费者主动拉取信息来消费每次只消费一条消息同样也需要进行 ack 确认消费。
channel.Get(queue string, autoAck bool)例子
生成者
这个文件起名叫new_task.go
package mainimport (fmtlogosstringsgithub.com/streadway/amqp
)func main() {// 1. 尝试连接RabbitMQ建立连接// 该连接抽象了套接字连接并为我们处理协议版本协商和认证等。conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {fmt.Printf(connect to RabbitMQ failed, err:%v\n, err)return}defer conn.Close()// 2. 接下来我们创建一个通道大多数API都是用过该通道操作的。ch, err : conn.Channel()if err ! nil {fmt.Printf(open a channel failed, err:%v\n, err)return}defer ch.Close()// 3. 要发送我们必须声明要发送到的队列。q, err : ch.QueueDeclare(task_queue, // nametrue, // 持久的false, // delete when unusedfalse, // 独有的false, // no-waitnil, // arguments)if err ! nil {fmt.Printf(declare a queue failed, err:%v\n, err)return}// 4. 然后我们可以将消息发布到声明的队列body : bodyFrom(os.Args)err ch.Publish(, // exchangeq.Name, // routing keyfalse, // 立即false, // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久ContentType: text/plain,Body: []byte(body),})if err ! nil {fmt.Printf(publish a message failed, err:%v\n, err)return}log.Printf( [x] Sent %s, body)
}// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {var s stringif (len(args) 2) || os.Args[1] {s hello} else {s strings.Join(args[1:], )}return s
}消费者
这个文件起名叫work.go
package mainimport (bytesfmtlogtimegithub.com/streadway/amqp
)func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {fmt.Printf(connect to RabbitMQ failed, err:%v\n, err)return}defer conn.Close()ch, err : conn.Channel()if err ! nil {fmt.Printf(open a channel failed, err:%v\n, err)return}defer ch.Close()// 声明一个queueq, err : ch.QueueDeclare(task_queue, // nametrue, // 声明为持久队列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)err ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)if err ! nil {fmt.Printf(ch.Qos() failed, err:%v\n, err)return}// 立即返回一个Delivery的通道msgs, err : ch.Consume(q.Name, // queue, // consumerfalse, // 注意这里传false,关闭自动消息确认false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err ! nil {fmt.Printf(ch.Consume failed, err:%v\n, err)return}// 开启循环不断地消费消息forever : make(chan bool)go func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)dotCount : bytes.Count(d.Body, []byte(.))t : time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf(Done)d.Ack(false) // 手动传递消息确认}}()log.Printf( [*] Waiting for messages. To exit press CTRLC)-forever
}自己要去终端DOS中去执行 go run new_task.go go run work.go 注意常见的问题
忘记确认
忘记确认是一个常见的错误。这是一个简单的错误但后果是严重的。当你的客户机退出时消息将被重新传递这看起来像随机重新传递但是RabbitMQ将消耗越来越多的内存因为它无法释放任何未确认的消息。
为了调试这种错误可以使用rabbitmqctl打印messages_unacknowledged字段 sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 在Windows平台去掉sudo rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged 持久化
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上但是RabbitMQ接受了一条消息并且还没有保存它时仍然有一个很短的时间窗口。而且RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中而不是真正写入磁盘。持久性保证不是很强但是对于我们的简单任务队列来说已经足够了。
如果需要更强有力的担保那么可以使用publisher confirms。 ch.Qos
在一个有两个worker的情况下当所有的奇数消息都是重消息而偶数消息都是轻消息时一个worker将持续忙碌而另一个worker几乎不做任何工作RabbitMQ对此一无所知仍然会均匀地发送消息。
因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
为了避免这种情况我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者换句话说在处理并确认前一条消息之前不要向worker发送新消息。相反它将把它发送给下一个不忙的worker。
err ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global
)关于队列大小的说明 如果所有的worker都很忙你的queue随时可能会满。你会想继续关注这一点也许需要增加更多的worker或者有一些其他的策略。 使用消息确认和预取计数可以设置工作队列work queue。即使RabbitMQ重新启动持久性选项也可以让任务继续存在。
临时队列
当我们连接到Rabbit时我们需要一个新的、空的队列。为此我们可以创建一个随机名称的队列或者更好的方法是让服务器为我们选择一个随机队列名称。
其次一旦我们断开消费者的连接队列就会自动删除。
在amqp客户端中当我们传递一个空字符串作为队列名称时我们将使用随机生成的名称创建一个非持久队列
当声明它的连接关闭时该队列将被删除因为它被声明为独占。
匹配规则
要说匹配规则就不得不说一个东西交换机类型
ExchangeType 是个表示的是交换机的类型为什么要分类型呢其实很简单需求决定业务业务决定技术。
所以这里的需求情况分为3种
一个生产者生产的消息由一个消费者去消费此时我们叫他DIRECT一个生产者生产的消息由有多个消费者去消费此时我们叫他FANOUT一个生产者生产的消息有锁必须由带有锁的消费者去消费此时我们叫他TOPIC 队列绑定在交换机的时候会指定一个bindingKey消息哪里指定一个outingKey当bindingKey与outingKey满足条件就投递到相应的队列中
所以这里的匹配规则说到底其实就是主题交换机的规则。
发送到topic交换器的消息不能具有随意的routing_key——它必须是单词列表以点分隔。这些词可以是任何东西但通常它们指定与消息相关的某些功能。一些有效的routing_key示例“stock.usd.nyse”“nyse.vmw”“quick.orange.rabbit”。routing_key中可以包含任意多个单词最多255个字节
如果我们打破约定并发送一个或四个单词的消息例如“orange”或“quick.orange.male.rabbit”,这些消息将不匹配任何绑定并且将会丢失
绑定键也必须采用相同的形式topic交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是绑定键有两个重要的特殊情况
*星号可以代替一个单词。井号可以替代零个或多个单词。
topic交换器
topic交换器功能强大可以像其他交换器一样运行。
当队列用“#”井号绑定键绑定时它将接收所有消息而与路由键无关就像在fanout交换器中一样。
当在绑定中不使用特殊字符“*”星号和“#”井号时topic交换器的行为就像direct交换器一样。
例子
生产者emit_log_topic.go
package mainimport (logosstringsgithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()err ch.ExchangeDeclare(logs_topic, // nametopic, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, Failed to declare an exchange)body : bodyFrom(os.Args)err ch.Publish(logs_topic, // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: text/plain,Body: []byte(body),})failOnError(err, Failed to publish a message)log.Printf( [x] Sent %s, body)
}func bodyFrom(args []string) string {var s stringif (len(args) 3) || os.Args[2] {s hello} else {s strings.Join(args[2:], )}return s
}func severityFrom(args []string) string {var s stringif (len(args) 2) || os.Args[1] {s anonymous.info} else {s os.Args[1]}return s
}消费者receive_logs_topic.go
package mainimport (logosgithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()err ch.ExchangeDeclare(logs_topic, // nametopic, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, Failed to declare an exchange)q, err : ch.QueueDeclare(, // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, Failed to declare a queue)if len(os.Args) 2 {log.Printf(Usage: %s [binding_key]..., os.Args[0])os.Exit(0)}// 绑定topicfor _, s : range os.Args[1:] {log.Printf(Binding queue %s to exchange %s with routing key %s,q.Name, logs_topic, s)err ch.QueueBind(q.Name, // queue names, // routing keylogs_topic, // exchangefalse,nil)failOnError(err, Failed to bind a queue)}msgs, err : ch.Consume(q.Name, // queue, // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args)failOnError(err, Failed to register a consumer)forever : make(chan bool)go func() {for d : range msgs {log.Printf( [x] %s, d.Body)}}()log.Printf( [*] Waiting for logs. To exit press CTRLC)-forever
}想要接收所有的日志
go run receive_logs_topic.go #只想接收“critical”日志
go run receive_logs_topic.go *.critical如何使用rabbit到这里就结束了。