做网站课程报告,设计师常备设计网站大全,微信h5页面制作软件哪个好,windows优化大师自动下载#x1f61c;作 者#xff1a;是江迪呀✒️本文关键词#xff1a;RocketMQ、消息队列☀️每日 一言#xff1a;在你心灰意冷、心烦意乱时也不要停下你的脚步#xff01; 一、前言
RocketMQ#xff08;Apache RocketMQ#xff09;是一种开源的分布式消息中间… 作 者是江迪呀✒️本文关键词RocketMQ、消息队列☀️每日 一言在你心灰意冷、心烦意乱时也不要停下你的脚步 一、前言
RocketMQApache RocketMQ是一种开源的分布式消息中间件系统最初由阿里巴巴开发并捐赠给 Apache 基金会。它提供了可靠的、低延迟的消息传递能力适用于构建大规模分布式系统中的消息通信。RocketMQ 主要用于解决分布式系统中异步通信、解耦、流量削峰等问题。下面让我们一起看下如何在Linux上部署RocketMQ~
二、介绍RocketMQ
2.1 RocketMQ产生背景
随着业务规模的扩大阿里巴巴面临着越来越多的分布式系统构建需求。为了解决这个问题阿里巴巴集团于2012年推出的开源分布式消息中间件 —— RocketMQ。
2.1 RocketMQ作用
1异步通信和解耦 RocketMQ可以在不同的服务之间实现异步通信解耦了服务之间的紧耦合关系提高了系统的可维护性和可扩展性。 2流量削峰 RocketMQ支持消息积压和消费速率不匹配时的流量削峰功能防止系统因突发流量而崩溃。 3实时数据同步 用于将数据实时同步到不同的存储介质保持数据的一致性。 4事件驱动架构 RocketMQ支持事件驱动的架构使得系统能够更加敏捷地响应业务事件。
2.2 RocketMQ的组件
RocketMQ 的主要组件包括 1Producer负责发送消息到 RocketMQ 服务器。 2Broker消息中转服务器负责存储消息并提供消息的读写服务。 3Consumer从 Broker 订阅并消费消息。 4Topic消息的分类Producer 发送消息到特定的 TopicConsumer 订阅相应的 Topic。 5Tag 对消息的进一步分类可以用于 Consumer 进一步过滤消息。 6Message Queue 每个 Topic 下可以分成多个 Message Queue实现消息的分区和负载均衡。
2.3 RocketMQ的优缺点
1优点
高吞吐量 RocketMQ具有高吞吐量的特点适用于大量消息的处理。可靠性 RocketMQ通过消息的持久化存储和复制机制确保消息不会丢失。低延迟 RocketMQ在消息传递过程中能够保持较低的延迟适用于实时性要求较高的场景。灵活的消息模式 支持发布-订阅和点对点两种消息模式根据业务需求进行选择。水平扩展 可以通过增加Broker节点来实现水平扩展提高消息处理能力。
2缺点
维护成本 RocketMQ需要维护多个组件包括Producer、Broker和Consumer等涉及到一定的运维成本。学习曲线 对于新手来说学习和理解RocketMQ的一些概念和配置可能需要一定的时间。一致性保障 虽然RocketMQ通过复制机制保障了消息的可靠性但在极端情况下可能会存在消息的重复传递或乱序问题。
三、 RocketMQ如何部署
3.1 下载
RocketMQ下载地址
3.2 上传、解压
上传文件到Linux有两种方式
1上传
通过rz命令
rz你可以使用rz命令在使用这个命令之前你必须确保linux已经安装了lrzsz安装命令如下:
sudo apt-get update
sudo apt-get install lrzsz使用xftp 这个我就不赘述了。
2解压
unzip rocketmq-all-4.5.2-bin-release.zip如果没有安装unzip需要安装一下
// 查看 unzip 包的安装情况
yum list unzip
//没有安装时使用命令安装 unzip
yum list unzipyum install unzip.x86_643.2 启动RocketMQ
RocketMQ的启动主要涉及到Namesrv命名服务和Broker消息存储和消费者服务两部分。要想启动RocketMQ首先进入解压后的bin目录
cd rocketmq-all-4.5.2-bin-release/bin1启动Namesrv并设置输出日志位置
nohup sh mqnamesrv namesrv.log 21 2启动Broker并设置输出日志位置
nohup sh mqbroker -n localhost:9876 broker.log 21 查看是否启动
jps输出下面的内容说明启动成功了
2931 NamesrvStartup
25599 Jps
25583 BrokerStartup在启动Broker会出现失败问题一般来说就是内存不足 RocketMq默认的虚拟机内存较大因而启动失败需要编辑如下两个配置文件修改jvm的内存大小
//编辑runbroker.sh和runserver.sh修改默认的JVM大小
vim runbroker.sh
vim runserver.sh 修改为
JAVA_OPT${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:metaspaceSize128m -XX:MaxMetaspaceSize320m如果还是启动不了需要将NameServer关闭重新启动一下同样是先进入bin目录关闭命令如下
sh mqshutdown namesrv四、测试与关闭
4.1 测试
1 发送消息生产者
//设置环境变量
export NAMESRV_ADDRlocalhost:9876
//使用安装包的demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer上面的信息就是RocketMQ的producer发送的消息。特点启动发送完毕消息后就会停止。
2 接收消息消费者
//设置环境变量
export NAMESRV_ADDRlocalhost:9876
//接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer4.2 关闭RocketMQ
//关闭namesrv
sh bin/mqshutdown namesrv
//关闭Broker
sh bin/mqshutdown broker五、SpringBoot连接RocketMQ
5.1 引入依赖
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-thymeleaf/artifactIdversion2.3.5.RELEASE/version/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.5.2/version /dependency
/dependencies
5.2 配置文件application.properties
# Name Server地址
rocketmq.name-serveryour-nameserver-ip:9876
# 生产者组名
rocketmq.producer.groupmy-producer-group
# 消费者组名
rocketmq.consumer.groupmy-consumer-group5.3 生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;Component
public class RocketMQProducer {Value(${rocketmq.name-server})private String nameServer;Value(${rocketmq.producer.group})private String producerGroup;public void sendMessage(String topic, String message) throws Exception {DefaultMQProducer producer new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(nameServer);producer.start();// 创建消息对象设置消息内容org.apache.rocketmq.common.message.Message msg new org.apache.rocketmq.common.message.Message(topic, message.getBytes());// 发送消息producer.send(msg);producer.shutdown();}
}
5.4 消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;Component
public class RocketMQConsumer {Value(${rocketmq.name-server})private String nameServer;Value(${rocketmq.consumer.group})private String consumerGroup;public void startConsumer(String topic) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(nameServer);// 订阅主题和标签可以根据需要进行过滤consumer.subscribe(topic, *);// 注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - {for (org.apache.rocketmq.common.message.MessageExt msg : msgs) {System.out.println(Received message: new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}5.5 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;SpringBootApplication
public class RocketMQDemoApplication {public static void main(String[] args) throws Exception {ConfigurableApplicationContext context SpringApplication.run(RocketMQDemoApplication.class, args);RocketMQProducer producer context.getBean(RocketMQProducer.class);producer.sendMessage(my-topic, Hello, RocketMQ!);RocketMQConsumer consumer context.getBean(RocketMQConsumer.class);consumer.startConsumer(my-topic);}
}
六、RocketMQ集群
上面所述的是单体RocketMQ也能使用。但是如果你想要实现高可用在实际的业务场景中。RocketMQ大部分都不会单体存在需要搭建集群来实现高可用。
有人已经写好了而且很详细传送门