招聘网站比对表怎么做,县蒙文网站建设汇报,wordpress5.6,建设一个大型网站需要多少钱【README】
本文阐述了kafka可靠消息传递机制#xff1b;
本文部分内容总结于《kafka权威指南》#xff08;一本好书#xff0c;墙裂推荐#xff09;#xff0c;再加上自己的理解#xff1b; 【1】可靠性保证
1#xff0c;在讨论可靠性时#xff0c;一般使用保证这个…【README】
本文阐述了kafka可靠消息传递机制
本文部分内容总结于《kafka权威指南》一本好书墙裂推荐再加上自己的理解 【1】可靠性保证
1在讨论可靠性时一般使用保证这个词
保证指的是 确保系统在各种不同的环境下能够发生一致的行为
2kafka在哪些方面做了保证呢
保证分区消息顺序只有当消息被写入分区所有副本时它才被认为是已提交的无论生产者acks设置为多少只要有一个副本是活跃的则已提交消息就不会丢失消费者只能读取已提交消息【2】复制
1kafka复制机制和分区多副本架构是kafka可靠性保证的核心
2把消息写入多个副本可以使kafka 在发生崩溃时仍能保证消息的持久性
3分区分区是kafka存储数据的基本单位一个主题的数据被分到多个分区进行存储分区内的数据是有序的
4分区副本每个分区可以有多个副本副本又分为首领副本跟随者副本生产者消费者只与首领副本交互跟随者副本只需要及时从首领副本复制最新事件以与首领副本保持同步
当首领副本不可用时其中一个同步副本选举为新首领注意是同步副本才可以选举为新首领
5跟随者副本成为同步副本的条件
与zk有一个活跃的会话即在过去6s内可配置向zk发送心跳在过去10s内可配置从首领获取过最新消息
【补充】非同步副本问题
1如果一个副本在同步与不同步状态间频繁切换说明集群内部出现了问题通常是由于jvm 不恰当gc导致的需要优化系统性能2一个滞后的同步副本会导致生产者消费者变慢因为消息被认为已提交前客户端会等待所有同步副本接收消息 2.1 当一个副本不再同步了我们就不需要关心它是否接收到消息参见跟随者副本称为同步副本的条件即一个同步副本变为不同步中间的时长是可以配置的2.2更少的同步副本意味着更低的有效复制系统发生宕机时丢失数据的风险更大【3】broker配置
1 broker指的是kakfa服务器又称中心点算是知识review了
2broker有3个参数影响可靠性非常重要*
副本系数复制系数每个分区副本数量是否不完全首领选举非同步副本在首领不可用时是否可以被选为首领最少同步副本消息被认为已提交时消息被写入的最少副本个数
2.1创建topic的复制系数replication.factor
每个分区有多少个副本 建议在要求可用性场景里把副本系数设置为3
2.2是否允许不完全的首领选举unclean.leader.election
在首领不可用但其他副本都是不同步的我们应该怎么办情况1分区有3个副本1个正常的首领副本2个不可用的跟随者副本消息被写入首领副本后首领所在broker宕机了这个时候如果之前的一个跟随者副本重新启动他就成为了分区的唯一不同步副本 问题来了是否选择它作为首领副本即便它是不同步副本 情况2分区有3个副本1个正常首领副本2个因为网络问题导致同步滞后的跟随者副本尽管跟随者副本还在复制消息但已经不同步了首领副本作为唯一同步副本还在接收生产者消费者请求。这个时候如果首领不可用另外两个副本就再也无法变成同步的了 问题来了是否选择它作为首领副本即便它是不同步副本 如何选择
选择1如果不同步副本不能提升为新首领则分区在旧首领恢复前都是不可用的有时候这种状态会持续数个小时在旧首领恢复前会导致整个集群不可用甚至长时间不可用选择2如果不同步副本提升为新首领则在这个副本变为不同步之后写入旧首领的消息全部丢失这会导致数据不一致问题
这需要我们根据具体的业务场景在系统可用性和一致性两方面做出权衡
小结
如果允许不同步副本成为首领就要承担丢失数据和出现数据不一致的风险如果不允许不同步副本成为首领就要接收较低的可用性因为必须原地等待旧首领恢复正常
不完全首领选举的意思就是 允许不同步副本成为首领unclean.leader.election 设置为true
2.3最少同步副本min.insync.replicas
默认情况下一条消息被写入所有副本才被认为是已提交的才可以继续向分区写入和消费下一条数据
如果设置了 min.insync.replicasX则一条消息被写入了X个副本而无需写入所有副本则消息就会被认为已提交
如对于一个包含3个副本的主题若 min.insync.replicas 设置为2则至少要存在两个同步副本才能向分区写入数据若只剩下一个同步副本集群就变成只读了这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为数据不一致 【4】在可靠系统里使用生产者
即便把broker配置为可靠生产者若没有进行可靠性配置仍有可能发生数据丢失风险
即kafka可靠系统依赖 broker生产者消费者这三者的可靠性配置
1看个例子 情况1为broker配置了3个副本禁用不完全首领选举生产者发送消息设置acks1 生产者发送消息A给首领首领成功写入并告诉生产者成功写入但跟随者副本还没有收到这个消息这是首领崩溃了而此时消息还没有被跟随者副本复制过去。 结果另外两个副本仍然被认为是同步的毕竟判断一个副本不同步需要一段时间而其中一个副本称为新首领。 对于生产者来说 消息A成功写入了对于消费者来说因为消息A没有被复制到所有副本即不会认为已提交所以消费者是看不到消息A的它认为数据是一致的没有丢失消息小结从生产者角度来讲实际上就是丢失了一条消息即便kafka系统看起来数据是一致的 情况2为broker配置了3个副本禁用不完全首领选举生产者发送消息设置acksall 生产者往kafka发送消息分区首领刚好崩溃了新首领正在选举中kafka会向生产者返回首领不可用的响应 这个时候若生产者没有正确处理错误或没有重试直到发送成功则消息就有可能丢失 结果 这不算broker可靠性问题因为broker没有收到这个消息这也不是不一致性问题因为消费者没有读到这个消息问题在于 生产者没有正确处理错误弄丢消息的是它自己 2如何避免上述两种问题
根据可靠性需求配置恰当acks值在参数配置和代码里正确处理错误
【4.1】发送确认
acks的3个值 0 1 all
0 表示 发送到kafka broker就认为写入成功而不管是否写入首领副本和所有副本1 表示消息写入首领副本就算成功all 表示消息写入所有副本才算成功
【4.2】配置生产者重试次数
1错误 生产者需要处理的错误分为两类包括自动处理的错误手动处理的错误
2重试 若broker返回的错误可以通过重试来解决则生产者自动处理这些错误
3错误响应码
重试之后可以解决的如 首领不可用错误-LEADER_NOT_AVAILABLE重试几次首领选举完成消息成功写入重试之后无法解决的如 配置错误消息大小超过阈值
注意重试发送一个已经失败的消息会带来风险因为如果两个消息都写入成功则消息重复这需要消费者在处理消息时保证幂等性
幂等性 服务器对先后多次相同客户端请求的响应是相同的如转账
4其他错误场景
不可重试的broker错误如消息大小超长错误认证错误消息发送前的错误如序列化错误生产者达到重试次数上限时或消息占用内存达到上限时的错误【5】在可靠系统里使用消费者
1消费者读取消息时不丢失消息的关键
消费者需要跟踪哪些消息是读取过的哪些还没有读取
2丢失消息
如果消费者1 提交了消息X偏移量T 却没有处理完消息X那就有可能造成消息X丢失
因为如果消费者1宕机其他消费者接手处理它是不会再次消费消息X的会被忽略【5.1】消费者可靠性配置
消费者需要注意以下4个参数配置 如下
1group.id消费者组编号两个消费者具有相同组id每个消费者会分到主题消息的子集如果希望看到所有消息消费者组编号需要唯一2auto.offset.reset重置消费者读取消息的偏移量两个值如下 earliest 从分区开始位置读取数据latest从分区末尾位置读取数据3enable.auto.commit 启动自动提交偏移量也可以在代码里手动提交 取值 [true | false] 如果在轮询里处理所有数据那么自动提交可以保证只提交已经处理过的消息的偏移量但如果在子线程处理数据自动提交可能在消息没有处理完就提交了有风险4auto.commit.interval.ms 设置自动提交偏移量的频率默认值是每5秒自动提交一次
消费者属性配置例子
/* 1.创建消费者配置信息 */
Properties props new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);
/*2.2开启自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
/*2.3 自动提交的间隔时间*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, hello04Group01); // group.id
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest
/*2.7 关闭自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 【5.2】显式提交偏移量手动提交
可靠消费者需要注意的8个事项
1总是在处理完事件后再提交偏移量
如果消费处理过程都在轮询里完成且不需要在轮询间维护状态可以使用自动提交或在轮询结束时使用手动提交
2提交频度是性能和重复消息数量之间的权衡
如果消费处理过程都在轮询里完成且不需要在轮询间维护状态可以在一个循环里多次提交偏移量每循环一次提交一次也是可以的或者循环退出后提交一次这取决于在性能和重复处理消息间做出的权衡
3确保对提交的偏移量心里有数
处理完消息后再提交偏移量是非常关键的否则会导致消费者错过消息
4消费者再均衡
在分区被撤销之前提交偏移量并在分配到新分区是清理之前的状态消费者再均衡 在同一个消费者组当中分区所有权从一个消费者转移到另外一个消费者的机制
5消费者可能需要重试
情况1在遇到可重试错误时提交最后一个处理成功的偏移量然后把没有处理好的消息保存到缓存里下一个轮询就不会把它覆盖掉调用消费者的 pause() 来确保其他轮询不会返回数据如果重试成功或重试次数达到上限把错误消息丢弃调用 resume() 让消费者从轮询重新获取新数据情况2在遇到可重试错误时把错误消息写入另外的主题B解耦由主题B的消费者来处理错误类似于MQ的死信队列
6消费者可能需要维护状态
不建议多个轮询间维护状态太复杂建议尝试使用 kafkaStreams 类库为聚合连接时间窗和其他复杂分析提供了高级的dsl api
7长时间处理
暂停轮询时间不能超过几秒钟即使不想获取更多数据也要保持轮询这样消费者才会往 broker 发送心跳才不会发生消费者再均衡推荐做法 把数据交给工作线程线程池去处理然后暂停消费者但保持轮询以防止消费者再均衡不获取新数据当工作线程处理完成后让消费者继续获取新数据干货——消费者处理大量数据的推荐做法
注意 区分暂停轮询 与 暂停消费者间的区别
8仅一次传递
方案1 最常用的方法是把结果写到一个支持唯一键的系统里如存储引擎关系型数据库es要么消息自带一个唯一键要么使用消费者组主题分区偏移量的组合创建唯一键这可以唯一标识一个kafka记录且消费者逻辑保证幂等性即可方案2若消费者系统支持事务可以把消息和偏移量持久化到数据库当消费者启动时从数据库读取偏移量并调用seek() 方法从该偏移量的下一个位置读取数据即可【6】验证系统可靠性
【6.1】配置验证
首领选举 如停掉首领所在broker会发生什么控制器选举如停掉重启控制器所在broker 会发生什么 依次重启依次重启broker 不会丢失数据吗不完全首领选举测试不同步副本可以选举为首领副本如果依次停止所有副本然后启动一个不同步的broker会发生什么 要怎样才恢复正常
【6.2】应用程序验证
客户端从服务器断开连接首领选举依次重启broker依次重启生产者依次重启消费者
【6.3】在生产环境监控可靠性
1kafka的java客户端包含了 JMX 度量指标 可以监控客户端的状态和事件
对于生产者最重要的可靠性指标是 error-rate 和 retry-rate 对于消费者最重要的可靠性指标是 consumer-lag