一站式营销推广,公司地址,商业设计要点,跨境电商怎么注册RocketMQ是阿里巴巴旗下一款开源的MQ框架#xff0c;经历过双十一考验、Java编程语言实现#xff0c;有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件#xff0c;支持事务消息、顺序消息、批量消息、定时消息、消息回溯等#xff0c;总之…RocketMQ是阿里巴巴旗下一款开源的MQ框架经历过双十一考验、Java编程语言实现有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件支持事务消息、顺序消息、批量消息、定时消息、消息回溯等总之就是葛大爷的一句话 整篇文章可以大致分为三个部分第一部分属于一些核心概念和工作流程的讲解第二部分就是纯手动搭建了一套环境第三部分是基于环境进行测试和集成到SpringBoot因为整个过程讲的比较细所以我称之为“保姆级教程”。
好了废话补多少直接进入主题。
核心概念 NameServer可以理解为是一个注册中心主要是用来保存topic路由信息管理Broker。在NameServer的集群中NameServer与NameServer之间是没有任何通信的。 Broker核心的一个角色主要是用来保存topic的信息接受生产者产生的消息持久化消息。在一个Broker集群中相同的BrokerName可以称为一个Broker组一个Broker组中,BrokerId为0的为主节点其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。 生产者生产消息的一方就是生产者 生产者组一个生产者组可以有很多生产者只需要在创建生产者的时候指定生产者组那么这个生产者就在那个生产者组 消费者用来消费生产者消息的一方 消费者组跟生产者一样每个消费者都有所在的消费者组一个消费者组可以有很多的消费者不同的消费者组消费消息是互不影响的。 topic主题可以理解为一个消息的集合的名字生产者在发送消息的时候需要指定发到哪个topic下消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。 Tag子主题比topic低一级可以用来区分同一topic下的不同业务类型的消息发送消息的时候也需要指定。
这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置这样就可以使得生产者或者消费者更加灵活。
工作流程
说完核心概念再来说一下核心的工作流程这里我先画了一张图。 通过这张图就可以很清楚的知道RocketMQ大致的工作流程 Broker启动的时候会往每台NameServer因为NameServer之间不通信所以每台都得注册注册自己的信息这些信息包括自己的ip和端口号自己这台Broker有哪些topic等信息。 Producer在启动之后会跟会NameServer建立连接定期从NameServer中获取Broker的信息当发送消息的时候会根据消息需要发送到哪个topic去找对应的Broker地址如果有的话就向这台Broker发送请求没有找到的话就看根据是否允许自动创建topic来决定是否发送消息。 Broker在接收到Producer的消息之后会将消息存起来持久化如果有从节点的话也会主动同步给从节点实现数据的备份 Consumer启动之后也会跟会NameServer建立连接定期从NameServer中获取Broker和对应topic的信息然后根据自己需要订阅的topic信息找到对应的Broker的地址然后跟Broker建立连接获取消息进行消费
就跟上面的图一样整体的工作流程还是比较简单的这里我简化了很多概念主要是为了好理解。
环境搭建
终于讲完了一些简单的概念接下来就来搭建一套RocketMQ的环境。
通过上面分析我们知道在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统所以这里不需要搭建真正要搭建的就是NameServer和Broker但是为了方便RocketMQ数据的可视化这里我多搭建一套可视化的服务。
搭建过程比较简单按照步骤一步一步来就可以完成如果提示一些命令不存在那么直接通过yum安装这些命令就行。
一、准备
需要准备一个linux服务器需要先安装好JDK
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld下载并解压RocketMQ
1、创建一个目录用来存放rocketmq相关的东西
mkdir /usr/rocketmq
cd /usr/rocketmq2、下载并解压rocketmq
下载
wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip解压
unzip rocketmq-all-4.7.1-bin-release.zip看到这一个文件夹就完成了 然后进入rocketmq-all-4.7.1-bin-release文件夹
cd rocketmq-all-4.7.1-bin-releaseRocketMQ的东西都在这了 二、搭建NameServer
修改jvm参数
在启动NameServer之前强烈建议修改一下启动时的jvm参数因为默认的参数都比较大为了避免内存不够建议修改小当然如果你的内存足够大可以忽略。
vi bin/runserver.sh修改画圈的这一行 这里你可以直接修改成跟我一样的
-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize32m -XX:MaxMetaspaceSize50m
启动NameServer
修改完之后执行如下命令就可以启动NameServer了
nohup sh bin/mqnamesrv 查看NameServer日志
tail -f ~/logs/rocketmqlogs/namesrv.log如果看到如下的日志就说明启动成功了 NameServer日志
三、搭建Broker
这里启动单机版的Broker
修改jvm参数
跟启动NameServer一样也建议去修改jvm参数
vi bin/runbroker.sh将画圈的地方设置小点当然也别太小啊 当然你还是可以跟我设置的一样
-server -Xms1g -Xmx1g -Xmn512m修改Broker配置文件broker.conf
这里需要改一下Broker配置文件需要指定NameServer的地址因为需要Broker需要往NameServer注册
vi conf/broker.confBroker配置文件 Broker配置文件
这里就能看出Broker的配置了什么Broker集群的名称啊Broker的名称啊Broker的id啊都跟前面说的对上了。
在文件末尾追加地址
namesrvAddr localhost:9876因为NameServer跟Broker在同一台机器所以是localhostNameServer端口默认的是9876。
不过这里我还建议再修改一处信息因为Broker向NameServer进行注册的时候带过去的ip如果不指定就会自动获取但是自动获取的有个坑就是有可能你的电脑无法访问到这个自动获取的ip所以我建议手动指定你的电脑可以访问到的服务器ip。
我的虚拟机的ip是192.168.200.143所以就指定为192.168.200.143如下
brokerIP1 192.168.200.143
brokerIP2 192.168.200.143如果以上都配置的话最终的配置文件应该如下红圈的为新加的 启动Broker
nohup sh bin/mqbroker -c conf/broker.conf -c 参数就是指定配置文件
查看日志
tail -f ~/logs/rocketmqlogs/broker.log当看到如下日志就说明启动成功了 四、搭建可视化控制台
其实前面NameServer和Broker搭建完成之后就可以用来收发消息了但是为了更加直观可以搭一套可视化的服务。
可视化服务其实就是一个jar包启动就行了。
jar包可以从这获取 链接https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w 提取码s0sd 将jar包上传到服务器放到/usr/rocketmq的目录底下当然放哪都无所谓这里只是为了方便因为rocketmq的东西都在这里
然后进入/usr/rocketmq下执行如下命名
nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddrlocalhost:9876 -Dserver.port8088 rocketmq-console-ng-1.0.1.jar rocketmq.config.namesrvAddr就是用来指定NameServer的地址的
查看日志
tail -f ~/logs/consolelogs/rocketmq-console.log当看到如下日志就说明启动成功了 然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了如果无法访问可以看看防火墙有没有关闭 右上角可以把语言切换成中文 Broker集群信息 topic信息
通过控制台可以查看生产者、消费者、Broker集群等信息非常直观。
功能很多这里就不一一介绍了。
测试
环境搭好之后就可以进行测试了。
引入依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.7.1/version
/dependency生产者发送消息
public class Producer {public static void main(String[] args) throws Exception {//创建一个生产者指定生产者组为sanyouProducerDefaultMQProducer producer new DefaultMQProducer(sanyouProducer);// 指定NameServer的地址producer.setNamesrvAddr(192.168.200.143:9876);// 第一次发送可能会超时我设置的比较大producer.setSendMsgTimeout(60000);// 启动生产者producer.start();// 创建一条消息// topic为 sanyouTopic// 消息内容为 三友的java日记// tags 为 TagAMessage msg new Message(sanyouTopic, TagA, 三友的java日记 .getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息并得到消息的发送结果然后打印SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);// 关闭生产者producer.shutdown();}}构建一个消息生产者DefaultMQProducer实例然后指定生产者组为sanyouProducer 指定NameServer的地址服务器的ip:9876因为需要从NameServer拉取Broker的信息 producer.start() 启动生产者 构建一个内容为三友的java日记的消息然后指定这个消息往sanyouTopic这个topic发送 producer.send(msg)发送消息打印结果 关闭生产者
运行结果如下
SendResult [sendStatusSEND_OK, msgIdC0A81FAF54F818B4AAC2475FD2010000, offsetMsgIdC0A8C88F00002A9F000000000009AE55, messageQueueMessageQueue [topicsanyouTopic, brokerNamebroker-a, queueId0], queueOffset0]sendStatusSEND_OK 说明发送成功了此时就能后控制台看到未消费的消息了。 到控制台看到消息那块然后选定发送的topic查询的时间范围手动再选一下不选就查不出来(我怀疑这是个bug)然后查询就能看到了一条消息。
然后点击一下MESSAGE DETAIL就能够看到详情。 这里就能看到发送消息的详细信息。
左下角消息的消费的消费因为我们还没有消费者订阅这个topic所以左下角没数据。
消费者消费消息
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 通过push模式消费消息指定消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(sanyouConsumer);// 指定NameServer的地址consumer.setNamesrvAddr(192.168.200.143:9876);// 订阅这个topic下的所有的消息consumer.subscribe(sanyouTopic, *);// 注册一个消费的监听器当有消息的时候会回调这个监听器来消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf(消费消息:%s, new String(msg.getBody()) \n);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.printf(Consumer Started.%n);}
}创建一个消费者实例对象指定消费者组为sanyouConsumer 指定NameServer的地址服务器的ip:9876 订阅 sanyouTopic 这个topic的所有信息 consumer.registerMessageListener 这个很重要是注册一个监听器这个监听器是当有消息的时候就会回调这个监听器处理消息所以需要用户实现这个接口然后处理消息。 启动消费者
启动之后消费者就会消费刚才生产者发送的消息于是控制台就打印出如下信息
Consumer Started.
消费消息:三友的java日记 此时再去看控制台 发现被sanyouConsumer这个消费者组给消费了。
SpringBoot环境下集成RocketMQ
集成
在实际项目中肯定不会像上面测试那样用都是集成SpringBoot的。
1、引入依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.1/version
/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdversion2.1.1.RELEASE/version
/dependency2、yml配置
rocketmq:producer:group: sanyouProducername-server: 192.168.200.143:98763、创建消费者
SpringBoot底下只需要实现RocketMQListener接口然后加上RocketMQMessageListener注解即可
Component
RocketMQMessageListener(consumerGroup sanyouConsumer, topic sanyouTopic)
public class SanYouTopicListener implements RocketMQListenerString {Overridepublic void onMessage(String msg) {System.out.println(处理消息: msg);}}RocketMQMessageListener需要指定消费者属于哪个消费者组消费哪个topicNameServer的地址已经通过yml配置文件配置类
4、测试
SpringBootTest(classes RocketMQApplication.class)
RunWith(SpringRunner.class)
public class RocketMQTest {Autowiredprivate RocketMQTemplate template;Testpublic void send() throws InterruptedException {template.convertAndSend(sanyouTopic, 三友的java日记);Thread.sleep(60000);}}直接注入一个RocketMQTemplate然后通过RocketMQTemplate发送消息。
运行结果如下
处理消息:三友的java日记的确消费到消息了。
原理
其实原理是一样的只不过在SpringBoot中给封装了一层让使用起来更加简单。
1、RocketMQTemplate构造代码 所以从这可以看出最终在构造RocketMQTemplate的时候传入了一个DefaultMQProducer所以可想而知最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。
2、RocketMQMessageListener 注解处理 从这可以看出会为每一个加了RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer所以最终也是通过DefaultMQPushConsumer消费消息的。
至于监听器是在这 遍历每条消息然后调用handleMessage最终会调用实现了RocketMQListener的对象处理消息。