当前位置: 首页 > news >正文

做网站的回扣网络营销推广方法

做网站的回扣,网络营销推广方法,山西人工智能建站系统软件,建立网站模板目录 #xff08;零#xff09;本文简介 意外收获#xff1a; #xff08;一#xff09;背景 #xff08;二#xff09;探索梳理过程 #xff08;三#xff09;源码改造 #xff08;四#xff09;修改效果 1、JOB状态 2、Level5的dataFile总大小 3、数据延…目录 零本文简介 意外收获 一背景 二探索梳理过程 三源码改造 四修改效果 1、JOB状态 2、Level5的dataFile总大小 3、数据延迟 4、关联率 五未来展望异步Compact 零本文简介 Paimon多流拼接/合并性能优化 为解决离线T1多流拼接数据时效性、Flink实时状态太大任务稳定性问题这里基于数据湖工具Apache Paimon进行近实时的多流拼接。 使用FlinkPaimon基于ParmaryKey TablePartialUpdate进行多流拼接的时候跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况本文通过剖析源码逻辑、修改源码在一定程度上解决了这个问题。 note下文对源码的修改可能需要了解一点paimon的实现原理比如LSM Treelevel DB 可参考LSM树详解 - 知乎 LSM(Log-Structured Merge Tree)_lsm tree_一介草民kk的博客-CSDN博客 Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章 新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客 基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客 意外收获 本文通过修改源码还意外解决了【跨分区关联率偏低】的问题详见下文。 一背景 这里使用 Flink 1.14 Apache Paimon 0.5 snapshot 进行多流拼接前端埋点流 服务端埋点流 当前情况是一天一个分区一个分区100个bucket就会出现如下情况分区/bucket中的数据越来越多到达下午或者傍晚的时候就会出现 paimon 作业周期性背压因为mergeTree中维护的数据越来越多tree越来越大checkpoint时间也会比较长于是决定将mergeTree中的过期数据删除即让其不进入tree中减少计算量 这里的“过期”按需自定义比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上那就根据时间戳与当前时间戳假设没有很严重的消费积压相比时间差超过3小时的数据就将其丢弃 具体细节涉及到这里先将结论给出 data文件创建后是否还会修改不会根据时间排序的data数据文件是增量还是全量几个最新文件加起来就是全量应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期通过record 二探索梳理过程 1、首先观察hdfs文件之后发现dataFile只保留最近一个小时的文件超过一小时的文件就会被删除这里应该对应参数 partition.expiration-check-interval 1h由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤 2、观察flink log发现每次compaction都只读几个文件如下所示 每次其实只读取一个level0的file再加上几个level5的filelevel5这里file就是之前的全部数据包含多个流的最后将compact之后的文件再命名为新的名字写到level5 随着分区数据量的增多参与compact的file也会越来越多这也是会导致tree偏大出现周期性背压的原因 另外dataFile命名呈现如下规律 level5的第二个文件总是跟第一个中间隔一个这个跟改源码没有关系只是适合观察规律 到晚间的时候参与compact的file更多了 3、观察每次level5生成的dataFile理论上level5的dataFile会越来越大/多当单个文件大小超过128M *1rate时会生成新文件 所有level5的文件大小加起来会越来越大即永远是呈增长趋势 如下每一层的总大小在不断增大同时当文件到一定程度之后每层2个文件变成3个文件 4、【以上3点均为原始实现思路从这里开始改造】思考既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据dataFile不是增量的那么就不能通过文件最新修改时间来判断数据是否过期只能从最新的几个dataFile的每条记录来进行判断了即原本每次参与合并的record是从这个partitionbucket建立开始的全部数据那么是否可以通过修改源码判断每条record是否过期从而不参与mergeTree在compact完成之后也不会再次写入新的dataFile如果还是写进来每次读进tree时都需要判断是否过期是否进入tree【答案当然是可以的】 三源码改造 1、首先说明一下在源码中有这么一段 // IntervalPartition.partition() public ListListSortedRun partition() {ListListSortedRun result new ArrayList();ListDataFileMeta section new ArrayList();BinaryRow bound null;for (DataFileMeta meta : files) {if (!section.isEmpty() keyComparator.compare(meta.minKey(), bound) 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound null;}section.add(meta);if (bound null || keyComparator.compare(meta.maxKey(), bound) 0) {// update right boundbound meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result; } 此处为了将文件排序、再将有overlap的放在一个list里边一但产生gap即没有overlap那么就创建新的list最终将这些 list 再放到List中 示意图如下 2、后续通过一些处理变成 List 的格式这里的KeyValue就包含我们想要去操纵的record 源码是这样的 public T RecordReaderT mergeSort(ListReaderSupplierKeyValue lazyReaders,ComparatorInternalRow keyComparator,MergeFunctionWrapperT mergeFunction)throws IOException {if (ioManager ! null lazyReaders.size() spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}ListRecordReaderKeyValue readers new ArrayList(lazyReaders.size());for (ReaderSupplierKeyValue supplier : lazyReaders) {try {readers.add(supplier.get());} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers, keyComparator, mergeFunction, sortEngine); } 这里的return就会创建sortMergeReader了我们可以在将数据传入这里之前先进行过滤通过判断每一条record是否超过过期时间修改如下 public T RecordReaderT mergeSort(ListReaderSupplierKeyValue lazyReaders,ComparatorInternalRow keyComparator,MergeFunctionWrapperT mergeFunction)throws IOException {if (ioManager ! null lazyReaders.size() spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}ListRecordReaderKeyValue readers new ArrayList(lazyReaders.size());for (ReaderSupplierKeyValue supplier : lazyReaders) {try {// 过滤掉过期数据RecordReaderKeyValue filterSupplier supplier.get().filter((KeyValue keyValue) -isNotExpiredRecord(keyValue.value(), expireTimeMillis));readers.add(filterSupplier);} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers,keyComparator,mergeFunction,sortEngine,keyType.getFieldTypes(),valueType.getFieldTypes()); }// 判断这条数据是否过期 public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {if (expireTimeMillis 0) {return true;}// 只要有一个字段不为空且大于0且过期时间大于expireTimeMillis就判断为过期for (Integer pos : expireFieldsPosSet) {if ((!row.isNullAt(pos)) row.getLong(pos) 0 (System.currentTimeMillis() - row.getLong(pos)) expireTimeMillis) {return false;}}return true; } 与此同时将相关参数暴露出来可以在建表时进行自定义配置 public static final ConfigOptionInteger RECORDS_EXPIRED_HOUR key(record.expired-hour).intType().defaultValue(-1).withDescription(Records in streams WONT be offered into MergeTree when they are expired. (Inorder to avoid too large MergeTree; -1 means never expired). );public static final ConfigOptionString RECORDS_EXPIRED_FIELDS key(record.expired-fields).stringType().noDefaultValue().withDescription(Records in streams WONT be offered into MergeTree when they are judged as [expired] according to these fields. If you specify multiple fields, delimiter is ,.); 使用方法 val createPaimonJoinTable (sCREATE TABLE IF NOT EXISTS ${paimonTable}(\n uuid STRING,\n metaid STRING,\n cid STRING,\n area STRING,\n ts1 bigint,\n ts2 bigint,\n d STRING, \n PRIMARY KEY (d, uuid) NOT ENFORCED \n ) PARTITIONED BY (d) \n WITH (\n merge-engine partial-update,\n changelog-producer full-compaction, \n file.format orc, \n s sink.managed.writer-buffer-memory ${sinkWriterBuffer}, \n s full-compaction.delta-commits ${fullCompactionCommits}, \n s scan.mode ${scanMode}, \n s bucket ${bucketNum}, \n s sink.parallelism ${sinkTaskNum}, \n s record.expired-hour 3 , \n // user defined para record.expired-fileds 4,5 , \n // user defined para sequence.field ts1 \n )) tableEnv.executeSql(createPaimonJoinTable) 四修改效果 1、JOB状态 运行到晚上20点尚未出现背压 checkpoint时间也没有过长如果不剔除过期数据到这个时间cp时长应该在3分钟左右 生产到Kafka的消息也没有严重的断流或者锯齿现象 还是有可能出现exception如下但对数据量没有任何影响 2、Level5的dataFile总大小 上边只是现象最终还是要数据说话。 修改源码之后观察dataFile理论上每一层的size总大小可能会出现减小的情况 因为过期数据就不会再写入到 level5 新的data文件中了 如下图levelSize diff下一次level总size - 上一次level总size确实出现了“有正有负”的情况于是验证源码修改生效即每次进行compact只会读取近 n 个小时的数据进行合并 3、数据延迟 有意思的是当我们修改源码将过期的数据丢弃之后数据延迟也变小了。 数据延迟计算方法paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳 修改前 修改后 4、关联率 意外收获 经过上述过程改造源码还可以解决“跨分区关联率偏低”的问题 既然是多个流相关联那么就必然存在一个关联率的问题一定会有部分数据因为埋点上报缺失/延迟导致关联不上。于是就会存在如下问题如果数据按“天”进行分区那么在跨分区时刻也就必然会存在更多的数据关联不上因为两个流的时间不是完全同步的一条流可能落到前一天分区另一条流可能落在第二天分区数据不在同一个分区就不会进入同一个mergeTree也就关联不上。 那么修改了源码之后是如何解决上述问题的呢 如前文所述我们修改源码的目的是“使参与compact的数据不会持续增加”于是修改代码使部分数据过期最终level5LSM tree的最深一层的数据总量不持续增加。那么既然数据不会持续增加我们就可以将所有的数据全部放在一个分区中或者理解为不设分区一直在一个hdfs路径下此时只有一开始跑的时候前一少部分数据关联率偏低后边会维持在一个稳定水平也就没有过跨分区一说了。 五未来展望异步Compact 官方提供的paimon源码里边的compaction是 sync 模式的我尝试改成过 async 的但是时不时会出现很少量的数据丢失感觉可能是因为同一时刻有多个compact任务在进行后续有机会可以再继续尝试一下。
http://www.yutouwan.com/news/197879/

