静态手机网站建设的基本特点,国外平面设计分享网站有哪些,网站后台怎么做友情链接,移动端网站建设的好处本案例根据某电网公司的真实业务需求#xff0c;通过Blink SQLUDAF实现实时流上的差值聚合计算#xff0c;通过本案例#xff0c;让读者熟悉UDAF编写#xff0c;并理解UDAF中的方法调用关系和顺序。 感谢军长在实现过程中的指导。笔者水平有限#xff0c;若有纰漏#xf…本案例根据某电网公司的真实业务需求通过Blink SQLUDAF实现实时流上的差值聚合计算通过本案例让读者熟悉UDAF编写并理解UDAF中的方法调用关系和顺序。 感谢军长在实现过程中的指导。笔者水平有限若有纰漏请批评指出。
一、客户需求
电网公司每天采集各个用户的电表数据格式如下表其中data_date为电表数据上报时间cons_id为电表idr1为电表度数其他字段与计算逻辑无关可忽略。为了后续演示方便仅输入cons_id100000002的数据。
no(string)data_date(string)cons_id(string)org_no(string)r1(double)101201907161000000023540113.76101201907171000000023540114.12101201907181000000023540116.59101201907191000000023540118.89
表1输入数据 电网公司希望通过实时计算Blink对电表数据处理后每天得到每个电表最近两天当天和前一天的差值数据结果类似如下表
cons_id(string)data_date(string)subDegreeR1(double)100000002201907170.36100000002201907182.47100000002201907192.3
表2期望的输出数据
二、需求分析
根据客户的需求比较容易得到两种解决方案1、通过over窗口2 rows over window开窗进行差值聚合2、通过hop窗口sliding1天size2天进行差值聚合。 over窗口和hop窗口均是Blink支持的标准窗口使用起来非常简单。本需求的最大难点在于差值聚合Blink支持SUM、MAX、MIN、AVG等内置的聚合函数但没有满足业务需求的差值聚合函数因此需要通过自定义聚合函数UDAF来实现。
三、UDAF开发
实时计算自定义函数开发搭建环境请参考UDX概述在此不再赘述。本案例使用Blink2.2.7版本下面简要描述关键代码的编写。 完整代码为了方便上传使用了txt格式SubtractionUdaf.txt 1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中创建一个继承AggregateFunction类的SubtractionUdaf类。
public class SubtractionUdaf extends AggregateFunctionDouble, SubtractionUdaf.Accum
其中Double是UDAF输出的类型在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。 2、定义accumulator数据结构用户保存UDAF的状态。 public static class Accum {private long currentTime;//最新度数的上报时间private double oldDegree;//前一次度数private double newDegree;//当前最新度数private long num; //accumulator中已经计算的record数量主要用于mergeprivate ListTuple2Double, Long listInput;//缓存所有的输入主要用于retract}
3、实现createAccumulator方法初始化UDAF的accumulator //初始化udaf的accumulatorpublic SubtractionUdaf.Accum createAccumulator() {SubtractionUdaf.Accum acc new SubtractionUdaf.Accum();acc.currentTime 0;acc.oldDegree 0.0;acc.newDegree 0.0;acc.num 0;acc.listInput new ArrayListTuple2Double, Long();return acc;}
4、实现getValue方法用于通过存放状态的accumulator计算UDAF的结果本案例需求是计算新旧数据两者的差值。 public Double getValue(SubtractionUdaf.Accum accumulator) {return accumulator.newDegree - accumulator.oldDegree;}
5、实现accumulate方法用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract数据数据包括了对应的度数iValue还包括上报度数的时间构造的事件时间ts。 public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {System.out.println(method : accumulate );accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));Collections.sort(accumulator.listInput,this.comparator);//按照时间排序accumulator.num ;if(accumulator.listInput.size() 1){accumulator.newDegree iValue;accumulator.oldDegree 0.0;accumulator.currentTime ts;}else {//处理可能存在的数据乱序问题accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.oldDegree accumulator.listInput.get(1).f0;}}
其中accumulator为UDAF的状态iValue和ts为实际的输入数据。 注意需要处理可能存在的输入数据乱序问题。 6、实现retract方法用于在某些优化场景下如使用over窗口对retract的数据进行处理。 public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) 0){//retract的是最新值accumulator.listInput.remove(0);accumulator.num--;if(accumulator.listInput.isEmpty()){accumulator.currentTime 0;accumulator.oldDegree 0.0;accumulator.newDegree 0.0;}else if(accumulator.listInput.size() 1) {accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.oldDegree 0.0;}else{accumulator.currentTime accumulator.listInput.get(0).f1;accumulator.newDegree accumulator.listInput.get(0).f0;accumulator.oldDegree accumulator.listInput.get(1).f0;}} else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) 1){//retract的是次新值accumulator.listInput.remove(1);accumulator.num--;if(accumulator.listInput.size() 1){accumulator.oldDegree 0.0;}else {accumulator.oldDegree accumulator.listInput.get(1).f0;}}else {//retract的是其他值accumulator.listInput.remove(Tuple2.of(iValue, ts));accumulator.num--;}}else {throw new Exception(Cannot retract a unexist record : iValue iValue timestamp ts);}}
需要考虑retract的是最新的数据还是次新的数据需要不同的逻辑处理。 7、实现merge方法用于某些优化场景如使用hop窗口。 public void merge(SubtractionUdaf.Accum accumulator, IterableSubtractionUdaf.Accum its) {int i 0;System.out.println(method : merge );System.out.println(accumulator : accumulator.newDegree);System.out.println(accumulator : accumulator.currentTime);for (SubtractionUdaf.Accum entry : its) {if(accumulator.currentTime entry.currentTime){if(entry.num 1){accumulator.currentTime entry.currentTime;accumulator.oldDegree entry.oldDegree;accumulator.newDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(entry.num 1){accumulator.currentTime entry.currentTime;accumulator.oldDegree accumulator.newDegree;accumulator.newDegree entry.newDegree;accumulator.num ;accumulator.listInput.addAll(entry.listInput);}}else{if(accumulator.num 1){accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num 1){accumulator.oldDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num 0){accumulator.currentTime entry.currentTime;accumulator.oldDegree entry.oldDegree;accumulator.newDegree entry.newDegree;accumulator.num entry.num;accumulator.listInput.addAll(entry.listInput);}}Collections.sort(accumulator.listInput,this.comparator);System.out.println(merge : i);System.out.println(newDegree : entry.newDegree);System.out.println(oldDegree entry.oldDegree);System.out.println(currentTime : entry.currentTime);}}
需要考虑merge的是否是比当前新的数据需要不同的处理逻辑。 8、其他方面考虑到需要对输入度数按照事件时间排序在open方法中实例化了自定义的Comparator类对accumulator数据结构中的inputList按事件时间的降序排序。 public void open(FunctionContext context) throws Exception {//定义record的先后顺序用于listInput的排序时间越新的record在list中越前面this.comparator new ComparatorTuple2Double, Long() {public int compare( Tuple2Double, Long o1, Tuple2Double, Long o2) {if (Long.valueOf(o1.f1) Long.valueOf(o2.f1)) {return 1;} else if (Long.valueOf(o1.f1) Long.valueOf(o2.f1)) {return -1;}else {return 0;}}};}
请参考[使用IntelliJ IDEA开发自定义函数]()完成UDAF编译、打包并参考UDX概述完成资源的上传和引用。 四、SQL开发及测试结果 一over窗口
SQL代码如下语法检查、上线、启动作业选择当前启动位点。并将表1数据上传至datahub。
CREATE FUNCTION OverWindowSubtractionUdaf as com.alibaba.blink.sql.udx.SubtractionUdaf;CREATE TABLE input_dh_e_mp_read_curve (no VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,000000),yyyyMMddHHmmss),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type datahub,endPoint http://dh-cn-shanghai.aliyun-inc.com,roleArnacs:ram::XXX:role/aliyunstreamdefaultrole,project jszc_datahub,topic input_dh_e_mp_read_curve
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type print
);INSERT into data_out
SELECTcons_id,last_value(data_date) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve
由于使用了print connector从对应的sink的taskmanager.out日志中可以查看到输出如下已忽略其他debug日志
task-1 ()100000002,20190716,13.76
task-1 ()100000002,20190717,0.35999999999999943
task-1 ()100000002,20190718,2.4700000000000006
对比期望输出表220190717和20190718两个窗口的数据均正确表明业务逻辑正确但此输出与期望输出有少许差异 120190716输出为13.76这是因为第一个over窗口只有一条数据导致的这种数据可以在业务层过滤掉 220190719的数据没有输出这是因为我们设置了watermark测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。
二hop窗口
SQL代码如下语法检查、上线、启动作业选择当前启动位点。并将表1数据上传至datahub。
CREATE FUNCTION HopWindowSubtractionUdaf as com.alibaba.blink.sql.udx.SubtractionUdaf;CREATE TABLE input_dh_e_mp_read_curve (no VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,000000),yyyyMMddHHmmss),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type datahub,endPoint http://dh-cn-shanghai.aliyun-inc.com,roleArnacs:ram::XXX:role/aliyunstreamdefaultrole,project jszc_datahub,topic input_dh_e_mp_read_curve
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type print
);
INSERT into data_out
SELECTcons_id,DATE_FORMAT(HOP_end(ts, INTERVAL 1 day,INTERVAL 2 day), yyyyMMdd),HopWindowSubtractionUdaf(r1,unix_timestamp(ts))
FROM input_dh_e_mp_read_curve
group by hop(ts, INTERVAL 1 day,INTERVAL 2 day),cons_id;
由于使用了print connector从对应的sink的taskmanager.out日志中可以查看到输出如下已忽略其他debug日志
task-1 ()100000002,20190716,13.76
task-1 ()100000002,20190717,0.35999999999999943
task-1 ()100000002,20190718,2.4700000000000006
对比期望输出表220190717和20190718两个窗口的数据均正确表明业务逻辑正确但此输出与期望输出有少许差异 120190716输出为13.76这是因为第一个hop窗口只有一条数据导致的这种数据可以在业务层过滤掉 220190719的数据没有输出这是因为我们设置了watermark测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。
五、几点思考 1、关于UDAF内部方法的调用关系和顺序
UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法其调用关系和顺序并不是完全确定而是与Blink底层优化、Blink版本、开窗类型如hop还是over窗口等相关。 比较确定的是一次正常没有failover的作业createAccumulator方法只在作业启动时调用一次accumulate方法在每条数据输入时调用一次在触发数据输出时会调用一次getValue并不代表只调用一次。 而retract方法和merge方法则跟具体的优化方式或开窗类型有关本案例中over窗口调用retract方法而不调用merge方法hop窗口调用merge方法而不调用retract方法。 大家可以增加日志观察这几个方法的调用顺序还是蛮有意思的。
2、如何知道需要实现UDAF中的哪些方法
UDAF中必须实现createAccumulator、getValue、accumulate方法可选择实现retract和merge方法。 一般情况下可先实现createAccumulator、getValue、accumulate三个方法然后编写SQL后进行语法检查SQL编译器会提示是否需要retract或merge方法。 比如如果没有实现retract方法在使用over窗口时语法检查会报类似如下错误
org.apache.flink.table.api.ValidationException: Function class com.alibaba.blink.sql.udx.SubtractionUdaf does not implement at least one method named retract which is public, not abstract and (in case of table functions) not static.
比如如果没有实现merge方法在使用over窗口时语法检查会报类似如下错误
org.apache.flink.table.api.ValidationException: Function class com.alibaba.blink.sql.udx.SubtractionUdaf does not implement at least one method named merge which is public, not abstract and (in case of table functions) not static. 3、本案例存在优化空间的地方
1本案例没有考虑数据缺失的问题比如因为某种原因网络问题、数据采集问题等缺少20190717的数据。这种情况下会是什么样的结果大家可以自行测试下 2本案例使用了一个List然后通过Collections.sort方法进行排序这不是很优的方法如果用优先级队列priority queue性能应该会更好
原文链接 本文为云栖社区原创内容未经允许不得转载。