哪家微网站做的好,公司免费网站注册,网络设计解决如何将初步规划中的各个子系统从内部用,怎么网站后台集成Spring Boot和RocketMQ
在现代的微服务架构中#xff0c;消息队列已经成为一种常见的异步处理模式#xff0c;它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件#xff0c;性能优秀#xff0c;功能齐全#xff0c;被广泛…集成Spring Boot和RocketMQ
在现代的微服务架构中消息队列已经成为一种常见的异步处理模式它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件性能优秀功能齐全被广泛应用在各种业务场景。
本文将详细介绍如何在Spring Boot项目中集成RocketMQ实现消息的生产和消费。
开发环境
JDK 1.8 或更高RocketMQ 4.8.0 或更高Spring Boot 2.3.1.RELEASE 或更高Maven 3.0 或更高
RocketMQ服务器部署
首先我们需要在本地或服务器上部署RocketMQ。具体的部署步骤可以参考RocketMQ官方文档。为了简化部署我们可以使用Docker进行部署。
Spring Boot项目创建
我们使用Spring Initializr创建一个新的Spring Boot项目选择Web、Lombok和RocketMQ Spring Boot Starter为项目依赖。
pom.xml示例
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-jpa/artifactId/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies配置RocketMQ
在application.properties文件中配置RocketMQ的服务器地址和其他相关参数。
rocketmq.name-server127.0.0.1:9876
rocketmq.producer.groupmy-group在这里rocketmq.name-server是RocketMQ服务器的地址rocketmq.producer.group是生产者的组名。
消息生产者
接下来我们创建一个消息生产者。在Spring Boot项目中我们可以使用RocketMQTemplate来发送消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;RestController
public class ProducerController {Autowiredprivate RocketMQTemplate rocketMQTemplate;GetMapping(/send)public String send(String message) {rocketMQTemplate.convertAndSend(test-topic, message);return Message: message sent.;}
}上述代码中我们创建了一个RESTful接口/send当接口被调用时它将发送一个消息到test-topic主题。
消息消费者
接下来我们创建一个消息消费者。在Spring Boot项目中我们可以使用RocketMQMessageListener注解来定义一个消息消费者。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;Service
RocketMQMessageListener(topic test-topic, consumerGroup my-consumer_test-topic)
public class ConsumerService implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.printf(------- StringConsumer received: %s \n, message);}
}上述代码中我们定义了一个消息消费者它将监听test-topic主题的消息当有新的消息时它将打印消息内容。
测试
至此我们已经完成了Spring Boot集成RocketMQ的所有代码。接下来我们就可以运行Spring Boot项目并通过访问/send接口来发送消息查看控制台的输出来验证消息消费者是否可以正常接收消息。
这就是Spring Boot集成RocketMQ的全过程。RocketMQ作为一款功能强大的消息中间件不仅支持基本的消息生产和消费还支持许多高级特性如事务消息、顺序消息、延迟消息等。在实际的项目开发中我们可以根据业务需求选择合适的消息模型提高系统的可用性和可靠性。
事务消息
RocketMQ支持发送事务消息也就是说在发送消息的同时我们可以执行本地的数据库操作只有当本地的数据库操作成功时消息才会真正被发送出去。
下面是一个发送事务消息的例子
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.*;RestController
public class TransactionProducerController {Autowiredprivate RocketMQTemplate rocketMQTemplate;GetMapping(/sendTransaction)public String sendTransaction(String message) {ExecutorService executor new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(5000), r - {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;});TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer rocketMQTemplate.createAndStartTransactionMQProducer(transaction-group,transactionListener,executor);producer.sendMessageInTransaction(test-topic, TagA, message, null);return Transaction Message: message sent.;}
}
在上述代码中我们创建了一个TransactionMQProducer并设置了一个TransactionListener来处理事务的提交和回滚。当发送事务消息时我们需要调用sendMessageInTransaction方法。
顺序消息
RocketMQ支持发送顺序消息也就是说消息会按照发送的顺序被消费。
下面是一个发送顺序消息的例子
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;RestController
public class OrderlyProducerController {Autowiredprivate RocketMQTemplate rocketMQTemplate;GetMapping(/sendOrderly)public String sendOrderly(String message) {for (int i 0; i 100; i) {rocketMQTemplate.syncSendOrderly(orderly_topic, MessageBuilder.withPayload(message i).build(), hashkey);}return Orderly Message: message sent.;}
}在上述代码中我们调用syncSendOrderly方法发送顺序消息。该方法的第三个参数是hashkeyRocketMQ会根据这个key来决定消息发送到哪个队列具有相同hashkey的消息会发送到同一个队列。
延迟消息
RocketMQ支持发送延迟消息也就是说消息不会立即被消费而是会在指定的时间后被消费。
下面是一个发送延迟消息的例子
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;RestController
public class DelayProducerController {Autowiredprivate RocketMQTemplate rocketMQTemplate;GetMapping(/sendDelay)public String sendDelay(String message) {rocketMQTemplate.syncSend(delay_topic, MessageBuilder.withPayload(message).build(), 1000, 4);return Delay Message: message sent.;}
}在上述代码中我们调用syncSend方法发送延迟消息。该方法的第三个参数是延迟时间第四个参数是延迟级别。
以上就是Spring Boot集成RocketMQ的详细步骤和示例代码希望对大家有所帮助。