相关文章:

  • 重庆网站推广报价免费背景图片素材网
  • 哪里做百度网站做网站维护一工资多少钱
  • app网站开发小程序综合商城网站建设
  • 做网站的流程分析-图灵吧WordPress批量删除无用标签
  • 家具网站首页模板重庆建站公司
  • 福田做棋牌网站建设哪家技术好承揽合同和建设工程合同的区别
  • 建设网站都需要什么网站推广和网络推广
  • 做网站前端要会什么wordpress和公众号对接
  • 个人介绍网站内容wordpress黑糖主题
  • 医院如何做网站策划移商网站建设
  • 东莞网站优化哪家好wordpress相关推荐
  • 网站设计网站公司小程序开发需要多少钱
  • 怎样做免费抽皮肤的网站网站建设市场
  • 陕西的网站建设公司排名成都网站建设零一
  • 大连企业建站系统模板wordpress 安装平台
  • 建设银行网站适用浏览器wordpress 手机管理
  • 手机端网站建设备案深圳龙岗租房子多少钱一个月
  • 网站按钮代码做招聘网站的怎么让人注册简历
  • wordpress 做手机站您的网站对百度设置了ip封禁
  • 河南天丰建设工程有限公司网站paypal外贸门户网站
  • 做任务可以给钱的网站网站开发规划书
  • 学做网站 为了熊掌号网站建设合同 费用
  • 东道 网站建设小广告发布
  • 谁有网站推荐一下好在线网站建设诚信经营
  • 网站特效网大连建设工程信息网华宇凤凰城东侧市政管网配套工程
  • 承德市网站开发大连企业建设网站
  • 网站页面效果图怎么做唐山做企业网站的公司
  • 网站开发前台mip网站怎么做匹配
  • 建设网站沙井东营网站设计公司
  • 网站从服务器上下载文件怎么做百度联盟网站