网站字体大小是多少合适,wordpress高级插件,wordpress 自适应,北京市住房和城乡建设部网站首页文章目录 1、filter算子实现分流2、分流#xff1a;使用侧输出流3、合流#xff1a;union4、合流#xff1a;connect5、connect案例 分流#xff0c;很形象的一个词#xff0c;就像一条大河#xff0c;遇到岸边有分叉的#xff0c;而形成了主流和测流。对于数据流也一样… 文章目录 1、filter算子实现分流2、分流使用侧输出流3、合流union4、合流connect5、connect案例 分流很形象的一个词就像一条大河遇到岸边有分叉的而形成了主流和测流。对于数据流也一样不过是一个个水滴替换成了一条条数据。 将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。 1、filter算子实现分流
Demo案例读取一个整数数字流将数据流划分为奇数流和偶数流。实现思路针对同一个流多次条用filter算子来拆分
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperatorInteger ds env.socketTextStream(node01, 9527).map(Integer::valueOf);//将ds 分为两个流 一个是奇数流一个是偶数流//使用filter 过滤两次SingleOutputStreamOperatorInteger ds1 ds.filter(x - x % 2 0);SingleOutputStreamOperatorInteger ds2 ds.filter(x - x % 2 1);ds1.print(偶数);ds2.print(奇数);env.execute();}
}
以上实现的明显缺陷是同一条数据被多次处理。以上其实是将原始数据流stream复制两份然后对每一份分别做筛选冗余且低效。
2、分流使用侧输出流
基本步骤为
使用process算子Flink分层API中的最底层的处理函数定义OutputTag对象即输出标签对象用于后面标记和提取侧流调用上下文ctx的.output()方法通过主流获取侧流
案例实现将WaterSensor按照Id类型进行分流先定义下MapFunction的转换规则用来将输入的数据转为自定义的WaterSensor对象
public class WaterSensorMapFunction implements MapFunctionStringWaterSensor{Overridepublic WaterSensor map(String value) throws Exception {String[] strArr value.split( regex: ,);//String组装对象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}使用侧流
public class SplitStreamByOutputTag { public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperatorWaterSensor ds env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());//定义两个输出标签对象用于后面标记和提取侧流OutputTagWaterSensor s1 new OutputTag(s1, Types.POJO(WaterSensor.class));OutputTagWaterSensor s2 new OutputTag(s2, Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperatorWaterSensor ds1 ds.process(new ProcessFunctionWaterSensor, WaterSensor(){Override//形参为别为流中的一条数据、上下文对象、收集器public void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {if (s1.equals(value.getId())) {ctx.output(s1, value);} else if (s2.equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print(主流);SideOutputDataStreamWaterSensor s1DS ds1.getSideOutput(s1);SideOutputDataStreamWaterSensor s2DS ds1.getSideOutput(s2);s1DS.printToErr(侧流s1); //区别主流让控制台输出标红s2DS.printToErr(侧流s2);env.execute();}
}
相关传参说明首先是创建OutputTag对象时的传参
第一个参数为标签名用于区分是哪一个侧流第二个是放入侧流中的数据的类型且必须是Flink的类型TypeInfomation借助Types类OutputTag的泛型是流到对应的侧流的数据类型
ProcessFunction接口的泛型中
第一个是输入的数据类型第二个是输出到主流上的数据类型
ctx.output方法的形参
第一个为outputTag对象第二个为数据上面代码中传value即直接输出数据本身也可输出处理后的数据主流侧流数据类型不用一致
看下运行效果 3、合流union
将来源不同的多条流合并成一条来联合处理即合流。最简单的合流操作就是直接将多条流合在一起叫作流的联合union union的条件是
每条流中要合并的数据类型必须相同原始不同可先借助map在union合并之后的新流会包括所有流中的元素数据类型不变
stream1.union(stream2, stream3, ...) //可变长参数public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger ds1 env.fromElements(1, 2, 3);DataStreamSourceInteger ds2 env.fromElements(2, 2, 3);DataStreamSourceString ds3 env.fromElements(2, 2, 3);ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//输出
1
2
3
2
2
3
2
2
34、合流connect
union合并流受限于数据类型因此还有另一种合流操作connect public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperatorInteger source1 env.socketTextStream(node01, 9527).map(i - Integer.parseInt(i));//String流DataStreamSourceString source2 env.socketTextStream(node01, 2795);/*** 总结 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理但是各处理各的*/ConnectedStreamsInteger, String connect source1.connect(source2);SingleOutputStreamOperatorString result connect.map(new CoMapFunctionInteger, String, String() {Overridepublic String map1(Integer value) throws Exception {return 来源于原source1流: value.toString();}Overridepublic String map2(String value) throws Exception {return 来源于原source2流: value;}});result.print();env.execute(); }
}
使用 connect 合流的总结
一次只能连接 2条流因为connect返回的是一个ConnectedStreams对象不再是DataStreamSource或其子类了两条流中的数据类型可以不一样连接后可以调用 map、flatmap、process来处理但是各处理各的
以map为例其形参是一个CoMapFuntion接口类型泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出虽然两个流合并成一个了但处理数据时还是各玩各的。
.map1()就是对第一条流中数据的map操作.map2()则是针对第二条流 connect 就类比被逼相亲后结婚两个人看似成一家了但实际上各自玩各自的。往大了举例就相当于一国两制。
5、connect案例
和connect以后的map传CoMapFunction一样process算子也不再传ProcessFunction而是CoProcessFunction实现两个方法
processElement1()针对第一条流processElement2()针对第二条流
connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区分区后返回的还是一个ConnectedStreams
connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2是两条流中各自的键选择器ConnectedStreams进行keyBy操作其实就是把两条流中key相同的数据放到了一起然后针对来源的流再做各自处理 案例需求连接两条流输出能根据id匹配上的数据即两个流里元组f0相同的数据类似inner join效果public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元组流DataStreamSourceTuple2Integer, String source1 env.fromElements(Tuple2.of(1, a1),Tuple2.of(1, a2),Tuple2.of(2, b),Tuple2.of(3, c));//三元组流DataStreamSourceTuple3Integer, String, Integer source2 env.fromElements(Tuple3.of(1, aa1, 1),Tuple3.of(1, aa2, 2),Tuple3.of(2, bb, 1),Tuple3.of(3, cc, 1));ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connect source1.connect(source2);// 多并行度下需要根据 关联条件 进行keyby才能保证key相同的数据到一起去才能匹配上ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connectKey connect.keyBy(s1 - s1.f0, s2 - s2.f0);SingleOutputStreamOperatorString result connectKey.process(new CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String() {// 定义 HashMap缓存来过的数据keyidvaluelist数据MapInteger, ListTuple2Integer, String s1Cache new HashMap();MapInteger, ListTuple3Integer, String, Integer s2Cache new HashMap();Overridepublic void processElement1(Tuple2Integer, String value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// TODO 1.来过的s1数据都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据初始化 value的list放入 hashmapListTuple2Integer, String s1Values new ArrayList();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id查找s2的数据只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3Integer, String, Integer s2Element : s2Cache.get(id)) {out.collect(s1: value ---------s2: s2Element);}}}Overridepublic void processElement2(Tuple3Integer, String, Integer value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// TODO 1.来过的s2数据都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据初始化 value的list放入 hashmapListTuple3Integer, String, Integer s2Values new ArrayList();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id查找s1的数据只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2Integer, String s1Element : s1Cache.get(id)) {out.collect(s1: s1Element ---------s2: value);}}}});result.print();env.execute();}
}
运行效果