当前位置: 首页 > news >正文

静态手机网站建设的基本特点国外平面设计分享网站有哪些

静态手机网站建设的基本特点,国外平面设计分享网站有哪些,网站后台怎么做友情链接,移动端网站建设的好处本案例根据某电网公司的真实业务需求#xff0c;通过Blink SQLUDAF实现实时流上的差值聚合计算#xff0c;通过本案例#xff0c;让读者熟悉UDAF编写#xff0c;并理解UDAF中的方法调用关系和顺序。 感谢军长在实现过程中的指导。笔者水平有限#xff0c;若有纰漏#xf…本案例根据某电网公司的真实业务需求通过Blink SQLUDAF实现实时流上的差值聚合计算通过本案例让读者熟悉UDAF编写并理解UDAF中的方法调用关系和顺序。 感谢军长在实现过程中的指导。笔者水平有限若有纰漏请批评指出。 一、客户需求 电网公司每天采集各个用户的电表数据格式如下表其中data_date为电表数据上报时间cons_id为电表idr1为电表度数其他字段与计算逻辑无关可忽略。为了后续演示方便仅输入cons_id100000002的数据。 no(string)data_date(string)cons_id(string)org_no(string)r1(double)101201907161000000023540113.76101201907171000000023540114.12101201907181000000023540116.59101201907191000000023540118.89 表1输入数据 电网公司希望通过实时计算Blink对电表数据处理后每天得到每个电表最近两天当天和前一天的差值数据结果类似如下表 cons_id(string)data_date(string)subDegreeR1(double)100000002201907170.36100000002201907182.47100000002201907192.3 表2期望的输出数据 二、需求分析 根据客户的需求比较容易得到两种解决方案1、通过over窗口2 rows over window开窗进行差值聚合2、通过hop窗口sliding1天size2天进行差值聚合。 over窗口和hop窗口均是Blink支持的标准窗口使用起来非常简单。本需求的最大难点在于差值聚合Blink支持SUM、MAX、MIN、AVG等内置的聚合函数但没有满足业务需求的差值聚合函数因此需要通过自定义聚合函数UDAF来实现。 三、UDAF开发 实时计算自定义函数开发搭建环境请参考UDX概述在此不再赘述。本案例使用Blink2.2.7版本下面简要描述关键代码的编写。 完整代码为了方便上传使用了txt格式SubtractionUdaf.txt 1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中创建一个继承AggregateFunction类的SubtractionUdaf类。 public class SubtractionUdaf extends AggregateFunctionDouble, SubtractionUdaf.Accum 其中Double是UDAF输出的类型在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。 2、定义accumulator数据结构用户保存UDAF的状态。 public static class Accum {private long currentTime;//最新度数的上报时间private double oldDegree;//前一次度数private double newDegree;//当前最新度数private long num; //accumulator中已经计算的record数量主要用于mergeprivate ListTuple2Double, Long listInput;//缓存所有的输入主要用于retract} 3、实现createAccumulator方法初始化UDAF的accumulator //初始化udaf的accumulatorpublic SubtractionUdaf.Accum createAccumulator() {SubtractionUdaf.Accum acc new SubtractionUdaf.Accum();acc.currentTime 0;acc.oldDegree 0.0;acc.newDegree 0.0;acc.num 0;acc.listInput new ArrayListTuple2Double, Long();return acc;} 4、实现getValue方法用于通过存放状态的accumulator计算UDAF的结果本案例需求是计算新旧数据两者的差值。 public Double getValue(SubtractionUdaf.Accum accumulator) {return accumulator.newDegree - accumulator.oldDegree;} 5、实现accumulate方法用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract数据数据包括了对应的度数iValue还包括上报度数的时间构造的事件时间ts。 public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {System.out.println(method : accumulate );accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));Collections.sort(accumulator.listInput,this.comparator);//按照时间排序accumulator.num ;if(accumulator.listInput.size() 1){accumulator.newDegree iValue;accumulator.oldDegree 0.0;accumulator.currentTime ts;}else {//处理可能存在的数据乱序问题accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.oldDegree accumulator.listInput.get(1).f0;}} 其中accumulator为UDAF的状态iValue和ts为实际的输入数据。 注意需要处理可能存在的输入数据乱序问题。 6、实现retract方法用于在某些优化场景下如使用over窗口对retract的数据进行处理。 public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) 0){//retract的是最新值accumulator.listInput.remove(0);accumulator.num--;if(accumulator.listInput.isEmpty()){accumulator.currentTime 0;accumulator.oldDegree 0.0;accumulator.newDegree 0.0;}else if(accumulator.listInput.size() 1) {accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.oldDegree 0.0;}else{accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.oldDegree accumulator.listInput.get(1).f0;}} else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) 1){//retract的是次新值accumulator.listInput.remove(1);accumulator.num--;if(accumulator.listInput.size() 1){accumulator.oldDegree 0.0;}else {accumulator.oldDegree accumulator.listInput.get(1).f0;}}else {//retract的是其他值accumulator.listInput.remove(Tuple2.of(iValue, ts));accumulator.num--;}}else {throw new Exception(Cannot retract a unexist record : iValue iValue timestamp ts);}} 需要考虑retract的是最新的数据还是次新的数据需要不同的逻辑处理。 7、实现merge方法用于某些优化场景如使用hop窗口。 public void merge(SubtractionUdaf.Accum accumulator, IterableSubtractionUdaf.Accum its) {int i 0;System.out.println(method : merge );System.out.println(accumulator : accumulator.newDegree);System.out.println(accumulator : accumulator.currentTime);for (SubtractionUdaf.Accum entry : its) {if(accumulator.currentTime entry.currentTime){if(entry.num 1){accumulator.currentTime entry.currentTime;accumulator.oldDegree entry.oldDegree;accumulator.newDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(entry.num 1){accumulator.currentTime entry.currentTime;accumulator.oldDegree accumulator.newDegree;accumulator.newDegree entry.newDegree;accumulator.num ;accumulator.listInput.addAll(entry.listInput);}}else{if(accumulator.num 1){accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num 1){accumulator.oldDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num 0){accumulator.currentTime entry.currentTime;accumulator.oldDegree entry.oldDegree;accumulator.newDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}}Collections.sort(accumulator.listInput,this.comparator);System.out.println(merge : i);System.out.println(newDegree : entry.newDegree);System.out.println(oldDegree entry.oldDegree);System.out.println(currentTime : entry.currentTime);}} 需要考虑merge的是否是比当前新的数据需要不同的处理逻辑。 8、其他方面考虑到需要对输入度数按照事件时间排序在open方法中实例化了自定义的Comparator类对accumulator数据结构中的inputList按事件时间的降序排序。 public void open(FunctionContext context) throws Exception {//定义record的先后顺序用于listInput的排序时间越新的record在list中越前面this.comparator new ComparatorTuple2Double, Long() {public int compare( Tuple2Double, Long o1, Tuple2Double, Long o2) {if (Long.valueOf(o1.f1) Long.valueOf(o2.f1)) {return 1;} else if (Long.valueOf(o1.f1) Long.valueOf(o2.f1)) {return -1;}else {return 0;}}};} 请参考[使用IntelliJ IDEA开发自定义函数]()完成UDAF编译、打包并参考UDX概述完成资源的上传和引用。 四、SQL开发及测试结果 一over窗口 SQL代码如下语法检查、上线、启动作业选择当前启动位点。并将表1数据上传至datahub。 CREATE FUNCTION OverWindowSubtractionUdaf as com.alibaba.blink.sql.udx.SubtractionUdaf;CREATE TABLE input_dh_e_mp_read_curve (no VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,000000),yyyyMMddHHmmss),WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type datahub,endPoint http://dh-cn-shanghai.aliyun-inc.com,roleArnacs:ram::XXX:role/aliyunstreamdefaultrole,project jszc_datahub,topic input_dh_e_mp_read_curve ); CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE )with(type print );INSERT into data_out SELECTcons_id,last_value(data_date) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date FROM input_dh_e_mp_read_curve 由于使用了print connector从对应的sink的taskmanager.out日志中可以查看到输出如下已忽略其他debug日志 task-1 ()100000002,20190716,13.76 task-1 ()100000002,20190717,0.35999999999999943 task-1 ()100000002,20190718,2.4700000000000006 对比期望输出表220190717和20190718两个窗口的数据均正确表明业务逻辑正确但此输出与期望输出有少许差异 120190716输出为13.76这是因为第一个over窗口只有一条数据导致的这种数据可以在业务层过滤掉 220190719的数据没有输出这是因为我们设置了watermark测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。 二hop窗口 SQL代码如下语法检查、上线、启动作业选择当前启动位点。并将表1数据上传至datahub。 CREATE FUNCTION HopWindowSubtractionUdaf as com.alibaba.blink.sql.udx.SubtractionUdaf;CREATE TABLE input_dh_e_mp_read_curve (no VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,000000),yyyyMMddHHmmss),WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type datahub,endPoint http://dh-cn-shanghai.aliyun-inc.com,roleArnacs:ram::XXX:role/aliyunstreamdefaultrole,project jszc_datahub,topic input_dh_e_mp_read_curve ); CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE )with(type print ); INSERT into data_out SELECTcons_id,DATE_FORMAT(HOP_end(ts, INTERVAL 1 day,INTERVAL 2 day), yyyyMMdd),HopWindowSubtractionUdaf(r1,unix_timestamp(ts)) FROM input_dh_e_mp_read_curve group by hop(ts, INTERVAL 1 day,INTERVAL 2 day),cons_id; 由于使用了print connector从对应的sink的taskmanager.out日志中可以查看到输出如下已忽略其他debug日志 task-1 ()100000002,20190716,13.76 task-1 ()100000002,20190717,0.35999999999999943 task-1 ()100000002,20190718,2.4700000000000006 对比期望输出表220190717和20190718两个窗口的数据均正确表明业务逻辑正确但此输出与期望输出有少许差异 120190716输出为13.76这是因为第一个hop窗口只有一条数据导致的这种数据可以在业务层过滤掉 220190719的数据没有输出这是因为我们设置了watermark测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。 五、几点思考 1、关于UDAF内部方法的调用关系和顺序 UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法其调用关系和顺序并不是完全确定而是与Blink底层优化、Blink版本、开窗类型如hop还是over窗口等相关。 比较确定的是一次正常没有failover的作业createAccumulator方法只在作业启动时调用一次accumulate方法在每条数据输入时调用一次在触发数据输出时会调用一次getValue并不代表只调用一次。 而retract方法和merge方法则跟具体的优化方式或开窗类型有关本案例中over窗口调用retract方法而不调用merge方法hop窗口调用merge方法而不调用retract方法。 大家可以增加日志观察这几个方法的调用顺序还是蛮有意思的。 2、如何知道需要实现UDAF中的哪些方法 UDAF中必须实现createAccumulator、getValue、accumulate方法可选择实现retract和merge方法。 一般情况下可先实现createAccumulator、getValue、accumulate三个方法然后编写SQL后进行语法检查SQL编译器会提示是否需要retract或merge方法。 比如如果没有实现retract方法在使用over窗口时语法检查会报类似如下错误 org.apache.flink.table.api.ValidationException: Function class com.alibaba.blink.sql.udx.SubtractionUdaf does not implement at least one method named retract which is public, not abstract and (in case of table functions) not static. 比如如果没有实现merge方法在使用over窗口时语法检查会报类似如下错误 org.apache.flink.table.api.ValidationException: Function class com.alibaba.blink.sql.udx.SubtractionUdaf does not implement at least one method named merge which is public, not abstract and (in case of table functions) not static. 3、本案例存在优化空间的地方 1本案例没有考虑数据缺失的问题比如因为某种原因网络问题、数据采集问题等缺少20190717的数据。这种情况下会是什么样的结果大家可以自行测试下 2本案例使用了一个List然后通过Collections.sort方法进行排序这不是很优的方法如果用优先级队列priority queue性能应该会更好 原文链接 本文为云栖社区原创内容未经允许不得转载。
http://www.huolong8.cn/news/206290/

