专做logo网站叫什么,做搬家服务网站问卷调查结果,网络教学平台登录,网站推广打包第 4 章 Kafka Broker
4.1 Kafka Broker 工作流程
4.1.1 Zookeeper 存储的 Kafka 信息
#xff08;1#xff09;启动 Zookeeper 客户端。
bin/zkCli.sh 因为你在配置kafka的时候指定了它的名字。 #xff08;2#xff09;通过 ls 命令可以查看 kafka 相关信息。
[zk: …第 4 章 Kafka Broker
4.1 Kafka Broker 工作流程
4.1.1 Zookeeper 存储的 Kafka 信息
1启动 Zookeeper 客户端。
bin/zkCli.sh 因为你在配置kafka的时候指定了它的名字。 2通过 ls 命令可以查看 kafka 相关信息。
[zk: localhost:2181(CONNECTED) 2] ls /kafka zk中有一个节点 consumers 这个里面老版本0.9版本之前存放的是消费者的偏移量offset这次消费者消费到哪个地方了下次从这个地方继续消费新版本的根本没放在zk中直接放在集群中了。 可以借助一个工具漂亮的zoo通过图形化界面查看zk中的消息。 4.1.2 Kafka Broker 总体工作流程 1每一个broker上线时会在zk中进行注册
2每个broker中都有一个controllercontroller会争先抢占zk中 controller节点的注册权谁先抢到谁选举时说了算。假如broker0中的controller中抢到了那它就是说了算的人。该controller一直监听ids节点是否有挂掉的节点。
3选举规则是在ISR中存活为前提按照AR中排在前面的优先例如 ar[1,0,2] ,isr[1,0,2],那么Leader会按照102 进行顺序的轮询。
4选举出来的新节点注册到zk中将信息记录在zk中。
5其他contorller将zk中的信息同步下来。
6假定broker中的leader挂掉了会进行重新的选举。
7客户端发送消息给LeaderLeader记录数据落盘形成LogLog底层使用的是SegmentSegment底层每一个G是一个单独的文件1G内的数据要想查找迅速又分成了两个文件 log和index. 模拟 Kafka 上下线Zookeeper 中数据变化
1查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]
2停止 hadoop104 上的 kafka。
bin/kafka-server-stop.sh
3再次查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
[0, 1]
实操将hadoop13停止掉会重新选举两次查看的对比图如下 4.2 生产经验——节点服役和退役
4.2.1 服役新节点
1新节点准备
1关闭 bigdata03进行一个快照并右键执行克隆操作。
2开启 bigdata04并修改 IP 地址。
vi /etc/sysconfig/network-scripts/ifcfg-ens33修改完记得重启网卡
systemctl restart network
3在 bigdata04 上修改主机名称为 bigdata04。
hostname bigdata04 # 临时修改
[rootbigdata04 ~]# vim /etc/hostname
bigdata04
还要记得修改 /etc/hosts文件并进行同步
修改bigdata01的hosts 文件修改完之后记得同步一下192.168.52.11 bigdata01
192.168.52.12 bigdata03
192.168.52.13 bigdata02
192.168.52.14 bigdata04xsync.sh /etc/hosts
scp -r /etc/hosts rootbigdata04:/etc/ 4重新启动 bigdata03、bigdata04。 5修改 bigdata04 中 kafka 的 broker.id 为 3。
进入bigdata04的kafka中修改里面的配置文件 config/server.properties
6删除 bigdata04 中 kafka 下的 datas 和 logs。
rm -rf datas/* logs/*
7启动 bigdata01、bigdata02、bigdata03 上的 kafka 集群。
先启动zk集群
xcall.sh zkServer.sh stop
xcall.sh zkServer.sh start
启动kafka集群只能启动三台
kf.sh start
8单独启动 bigdata04 中的 kafka。
bin/kafka-server-start.sh -daemon ./config/server.properties
查看kafka集群first主题的详情
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --topic first --describe
发现副本数并没有增加。由于我之前创建first这个主题的时候只有一个副本不是三个副本所以呢演示效果不佳。
kafka-topics.sh --bootstrap-server bigdata01:9092 --topic third --create --partitions 3 --replication-factor 3 2执行负载均衡操作
1创建一个要均衡的主题
创建一个文件vi topics-to-move.json
写上如下代码如果多个topic 可以使用,分隔
{topics: [{topic: third}],version: 1
}
2生成一个负载均衡的计划
在创建的时候记得启动bigdata04节点否则计划中还是没有bigdata04
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 --generate 未来的分区策略拷贝一份
{version:1,partitions:[{topic:third,partition:0,replicas:[3,0,1],log_dirs:[any,any,any]},{topic:third,partition:1,replicas:[0,1,2],log_dirs:[any,any,any]},{topic:third,partition:2,replicas:[1,2,3],log_dirs:[any,any,any]}]}
3创建副本存储计划所有副本存储在 broker0、broker1、broker2、broker3 中。
vi increase-replication-factor.json
{version:1,partitions:[{topic:first,partition:0,replicas:[3,2,0],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[0,3,1],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,0,2],log_dirs:[any,any,any]}]}
以上这个内容来自于第二步的执行计划。
4执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute
5验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify
如果不相信添加成功可以查看first节点的详情 4.2.2 退役旧节点
1执行负载均衡操作
先按照退役一台节点生成执行计划然后按照服役时操作流程执行负载均衡。
1创建一个要均衡的主题
kafka下添加文件vim topics-to-move.json
添加如下内容
{topics: [{topic: third}],version: 1
}
2创建执行计划。
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2 --generate
3创建副本存储计划所有副本存储在 broker0、broker1、broker2 中。
添加文件 vi increase-replication-factor.json
添加如下代码
{version:1,partitions:[{topic:first,partition:0,replicas:[0,2,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[1,0,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[2,1,0],log_dirs:[any,any,any]}]}
4执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --execute
5验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --verify 2执行停止命令
在 bigdata04上执行停止命令即可。
bin/kafka-server-stop.sh
4.3 Kafka 副本
4.3.1 副本基本信息 --副本又叫副本因子
1Kafka 副本作用提高数据可靠性。
2Kafka 默认副本 1 个生产环境一般配置为 2 个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率。
3Kafka 中副本分为Leader 和 Follower。Kafka 生产者只会把数据发往 Leader 然后 Follower 找 Leader 进行同步数据。
4Kafka 分区中的所有副本包含Leader统称为 ARAssigned Repllicas。
AR ISR OSR
ISR表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定默认 30s。Leader 发生故障之后就会从 ISR 中选举新的 Leader。
OSR表示 Follower 与 Leader 副本同步时延迟过多的副本.
4.3.2 Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader负责管理集群broker 的上下线所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。 1创建一个新的 topic4 个分区4 个副本
bin/kafka-topics.sh --bootstrap-server hadoop11:9092 --create --topic bigdata2308 --partitions 4 --replication-factor 4
2查看 Leader 分布情况 bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic bigdata2305
Topic: bigdata2301 Topic: bigdata2301 Partition: 0 Leader: 0 Replicas: 0,2,3,1 Isr: 0,2,3,1Topic: bigdata2301 Partition: 1 Leader: 2 Replicas: 2,3,1,0 Isr: 2,3,1,0Topic: bigdata2301 Partition: 2 Leader: 3 Replicas: 3,1,0,2 Isr: 3,1,0,2Topic: bigdata2301 Partition: 3 Leader: 1 Replicas: 1,0,2,3 Isr: 1,0,2,3 3停止掉 hadoop13 的 kafka 进程并查看 Leader 分区情况
bin/kafka-server-stop.shbin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe
--topic bigdata2305 Topic: bigdata2301 Partition: 0 Leader: 0 Replicas: 0,2,3,1 Isr: 0,2,1
Topic: bigdata2301 Partition: 1 Leader: 2 Replicas: 2,3,1,0 Isr: 2,1,0
Topic: bigdata2301 Partition: 2 Leader: 1 Replicas: 3,1,0,2 Isr: 1,0,2
Topic: bigdata2301 Partition: 3 Leader: 1 Replicas: 1,0,2,3 Isr: 1,0,2 4停止掉 hadoop14 的 kafka 进程并查看 Leader 分区情况 通过以上演示大家可以发现选举是按照AR(跟Replicas一样)进行的而不是ISR 4.3.3 Leader 和 Follower 故障处理细节 LEO演示-- 每一个副本最后的偏移量offset 1 HW(高水位线 High Water) 演示所有副本中最小的LEO 由于数据同步的时候先进入Leader,随后同步给Follower假如Follower挂掉了Leader和其他的Follower 继续往前存储数据挂掉的节点从ISR集合中剔除此时挂掉的Follower又重启了它会先从上一次挂掉的节点的HW开始同步数据直到追上最后一个Follower为止,此时会重新回归ISR。 4.3.4 分区副本分配
如果 kafka 服务器只有 4 个节点那么设置 kafka 的分区数大于服务器台数在 kafka底层如何分配存储副本呢 答案是否定的。
1创建 16 分区3 个副本
1创建一个新的 topic名称为 second。
bin/kafka-topics.sh --bootstrap-server hadoop11:9092 --create --partitions 16 --replication-factor 3 --topic second bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata230801 --partitions 3 --replication-factor 4
假如你有3个broker ,却创建4个副本报错Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2023-09-13 18:43:47,458] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata23 --partitions 4 --replication-factor 2
假如你有3个broker ,却创建4个分区是可以的。
以上错误的意思是目前只有2台服务器却要创建3个副本创建不了。
2查看分区和副本情况
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1 kafka在进行初始化的时候选举谁当第一Leader是有一定的算法的。算法保障了Leader不在一个broker里面。 4.3.5 生产经验——手动调整分区副本的存储 在生产环境中每台服务器的配置和性能不一致但是Kafka只会根据自己的代码规则创建对应的分区副本就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求创建一个新的topic4个分区两个副本名称为three。将 该topic的所有副本都存储到broker0和broker1两台服务器上。 手动调整分区副本存储的步骤如下
1创建一个新的 topic名称为 three。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 4 --replication-factor 2 --topic three
2查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three
3创建副本存储计划所有副本都指定存储在 broker0、broker1 中。
vi increase-replication-factor.json
输入如下内容
{ version:1, partitions:[{topic:three,partition:0,replicas:[0,1]}, {topic:three,partition:1,replicas:[0,1]}, {topic:three,partition:2,replicas:[1,0]}, {topic:three,partition:3,replicas:[1,0]}]
}
4执行副本存储计划。
屁股决定脑袋
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute
5验证副本存储计划。
bin/kafka-reassign-partitions.sh -- bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify
6查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three 4.3.6 生产经验——Leader Partition 负载平衡
正常情况下Kafka本身会自动把Leader Partition均匀分散在各个机器上来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机会导致Leader Partition过于集中在其他少部分几台broker上这会导致少数几台broker的读写请求压力过高其他宕机的broker重启之后都是follower partition读写请求很低造成集群负载不均衡。 auto.leader.rebalance.enable默认是true。 自动Leader Partition 平衡
• leader.imbalance.per.broker.percentage 默认是10%。每个broker允许的不平衡 的leader的比率。如果每个broker超过 了这个值控制器会触发leader的平衡。
• leader.imbalance.check.interval.seconds 默认值300秒。检查leader负载是否平衡的间隔时间。 从以上可以看出Leader 0明明是3需要变为Leader,就说明这个中有Leader挂了再重启的情况4个节点一个节点不平衡1/4 10%,所以会触发再平衡其他节点也是一样的。
生产环境下该值默认为true一般修改为false因为不影响正常的使用再平衡会造成资源的浪费。 4.3.7 生产经验——增加副本因子
在生产环境当中由于某个主题的重要等级需要提升我们考虑增加副本。副本数的
增加需要先制定计划然后根据计划执行。
1创建 topic
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 3 --replication-factor 1 --topic four
通过命令行修改副本是否成功
分区是可以通过语句修改的只能改多不能改少副本创建以后就不能直接修改了。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --partitions 3 --replication-factor 3 --topic four
没办法使用命令修改的。
2手动增加副本存储
通过命令查看副本情况
1创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。
vi increase-replication-factor.json
添加如下内容
{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]}]}
2执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --execute
查看副本情况 4.4 文件存储
4.4.1 文件存储机制 重要
1Topic 数据的存储机制
Topic是逻辑上的概念而partition是物理上的概念每个partition对应于一个log文件该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端为防止log文件过大导致数据定位效率低下Kafka采取了分片和索引机制将每个partition分为多个segment。每个segment包括“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下该文件夹的命名规则为topic名称分区序号例如first-0。 2思考Topic 数据到底存储在什么位置
1启动生产者并发送消息。
bin/kafka-console-producer.sh -- bootstrap-server bigdata01:9092 --topic first
hello world
2查看 hadoop11或者 hadoop12、hadoop13的/opt/installs/kafka3/datas/first-1 first-0、first-2路径上的文件。 进入查看ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata
3直接查看 log 日志发现是乱码
4通过工具查看 index 和 log 信息。
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log kafka存储数据的时候使用的是稀疏索引所以运行速度快。 4.4.2 文件清理策略
Kafka 中默认的日志这个地方是数据的意思就是Segment保存时间为 7 天可以通过调整如下参数修改保存时间。
log.retention.hours最低优先级小时默认 7 天。
log.retention.minutes分钟。 --如果设置了该值小时的设置不起作用。
log.retention.ms最高优先级毫秒。 --如果设置了该值分钟的设置不起作用。 log.retention.check.interval.ms负责设置检查周期默认 5 分钟。
那么日志一旦超过了设置的时间怎么处理呢
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
1delete 日志删除将过期数据删除
log.cleanup.policy delete 所有数据启用删除策略
1基于时间默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
2基于大小默认关闭。超过设置的所有日志总大小删除最早的 segment。
log.retention.bytes默认等于-1表示无穷大。
思考如果一个 segment 中有一部分数据过期一部分没有过期怎么处理 2compact 日志压缩(合并的意思不是真的压缩)
compact日志压缩对于相同key的不同value值只保留最后一个版本。 log.cleanup.policy compact 所有数据启用压缩策略 压缩后的offset可能是不连续的比如上图中没有6当从这些offset消费消息时将会拿到比这个offset大的offset对应的消息实际上会拿到offset为7的消息并从这个位置开始消费。
这种策略只适合特殊场景比如消息的key是用户IDvalue是用户的资料通过这种压缩策略整个消息集里就保存了所有用户最新的资料。
比如张三 去年18岁今年19岁这种场景下可以进行压缩。 4.5 高效读写数据 面试题
1Kafka 本身是分布式集群可以采用分区技术并行度高
2读数据采用稀疏索引可以快速定位要消费的数据。mysql中索引多了之后写入速度就慢了
3顺序写磁盘
Kafka 的 producer 生产数据要写入到 log 文件中写的过程是一直追加到文件末端 为顺序写。官网有数据表明同样的磁盘顺序写能到 600M/s而随机写只有 100K/s。这与磁盘的机械机构有关顺序写之所以快是因为其省去了大量磁头寻址的时间。
4页缓存 零拷贝技术
零拷贝Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据所以就不用 走应用层传输效率高
PageCache页缓存Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时操作系统只是将数据写入 PageCache。当读操作发生时先从PageCache中查找如果找不到再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用 生产者将数据发送给kafka,kafka将数据交给Linux内核Linux内核将数据放入自身操作系统的页缓存中然后到一定值写入磁盘假如消费者过来消费直接从页缓存中通过网卡发送给消费者根本就没有去kafka的业务系统中获取数据所以速度比较快。
跟这个问题非常像mysql读取数据的速度为什么这么快--buffer pool