珠海专业的免费建站,网站ftp用户名和密码,网站建设微信软文,北京网站建设专栏导航 RabbitMQ入门指南 从零开始了解大数据 目录
专栏导航
前言
一、AMQP协议
1.AMQP
2.Spring AMQP
二、使用Spring AMQP实现对RabbitMQ的消息收发
1.案例准备阶段
2.入门案例#xff08;无交换机#xff09;
3.任务模型案例#xff08;Work Queues#xff0… 专栏导航 RabbitMQ入门指南 从零开始了解大数据 目录
专栏导航
前言
一、AMQP协议
1.AMQP
2.Spring AMQP
二、使用Spring AMQP实现对RabbitMQ的消息收发
1.案例准备阶段
2.入门案例无交换机
3.任务模型案例Work Queues
总结 前言
RabbitMQ是一个高效、可靠的开源消息队列系统广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。 一、AMQP协议
1.AMQP
全称为Advanced Message Queuing Protocol是一种用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关更符合微服务中独立性的要求。通过AMQP不同的应用程序可以在不改变各自实现方式的情况下进行跨平台、跨语言的消息通信。
AMQP协议定义了消息的传输方式和消息的元数据例如消息的发送者、接收者、消息体、消息类型等。这些元数据可以帮助应用程序对消息进行正确的处理。
2.Spring AMQP
在Spring框架中有一个Spring AMQP的项目它基于AMQP协议定义了一套API规范提供了模板来发送和接收消息。这个项目包含两部分其中spring-amqp是基础抽象而spring-rabbit是底层的默认实现。
Spring AMQP通过提供模板和抽象层简化了应用程序与RabbitMQ的交互。它提供了一组易于使用的API用于发送和接收消息。这些API可以帮助开发人员更专注于业务逻辑而不是消息的发送和接收细节。
spring-rabbit是Spring AMQP的一部分它基于RabbitMQ实现了AMQP协议。spring-rabbit提供了对RabbitMQ的封装使开发人员可以通过简单的配置和API调用与RabbitMQ进行交互。
Spring AMQP 主要功能
自动声明和配置队列、交换机及其绑定关系通过简化队列和交换器的创建和管理过程Spring AMQP 帮助开发人员专注于实现业务逻辑而不是手动配置消息中间件。基于注解的监听器模式实现异步消息接收通过注解Spring AMQP 可以自动将方法与特定的队列或交换机绑定从而实现异步接收和处理消息。这种模式提高了应用程序的响应性能和吞吐量。封装了 RabbitTemplate 工具用于发送消息RabbitTemplate 是 RabbitMQ 的核心类之一用于发送和接收消息。Spring AMQP 提供了对这个工具的封装使得开发人员可以方便地使用它来发送消息。
官方文档
Spring AMQPhttps://spring.io/projects/spring-amqp
二、使用Spring AMQP实现对RabbitMQ的消息收发
1.案例准备阶段
项目结构如下 项目结构介绍 mq-demo父工程管理项目依赖 publisher消息的发送者 consumer消息的消费者
在父工程引入spring-amqp依赖
!--AMQP依赖--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
项目完整依赖如下
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.rye.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.5.15/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency!--Jackson--dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId/dependency/dependencies
/project
在application.yml中配置RabbitMQ服务端信息每个微服务都需要配置
spring:rabbitmq:host: 10.0.0.100port: 5672virtual-host: /demousername: userpassword: 123456
2.入门案例无交换机
案例模型 在RabbitMQ管理控制台新建队列 查看新建结果 在publisher服务中编写测试类并利用RabbitTemplate实现消息发送 Autowiredprivate RabbitTemplate rabbitTemplate;Testvoid testSendMessage2Queue() {// 队列名称String queueName demo.queue;// 消息String msg First demo;// 发送消息rabbitTemplate.convertAndSend(queueName, msg);}
运行测试用例查看结果 在consumer服务中新建一个类实现消息接收
Component
public class MqListener {RabbitListener(queues demo.queue)public void listenSimpleQueue(String msg) {System.out.println(消息 msg);}
}
启动consumer服务查看消息一旦监听的队列中有了消息就会推送给当前服务 3.任务模型案例Work Queues
让多个消费者绑定一个队列共同消费队列中的消息。
案例模型 在RabbitMQ管理控制台新建队列 查看新建结果 在publisher服务中的测试类添加一个测试方法通过循环发送模拟大量消息堆积现象 Testvoid testWorkQueue() throws InterruptedException {String queueName work.queue;for (int i 1; i 50; i) {String msg Work Queues i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}
在consumer服务的类中添加2个新的方法模拟多个消费者绑定同一个队列 RabbitListener(queues work.queue)public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收work.queue消息 msg);}RabbitListener(queues work.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2接收work.queue消息 msg);}
运行结果 修改consumer服务类中的方法 消费者1 sleep了20毫秒相当于每秒钟处理50个消息 消费者2 sleep了200毫秒相当于每秒处理5个消息 RabbitListener(queues work.queue)public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收work.queue消息 msg);Thread.sleep(20);}RabbitListener(queues work.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2接收work.queue消息 msg);Thread.sleep(200);}
重启后查看运行结果 以上结果表明默认情况下RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者并没有考虑到消费者是否已经处理完消息可能出现消息堆积。
修改consumer服务的application.yml设置preFetch值为1确保同一时刻最多投递给消费者1条消息每次只能获取一条消息处理完成才能获取下一个消息:
spring:rabbitmq:host: 10.0.0.100port: 5672virtual-host: /demousername: userpassword: 123456listener:simple:prefetch: 1
重启后查看运行结果 总结
RabbitMQ是一个开源的消息队列软件旨在提供可靠的消息传递和消息队列功能。本文主要介绍了AMQP、Spring AMQP和使用Spring AMQP实现对RabbitMQ的消息收发等内容希望对大家有所帮助。