html5网站后台制作,网站错误列表,做外贸客户要求看网站,网页设计与制作考试2020Flink总共有三种时间语义#xff1a;Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释#xff0c;可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API SQL中基于时间的算子如何定…Flink总共有三种时间语义Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到时间属性的简介处理时间事件时间时间属性简介Flink TableAPISQL中的基于时间的操作(如window)需要指定时间语义表可以根据指定的时间戳提供一个逻辑时间属性。时间属性是表schama的一部分当使用DDL创建表时、DataStream转为表时或者使用TableSource时会定义时间属性。一旦时间属性被定义完成该时间属性可以看做是一个字段的引用从而在基于时间的操作中使用该字段。时间属性像一个时间戳可以被访问并参与计算如果一个时间属性参与计算那么该时间属性会被雾化成一个常规的时间戳常规的时间戳不能与Flink的时间与水位线兼容不能被基于时间的操作所使用。Flink TableAPI SQL所需要的时间属性可以通过Datastream程序中指定如下final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认// 可以选择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
处理时间基于本地的机器时间是一种最简单的时间语义但是不能保证结果一致性使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性具体如下DDL语句创建表时定义处理时间处理时间的属性可以在DDL语句中被定义为一个计算列需要使用PROCTIME()函数如下所示CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外字段作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE); -- 10分钟的滚动窗口
DataStream转为Table的过程中定义处理时间在将DataStream转为表时在schema定义中可以通过.proctime属性指定时间属性并将其放在其他schema字段的最后面具体如下DataStreamTuple2String, String stream ...;
// 声明一个额外逻辑字段作为处理时间属性
Table table tEnv.fromDataStream(stream, user_name, data, user_action_time.proctime);WindowedTable windowedTable table.window(Tumble.over(10.minutes).on(user_action_time).as(userActionWindow));
使用TableSource自定义TableSource并实现DefinedProctimeAttribute 接口如下// 定义个带有处理时间属性的table source
public class UserActionSource implements StreamTableSourceRow, DefinedProctimeAttribute {Overridepublic TypeInformationRow getReturnType() {String[] names new String[] {user_name , data};TypeInformation[] types new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}Overridepublic DataStreamRow getDataStream(StreamExecutionEnvironment execEnv) {// 创建streamDataStreamRow stream ...;return stream;}Overridepublic String getProctimeAttribute() {// 该字段会追加到schema中作为第三个字段return user_action_time;}
}// 注册table source
tEnv.registerTableSource(user_actions, new UserActionSource());WindowedTable windowedTable tEnv.from(user_actions).window(Tumble.over(10.minutes).on(user_action_time).as(userActionWindow));
事件时间基于记录的具体时间戳即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性具体如下DDL语句创建表时定事件时间事件时间属性可以通过 WATERMARK语句进行定义如下CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明user_action_time作为事件时间属性并允许5S的延迟 WATERMARK FOR user_action_time AS user_action_time - INTERVAL 5 SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE);
DataStream转为Table的过程中定义事件时间当定义Schema时通过.rowtime属性指定事件时间属性必须在DataStream中指定时间戳与水位线。例如在数据集中事件时间属性为event_time此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。目前Flink支持两种方式定义EventTime字段如下// 方式1:
// 提取timestamp并分配watermarks
DataStreamTuple2String, String stream inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外逻辑字段作为事件时间属性
// 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
// 系统会在TableEnvironment中获取事件时间属性
Table table tEnv.fromDataStream(stream, user_name, data, user_action_time.rowtime);// 方式2:// 从第一个字段提取timestamp并分配watermarks
DataStreamTuple3Long, String, String stream inputStream.assignTimestampsAndWatermarks(...);// 第一个字段已经用来提取时间戳可以直接使用对应的字段作为事件时间属性
Table table tEnv.fromDataStream(stream, user_action_time.rowtime, user_name, data);// 使用:WindowedTable windowedTable table.window(Tumble.over(10.minutes).on(user_action_time).as(userActionWindow));
使用TableSource另外也可以在创建TableSource的时候实现DefinedRowtimeAttributes接口来定义EventTime字段在接口中需要实现getRowtimeAttributeDescriptors方法创建基于EventTime的时间属性信息。// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSourceRow, DefinedRowtimeAttributes {Overridepublic TypeInformationRow getReturnType() {String[] names new String[] {user_name, data, user_action_time};TypeInformation[] types new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}Overridepublic DataStreamRow getDataStream(StreamExecutionEnvironment execEnv) {// 创建流基于user_action_time属性分配水位线DataStreamRow stream inputStream.assignTimestampsAndWatermarks(...);return stream;}Overridepublic ListRowtimeAttributeDescriptor getRowtimeAttributeDescriptors() {// 标记user_action_time字段作为事件时间属性// 创建user_action_time描述符用来标识时间属性字段RowtimeAttributeDescriptor rowtimeAttrDescr new RowtimeAttributeDescriptor(user_action_time,new ExistingField(user_action_time),new AscendingTimestamps());ListRowtimeAttributeDescriptor listRowtimeAttrDescr Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;}
}// register表
tEnv.registerTableSource(user_actions, new UserActionSource());WindowedTable windowedTable tEnv.from(user_actions).window(Tumble.over(10.minutes).on(user_action_time).as(userActionWindow));
小结本文主要介绍了如何在Flink Table API和SQL中使用时间语义可以使用两种时间语义处理时间和事件时间。分别对每种的时间语义的使用方式进行了详细解释。往期精彩回顾Flink Table API SQL编程指南(1)Flink Table API SQL编程指南之动态表(2)