相关文章:

  • 大气手机企业网站极简wordpress主题
  • 教人做窗帘的视频网站屯留做网站哪里好
  • 怎样制作自己公司的网站企业名录2019企业黄页
  • 全球云邮登陆网站公司网站建设注意什么
  • 网站活动平台推广计划服务公司名称大全
  • 雄安网站设计制作烟台门户网站开发
  • 国外电商网站建设哪里医院做无痛人流便宜 咨询网站在线
  • 罗湖实惠的网站建设费用常州网站的优化
  • 网站开发软件 d中国建设教育协会网站打不开
  • wap网站建设多少钱wordpress无法加载媒体库
  • 网站怎么找的建设部网站查询公司
  • 免费论坛建站系统wordpress文字上传
  • 山西省建设监理协会官方网站随州网站设计开发服务
  • 做 在线观看免费网站有哪些搜收录批量查询
  • 承德网站制作公司制作游戏的软件手机版
  • 网站服务器有哪几种东道设计公司招聘
  • 海口网站建设搜q.479185700网页微信怎么登陆
  • 个人建设网站流程郑州做供暖的公司网站
  • 网站改版合同书网站开发环境实验报告
  • 手机视频网站建站园林公司网站模板
  • 个人网站有哪些板块房地产网站的设计要求
  • 山西省煤炭厅基本建设局网站网站制作素材图片
  • 广州 营销型网站建设公司做笔记的网站
  • 天猫网站做的比京东好网站游戏正规网站建设
  • 阿里云怎么做网站wordpress上传上限
  • 做p2p网站的主页模板莱芜网站优化是什么
  • net公司网站开发框架源代码免费html网站开发教程
  • 石家庄网站建设联系电话做App和网站 聚马
  • 部门网站建设和维护iis默认网站 没有属性
  • 网站制作软件手机版下载如何学网站建设