商城型网站建设多少钱,管理系统考生端重置密码,今天的新闻内容摘抄30字,网站建设需求分析的实施一、前言
1、Kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件#xff0c;同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息#xff0c;同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Had…一、前言
1、Kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)
Kafka最核心的最成熟的还是他的消息引擎所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外Kafka也是目前性能最好的消息中间件。
2、Kafka架构 在Kafka集群(Cluster)中一个Kafka节点就是一个Broker消息由Topic来承载可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象简单说明BrokerKafka节点Topic主题用来承载消息Partition分区用于主题分片存储Producer生产者向主题发布消息的应用Consumer消费者从主题订阅消息的应用Consumer Group消费者组由多个消费者组成
3、准备工作
1、Kafka服务器
准备3台CentOS服务器并配置好静态IP、主机名
服务器名IP说明kafka01192.168.88.51Kafka节点1kafka02192.168.88.52Kafka节点2kafka03192.168.88.53Kafka节点3
软件版本说明
项说明Linux ServerCentOS 7Kafka2.3.0
2、ZooKeeper集群
Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息这里我们部署三台ZK
服务器名IP说明zk01192.168.88.21ZooKeeper节点zk02192.168.88.22ZooKeeper节点zk03192.168.88.23ZooKeeper节点
部署过程参考https://ken.io/note/zookeeper-cluster-deploy-guide
二、部署过程
1、应用数据目录
#创建应用目录
mkdir /usr/kafka#创建Kafka数据目录
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka2、下载解压
Kafka官方下载地址https://kafka.apache.org/downloads 这次我下载的是2.3.0版本
#创建并进入下载目录
mkdir /home/downloads
cd /home/downloads#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解压到应用目录
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafkakafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本2.3.0才是Kafka的版本 3、Kafka节点配置
#进入应用目录
cd /usr/kafka/kafka_2.12-2.3.0/#修改配置文件
vi config/server.properties通用配置
配置日志目录、指定ZooKeeper服务器
# A comma separated list of directories under which to store log files
log.dirs/kafka/logs# root directory for all kafka znodes.
zookeeper.connect192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181分节点配置
Kafka01
broker.id0#listenersPLAINTEXT://:9092
listenersPLAINTEXT://192.168.88.51:9092Kafka02
broker.id1#listenersPLAINTEXT://:9092
listenersPLAINTEXT://192.168.88.52:9092Kafka03
broker.id2#listenersPLAINTEXT://:9092
listenersPLAINTEXT://192.168.88.53:90924、防火墙配置
#开放端口
firewall-cmd --add-port9092/tcp --permanent#重新加载防火墙配置
firewall-cmd --reload5、启动Kafka
#进入kafka根目录
cd /usr/kafka/kafka_2.12-2.3.0/
#启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties #启动成功输出示例(最后几行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id0] started (kafka.server.KafkaServer)三、Kafka测试
1、创建Topic
在kafka01(Broker)上创建测试Tpoictest-ken-io这里我们指定了3个副本、1个分区
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-ioTopic在kafka01上创建后也会同步到集群中另外两个Brokerkafka02、kafka03
2、查看Topic
我们可以通过命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:90923、发送消息
这里我们向Broker(id0)的Topictest-ken-io发送消息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io#消息内容test by ken.io4、消费消息
在Kafka02上消费Broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning在Kafka03上消费Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning然后均能收到消息
test by ken.io这是因为这两个消费消息的命令是建立了两个不同的Consumer 如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工1个消息同时只会被一个Consumer消费到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_kenbin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken四、备注
1、Kafka常用配置项说明
Kafka常用Broker配置说明
配置项默认值/示例值说明broker.id0Broker唯一标识listenersPLAINTEXT://192.168.88.53:9092监听信息PLAINTEXT表示明文传输log.dirskafka/logskafka数据存放地址可以填写多个。用”,”间隔message.max.bytesmessage.max.bytes单个消息长度限制单位是字节num.partitions1默认分区数log.flush.interval.messagesLong.MaxValue在数据被写入到硬盘和消费者可用前最大累积的消息的数量log.flush.interval.msLong.MaxValue在数据被写入到硬盘前的最大时间log.flush.scheduler.interval.msLong.MaxValue检查数据是否要写入到硬盘的时间间隔。log.retention.hours24控制一个log保留时间单位小时zookeeper.connect192.168.88.21:2181ZooKeeper服务器地址多台用”,”间隔
2、附录
https://kafka.apache.org/https://zh.wikipedia.org/zh-cn/Kafka