网站制作需要多少钱k,呼伦贝尔市建设网站,成都平台网站建设公司排名,网站改版建设正在学习storm的大兄弟们#xff0c;我又来传道授业解惑了#xff0c;是不是觉得自己会用ack了。好吧#xff0c;那就让我开始啪啪打你们脸吧。
先说一下ACK机制#xff1a; 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪。 这里面涉及到ac…正在学习storm的大兄弟们我又来传道授业解惑了是不是觉得自己会用ack了。好吧那就让我开始啪啪打你们脸吧。
先说一下ACK机制 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪。 这里面涉及到ack/fail的处理如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调用spout的fail方法 在处理tuple的每一个bolt都会通过OutputCollector来告知storm, 当前bolt处理是否成功。 另外需要注意的当spout触发fail动作时不会自动重发失败的tuple需要我们在spout中重新获取发送失败数据手动重新再发送一次。
Ack原理 Storm中有个特殊的task名叫acker他们负责跟踪spout发出的每一个Tuple的Tuple树因为一个tuple通过spout发出了经过每一个bolt处理后会生成一个新的tuple发送出去。当acker框架自启动的task发现一个Tuple树已经处理完成了它会发送一个消息给产生这个Tuple的那个task。 Acker的跟踪算法是Storm的主要突破之一对任意大的一个Tuple树它只需要恒定的20字节就可以进行跟踪。 Acker跟踪算法的原理acker对于每个spout-tuple保存一个ack-val的校验值它的初始值是0然后每发射一个Tuple或Ack一个Tuple时这个Tuple的id就要跟这个校验值异或一下并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理如果为0则认为已完全处理。
要实现ack机制 1spout发射tuple的时候指定messageId 2spout要重写BaseRichSpout的fail和ack方法 3spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageIdspout也无法获取到发送失败的数据进行重发)看看系统提供的接口只有msgId这个参数这里的设计不合理其实在系统里是有cache整个msg的只给用户一个messageid用户如何取得原来的msg貌似需要自己cache然后用这个msgId去查询太坑爹了 3spout根据messageId对于ack的tuple则从缓存队列中删除对于fail的tuple可以选择重发。 4,设置acker数至少大于0Config.setNumAckers(conf, ackerParal);
Storm的Bolt有BsicBolt和RichBolt: 在BasicBolt中BasicOutputCollector在emit数据的时候会自动和输入的tuple相关联而在execute方法结束的时候那个输入tuple会被自动ack。 使用RichBolt需要在emit数据的时候显示指定该数据的源tuple要加上第二个参数anchor tuple以保持tracker链路即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时执行OutputCollector.fail(tuple);
由一个tuple产生一个新的tuple称为anchoring你发射一个tuple的同时也就完成了一次anchoring。 ack机制即spout发送的每一条消息在规定的时间内spout收到Acker的ack响应即认为该tuple 被后续bolt成功处理;在规定的时间内默认是30秒没有收到Acker的ack响应tuple就触发fail动作即认为该tuple处理失败timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。或者收到Acker发送的fail响应tuple也认为失败触发fail动作 注意我开始以为如果继承BaseBasicBolt那么程序抛出异常也会让spout进行重发但是我错了程序直接异常停止了 这里我以分布式程序入门案例worldcount为例子吧。请看下面大屏幕没有错我就是那个你们走在路上经常听见的名字刘洋。 这里spout1-1task发送句子i am liu yang给bolt2-2task进行处理该task把句子切分为单词根据字段分发到下一个bolt中bolt2-2,bolt4-4,bolt5-5对每一个单词添加一个后缀1后再发送给下一个bolt进行存储到数据库的操作这个时候bolt7-7task在存储数据到数据库时失败向spout发送fail响应这个时候spout收到消息就会再次发送的该数据。 好那么我思考一个问题spout如何保证再次发送的数据就是之前失败的数据所以在spout实例中绝对要定义一个map缓存缓存发出去的每一条数据key当然就是messageId,当spout实例收到所有bolt的响应后如果是ack就会调用我们重写的ack方法在这个方法里面我们就要根据messageId删除这个key-value,如果spout实例收到所有bolt响应后发现是faile,则会调用我们重写的fail方法根据messageId查询到对应的数据再次发送该数据出去。 spout代码如下
public class MySpout extends BaseRichSpout {private static final long serialVersionUID 5028304756439810609L;// key:messageId,Dataprivate HashMapString, String waitAck new HashMapString, String();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(sentence));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector collector;}public void nextTuple() {String sentence i am liu yang;String messageId UUID.randomUUID().toString().replaceAll(-, );waitAck.put(messageId, sentence);//指定messageId开启ackfail机制collector.emit(new Values(sentence), messageId);}Overridepublic void ack(Object msgId) {System.out.println(消息处理成功: msgId);System.out.println(删除缓存中的数据...);waitAck.remove(msgId);}Overridepublic void fail(Object msgId) {System.out.println(消息处理失败: msgId);System.out.println(重新发送失败的信息...);//重发如果不开启ackfail机制那么spout的map对象中的该数据不会被删除的。collector.emit(new Values(waitAck.get(msgId)),msgId);}
}
虽然在storm项目中我们的spout源通常来源kafka,而且我们使用storm提供的工具类KafkaSpout类其实这个类里面就维护者messageId,Tuple对的集合。
Storm怎么处理重复的tuple 因为Storm要保证tuple的可靠处理当tuple处理失败或者超时的时候spout会fail并重新发送该tuple那么就会有tuple重复计算的问题。这个问题是很难解决的storm也没有提供机制帮助你解决。一些可行的策略
1不处理这也算是种策略。因为实时计算通常并不要求很高的精确度后续的批处理计算会更正实时计算的误差。
2使用第三方集中存储来过滤比如利用mysql,memcached或者redis根据逻辑主键来去重。
3使用bloom filter做过滤简单高效。
问题一你们有没有想过如果某一个task节点处理的tuple一直失败消息一直重发会怎么样 我们都知道spout作为消息的发送源在没有收到该tuple来至左右bolt的返回信息前是不会删除的那么如果消息一直失败就会导致spout节点存储的tuple数据越来越多导致内存溢出。
问题二有没有想过如果该tuple的众多子tuple中某一个子tuple处理failed了但是另外的子tuple仍然会继续执行如果子tuple都是执行数据存储操作那么就算整个消息失败那些生成的子tuple还是会成功执行而不会回滚的。 这个时候storm的原生api是无法支持这种事务性操作我们可以使用storm提供的高级api-trident来做到具体如何我不清楚目前没有研究它但是我可以它内部一定是根据分布式协议比如两阶段提交协议等。向这种业务中要保证事务性功能我们完全可以根据我们自身的业务来做到比如这里的入库操作我们先记录该消息是否已经入库的状态再入库时查询状态来决定是否给予执行。
问题三tuple的追踪并不一定要是从spout结点到最后一个bolt,只要是spout开始可以在任意层次bolt停止追踪做出应答。
Acker task 组件来设置一个topology里面的acker的数量默认值是一如果你的topoogy里面的tuple比较多的话那么请把acker的数量设置多一点效率会更高一点。
调整可靠性 acker task是非常轻量级的 所以一个topology里面不需要很多acker。你可以通过Strom UI(id: -1)来跟踪它的性能。 如果它的吞吐量看起来不正常那么你就需要多加点acker了。如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据 那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半 因为对于每一个tuple都要发送一个ack消息。并且它需要更少的id来保存下游的tuple 减少带宽占用。 有三种方法可以去掉可靠性。
第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下 storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。 第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid来达到不跟粽某个特定的spout tuple的目的。 最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心那么可以在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面 也就不会被跟踪了。
可靠性配置
有三种方法可以去掉消息的可靠性
将参数Config.TOPOLOGY_ACKERS设置为0通过此方法当Spout发送一个消息的时候它的ack方法将立刻被调用
Spout发送一个消息时不指定此消息的messageID。当需要关闭特定消息可靠性的时候可以使用此方法
最后如果你不在意某个消息派生出来的子孙消息的可靠性则此消息派生出来的子消息在发送时不要做锚定即在emit方法中不指定输入消息。因为这些子孙消息没有被锚定在任何tuple tree中因此他们的失败不会引起任何spout重新发送消息。
如何关闭Ack机制
有2种途径
spout发送数据是不带上msgid
设置acker数等于0
值得注意的一点是Storm调用Ack或者fail的task始终是产生这个tuple的那个task所以如果一个Spout被分为很多个task来执行消息执行的成功失败与否始终会通知最开始发出tuple的那个task。
作为Storm的使用者有两件事情要做以更好的利用Storm的可靠性特征首先你在生成一个tuple的时候要通知Storm其次完全处理一个tuple之后要通知Storm这样Storm就可以检测到整个tuple树有没有完成处理并且通知源Spout处理结果。
1 由于对应的task挂掉了一个tuple没有被Ack
Storm的超时机制在超时之后会把这个tuple标记为失败从而可以重新处理。
2 Acker挂掉了 在这种情况下由这个Acker所跟踪的所有spout tuple都会出现超时也会被重新的处理。
3 Spout 挂掉了在这种情况下给Spout发送消息的消息源负责重新发送这些消息。
三个基本的机制保证了Storm的完全分布式可伸缩的并且高度容错的。
另外Ack机制还常用于限流作用 为了避免spout发送数据太快而bolt处理太慢常常设置pending数当spout有等于或超过pending数的tuple没有收到ack或fail响应时跳过执行nextTuple 从而限制spout发送数据。
通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。
转发:https://www.cnblogs.com/intsmaze/p/5918087.html