当前位置: 首页 > news >正文

登陆建设银行wap网站网站建设中怎么回事

登陆建设银行wap网站,网站建设中怎么回事,wordpress用户注册插件汉化,网站做qq链接这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计#xff0c;而不是监督或冗余之类的部署良好实践。 由于Storm的实时流性质#xff0c;当面对大多数错误时#xff0c;我们最终将不得不移至下一个数据。 在这种情况下#xff0c;错误… 这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计而不是监督或冗余之类的部署良好实践。 由于Storm的实时流性质当面对大多数错误时我们最终将不得不移至下一个数据。 在这种情况下错误处理归结为或没有报告此错误并在以后或没有重试处理失败的输入数据。 这篇文章的第1部分是关于这方面的。 这意味着在处理元组时通常很难确定它是我们第一次遇到它还是它的内容已经部分地应用于持久性。 因此我们需要使状态更新操作成为幂等这是本文的第二部分。 不要对这篇文章的大小印象深刻Storm实际上为我们完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入东西。 这篇文章基于Storm 0.9Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一个玩具项目 以说明下面讨论的几点。 该项目实际上是根据我在上一篇文章中介绍的“房间存在”示例改编的 。 第1部分处理错误情况 决定何时要求重试 第一个简单的错误处理策略是简单地接受运行时错误导致的计算质量下降。 例如如果拓扑在最近的滑动窗口上计算一些实时趋势估计或者如果我们已经在处理诸如Twitter公开流之类的采样数据则可能是这种情况。 如果我们选择忽略此类错误则实现起来非常简单只需用大量的try / catch包装拓扑逻辑以某种方式报告错误并且不要让任何事情冒充Storm。 但是在大多数情况下我们关心一致性因此必须对尝试重试或不尝试失败的数据做出谨慎的决定。 运行时错误的一个典型示例是入站数据格式问题。 在那种情况下重试当然是没有意义的因为它不会第二次变得更好。 相反我们应该记录故障数据并可能要求某些人进行调查。 这是我的玩具项目中BytesToString Storm函数的一个简单示例 public class BytesToString extends BaseFunction { Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString new String((byte[]) tuple.getValueByField(bytes), UTF-8); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err(ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string), e); } } 另一方面如果错误与某些不可访问的外部数据源有关例如由网络分区引起的错误我们应按下一节所述触发重试。 除上述两种错误外还有许多其他类型的错误但要点仍然是区分可重试错误与不可重试错误并做出相应反应很重要。 最后一点当您决定不报告在IBackingMap的multiget中发生的错误时请格外小心 因为该函数必须返回与输入键列表大小相同的列表。 因此如果出现不可重试的错误我们必须以某种方式返回某些结果。 在大多数情况下如果我们选择不重试这种情况下的错误那是因为某些过去的错误已经在持久性方面破坏了某些内容并且为时已晚。 在下面的示例中由于对从DB读取的某些数据进行的解析失败而发生错误并且代码仅返回null值这等同于考虑到持久性没有任何作用至少没有用处。 另请参阅下面的第3部分以了解针对这种情况的可能解决方案。 Override public ListOpaqueValue multiGet(ListListObject keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err(error while trying to deserialize data from json giving up (data is lost!), e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist destroys data! } } 好吧来自TimelineBackingMap的这段代码实际上将所有数据替换为null这使情况变得更糟但这是一个玩具项目…… 导致三叉戟元组被重播… 一旦确定触发元组重播是合理的我们只需要询问它Storm就会做其他所有事情只需插入正确的喷嘴请参阅下一节。 从技术上讲这很简单从功能或过滤器之类的Trident原语中触发重试就像抛出FailedException一样简单就像玩具项目中的TimeLineBackingMap中一样其中包括重试和非重试错误的示例请注意代码下面来自TimelineBackingMap的示例假定任何数据库错误都是可重试的这过于简化了 Override public void multiPut(ListListObject keys, ListOpaqueValue timelines) {;ListOpaqueValue jsonOpaqueTimelines; try { jsonOpaqueTimelines Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println(error while trying to serialize data to json giving up (data is lost!)); return; }if (jsonOpaqueTimelines ! null) { try { DB.put(room_timelines, toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err(error while storing timelines to cassandra, triggering a retry..., e); throw new FailedException(could not store data into Cassandra, triggering a retry..., e); } } }; 然后Storm会将错误传播回喷嘴以强制重播元组。 如果我们希望在Storm UI中报告错误则可以抛出ReportedFailedException。 我强烈不建议使用的另一种方法是让任何其他类型的RuntimeException冒泡到Storm。 这本质上以更高的性能成本实现了相同的结果它将触发工作节点崩溃并且Nimbus将自动重启并且所有spout将恢复从最新的已知成功索引中读取spout实现如Kafka spout将其最新成功处理的偏移存储在zookeeper中为了这个目的。 这种快速失败策略是Storm设计的一部分请参阅有关工人监督和容错的文档。 从本质上讲这实现了与让spout重播某些元组相同的一致性保证但是对性能的影响当然更大因为我们具有完整的JVM重新启动并重置了所有当前正在运行的拓扑实例。 因此切勿故意这样做。 仍然令人放心的是如果我们的节点崩溃数据不会中断并且流量自然会继续。 Storm决定重播元组的第三种情况是它们是否在配置的超时之前未到达拓扑的末尾。 更确切地说如果未按时收到ACK则该机制实际上是由发出该元组的spout触发的因此如果元组成功处理但由于某些网络分区ACK无法到达该spout则也可以触发这些重播。 用于控制此设置的Storm参数是topology.enable.message.timeouts和topology.message.timeout.secs 根据defaults.yaml的默认值为“ true”和30秒。 这只是为什么拓扑中的幂等性如此重要的又一个原因。 …并实际上重播元组 一旦失败通知到达喷嘴或在超时情况下由通知生成我们需要确保失败的元组将被重播。 除非您自己开发喷嘴否则只能归结为选择正确的喷嘴口味 。 此选择会影响元组的重播或不重播方式因此它必须与适当的策略保持一致以处理拓扑中的已重播的元组这是下一部分的主题。 有3种喷口 非事务性无保证但如果您选择的实现提供“至少一次”保证在某些情况下它们仍然有用 事务性的不建议使用因为它们在某些分区情况下可能会阻止拓扑 opaque不透明就重播而言它们达到元组至少会被播放一次但在重播方面提供了弱保证但在重播的情况下发出的批次可能会不同。 在实践中使用它们时我所建议的所有重要事项是确保拓扑对于这种灵活的重放具有鲁棒性这将在下一部分中进行讨论。 关于元组和批处理重播的最后说明 我在元组级别上进行了讨论因为这使设计决策更简单。 实际上要求Storm重播单个元组将触发同一批中包含的许多其他元组的重播其中一些可能没有错误。 第2部分重播元组的幂等处理 故事的另一面是既然我们知道元组可能会被处理几次请确保拓扑是幂等的即发送相同元组的次数不会使状态不一致。 没有副作用的拓扑部分当然不受元组重播的影响。 关于状态一致性的Storm Trident文档非常清楚因此我在这里仅添加一些内容。 如果我们的状态更新操作已经幂等 如果状态更新操作本质上已经是幂等的那么它已经具有元组重播的弹性并且不需要Storm特殊机制。 如果id值完全基于入站元组内容则任何“按id存储”操作都是这种情况。 例如在我的玩具项目中我存储了占用会话这些会话的主键是从入站事件中找到的相关ID派生的因此在这种情况下写操作已经可以重播了因为任何重播都只会覆盖相同的现有数据信息而不会破坏任何数据假设我们有订购保证在这种情况下是正确的。 public void multiPut(ListListObject keys, ListRoomPresencePeriod newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods); } 在CassandraDB.java中 try { PreparedStatement statement getSession().prepare(INSERT INTO presence (id, payload) values (?,?)); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error(error while contacting Cassandra, triggering a retry..., e); new FailedException(error while trying to record room presence in Cassandra , e); } 同时使read-update-write操作成为幂等 我在先前的博客文章中描述了Storm如何使我们能够实现执行以下操作而不需要DB锁并且仍然避免出现竞争情况 从数据库读取以前的状态 根据新的元组数据更新内存中的状态 将新状态保存到数据库 风暴的美丽之处在于为了处理重播的元组而不破坏状态我们只需要调整步骤1和3。这是非常重要的我们现在可以在步骤2中实现所有处理逻辑就像每个元组只被播放一次然后根本不关心重播只要我们是“纯”的请参见下面的评论…。 这就是“风暴只有一次语义”的含义。 而且如果我们在内部实现1和3则使它们重播即可只是将它们与现有的Storm类包装在一起即可。 最健壮的方式是使用Opaque逻辑但代价是每个状态存储两次状态如Trident文档中关于transaction spout的说明 。 更好的是已经有很多不透明的BackingMap实现可用于Storm-contrib中的诸如Cassandra或Mysql的许多后端因此在大多数情况下除了选择正确的之外实际上没有任何其他事情可做。 最重要的一点是要使用处理重播元组的不透明BackingMap必须使用尊重不透明先决条件的喷嘴如本矩阵所述 。 如果由于某种原因需要实现自己的BackingMap我们唯一要做的就是使它存储数据的当前和先前版本以及交易ID。 这是我的玩具项目中的一个简单示例但实际上在编写类似代码之前请考虑一下Storm-contrib public void put(String table, ListString keys, ListOpaqueValue opaqueStrings) {;// this should be optimized with C* batches... for (PairString, OpaqueValue keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement getSession().prepare(format(INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?), table)); OpaqueValue opaqueVal keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); } } public ListOpaqueValue get(String table, ListString keys) {;ListOpaqueValue vals new ArrayList(keys.size()); ResultSet rs execute(format(select id, txid, prev, curr from %s where id in ( %s ) , table, toCsv(keys) )); MapString, OpaqueValue data toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; } 然后要真正获得Trident的一次语义唯一要做的就是将其包装在OpaqueMap中如下所示 public static StateFactory FACTORY new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } } 幕后发生的事情是 OpaqueMap将根据与当前批处理元组关联的事务ID和在持久性中找到的事务ID选择要显示给我们的更新逻辑的先前存储的状态“ curr”或“ prev”。 该事务ID是由喷嘴提供的因此这就是保持喷嘴与状态选择对齐如此重要的原因状态对每个事务ID的含义进行假设。 不要破坏前一个实例 让我们回到上面提到的read-update-write序列的步骤2。 既然我们知道不透明逻辑需要存储任何状态的新版本和旧版本请查看以下Reducer代码并尝试确定其损坏原因 public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event (LocationChangedEvent) tuple.getValueByField(occupancyEvent);;if (ENTER event.getEventType()) { curr.setStartTime(event.getTime()); // buggy code } else { curr.setEndTme(event.getTime()); // buggy code } return curr; } 函数式编程的专家称其为“不纯”方法因为它会修改其输入参数。 它破坏Storm不透明逻辑的原因是现在“当前”和“先前” java引用实际上都引用内存中的同一实例。 因此当不透明逻辑同时保留某个状态的先前版本和当前版本时实际上它保存的是新版本的两倍因此先前的版本丢失了。 更好的实现可能是这样的 public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event (LocationChangedEvent) tuple.getValueByField(occupancyEvent);;RoomPresencePeriod updated new RoomPresencePeriod(curr); // copy constructor if (ENTER event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }第3部分人为错误全部重播 最后一点我们必须谦虚地意识到无论我们采取了多少上述努力和保障我们仍然会在生产环境中部署错误对此我发誓抱歉。 对于数据处理平台错误可能意味着破坏数据的错误当数据是我们的业务时这是很糟糕的。 在某些情况下我们只会发现事实之后数据已损坏就像上面有关multiget的注释中所述。 内森·马兹Nathan Marz在他的《 大数据》书中 描述了一个简单的基于Lambda架构的“重播所有”想法以解决该想法。 这本书的简短摘要也可以在这里找到 。 参考来自Svend博客的 JCG合作伙伴 Svend Vanderveken 在Storm Trident拓扑中的错误处理 。 翻译自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html
http://www.huolong8.cn/news/276910/

相关文章:

  • 做网站编程的电脑配置wordpress 英文 中文
  • 广安哪里做网站网站建设有何好处
  • wordpress建站网营销专业网站
  • 池州网站开发公司招聘淮南建筑学院
  • 做服装外贸的网站建设做网站常见程序
  • 苏州网站建设官网自己开网店需要什么流程
  • 中专网站建设课程wordpress 淘宝客 主题
  • 大同市网站建设建设电子商务平台网站
  • 个性化网站定制价格wordpress 多数据库
  • 杭州制作企业公司网站展馆设计方案ppt
  • 德州网络资阳优化团队平台
  • 网站免费部署单站点网站
  • 网站如何做分布式国内大的做网站的公司
  • 指示灯具网站建设网站seo的重要性
  • 网站建设公司比较好的有哪些网站建设与网站优化销售
  • 成都创建公司网站官方网站建设 安全还踏实磐石网络
  • 花网站开发背景动漫制作专业认识
  • 无限弹窗网站链接怎么做北京城市副中心投资建设公司网站
  • 徐州网站定制app推广策划方案
  • 网站标题会影响吗中英文外贸网站模板 生成静态html
  • 东莞网站外包免费推广平台软件有哪些
  • 临沭县建设局官方网站学校网站后台管理源码
  • 重庆住建网站昵称小写 wordpress
  • 专门做漫画的网站网上商城系统平台官网
  • 梅州市五华县建设银行网站制作网页可以用
  • 网站首页命名万州集团网站建设
  • 安定网站建设关于计算机网站开发的论文题目
  • 怎么简单攻击一个网站网站开发行业前景
  • 加强机构编制网站建设力度网络钟点工
  • 做外贸生意最好的网站wordpress添加字段