当前位置: 首页 > news >正文

大理高端网站建设友点cms

大理高端网站建设,友点cms,中国核工业集团2024校园招聘,wordpress首页置顶推荐问题1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时#xff0c;可以将一个或多个 DataStream 转换成新的 DataStream#xff0c;在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子 Flink提供了功能各异的转换算…1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时可以将一个或多个 DataStream 转换成新的 DataStream在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子  Flink提供了功能各异的转换算子Map,FlatMap,Filter,KeyBy,Reduce,Window,WindowAll... 通过操作各种转换算子来获取新的DataStream及子类的实例来完成计算需求。 Tips: 下面测试用例基于 Flink1.17.0、java1.8 编写 3、基本转换算子map/ filter/ flatMap 3.1 Map 功能说明 DataStream[T] → DataStream[R] 输入一个元素同时输出一个元素可以对元素的数据类型和内容做转换好比SQL中的UDF函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Map {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Map 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(value - value _).print();// 方式2使用 MapFunction实现类/** TODO MapFunctionT, O* 功能说明* 对元素做11映射转换* 泛型说明* T : 输入数据类型* O : 输出数据类型* */MapFunctionString, Integer mapFunction new MapFunctionString, Integer() {Overridepublic Integer map(String value) throws Exception {return value.length();}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(mapFunction).print();// 3.触发程序执行env.execute();} }执行结果 3.2 FlatMap  功能说明 DataStream[T] → DataStream[R] 输入一个元素同时产生零个、一个或多个元素好比SQL中的UDTF(1对多)函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class FlatMap {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 FlatMap 算子// 方式使用 flatMapFunction实现类/** TODO flatMapFunctionT, O* 功能说明* 对输入元素做1:多的转换好比SQL中的UDTF函数* 泛型说明* T : 输入数据类型* O : 输出数据类型* */FlatMapFunctionString, String flatMapFunction new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {for (String s : value.split(_)) {out.collect(s);}}};env.fromElements(刘_备, 张_飞, 关_羽, 赵_云, 马_超, 黄_忠).flatMap(flatMapFunction).print();// 3.触发程序执行env.execute();} }执行结果 3.3 Filter 功能说明 DataStream[T] → DataStream[T] 为每个元素执行一个逻辑判断并保留那些判断为 true 的元素好比SQL中的where 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Filter {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Filter 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(value - value.equals(刘备)).print();// 方式2使用 FilterFunction实现类/** TODO FilterFunctionT, O* 功能说明* 对元素过滤处理* 泛型说明* T : 输入数据类型* */FilterFunctionString filterFunction new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {return value.equals(张飞);}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(filterFunction).print();// 3.触发程序执行env.execute();} } 执行结果 4、聚合算子 4.1 KeyBy按键分区 功能说明 DataStream[T] → KeyedStream[T,K] 根据指定的字段(key)将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。 分区规则           分区编号   指定字段(key) 的哈希值 % 分区个数(并行度)    思考 1、哪些 数据类型 不能作为分区的key 数组类型不能作为key      当key的类型为bean类型时bean类必须要重写hashCode方法 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.使用 KeyBy 算子// 方式1使用 Lambda表达式// TODO key的类型为 StringKeyedStreamString, String stringKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).keyBy(value - value.split(_)[0]);stringKeyedStream.print();// TODO key的类型为 bean (需重写hashCode方法)KeyedStreamFlinkUser, FlinkUser userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(user - user);// TODO key的类型为 数组(不支持) // KeyedStreamString, String[] arrayKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策) // .keyBy(value - value.split(_));// 方式2使用 KeySelector实现类/** TODO KeySelectorIN, KEY* 功能说明* 从输入的数据中提取key然后根据 key的hashcode%并行度 进行分区* 注意这里的分区是逻辑分区* 泛型说明* IN 输入数据类型* KEY key的数据类型* 重要提示* 什么类型的数据不能作为key呢* 1.当 POJO 类且没有重写 hashCode() 方法而是依赖依赖于 Object.hashCode() 实现时* 2.任意类型的数组* */KeySelectorFlinkUser, String keySelector new KeySelectorFlinkUser, String() {Overridepublic String getKey(FlinkUser value) throws Exception {return value.name;}};KeyedStreamFlinkUser, String userNameKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(keySelector);// max(字段名称) pojo类一定要含有空参构造//userNameKeyedStream.sum(id).print();// 3.触发程序执行env.execute();} }执行结果 4.2 Reduce 功能说明 KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行聚合操作。将当前元素与上次一次聚合后得到的值(保存的状态值)组合然后输出新值并将这个值作为状态进行保存。 Reduce函数的弊端         聚合前数据类型 聚合后数据类型不能修改数据类型         不能提供初始值进行聚合操作当只有一个元素时不会触发reduce函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Reduce {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 2.使用 Reduce 算子/** TODO ReduceFunctionT* 功能说明* 对相同key中的元素进行聚合操作(依次聚合)* 泛型说明* 输入数据和输出数据的类型* 重要说明* 这种聚合方式不能修改value的数据类型** */ReduceFunctionTuple2String, Integer reduceFunction new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer value1, Tuple2String, Integer value2) throws Exception {return new Tuple2(value1.f0, value1.f1 value2.f1);}};// 统计每个国家出现的次数env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return new Tuple2(value.split(_)[0], 1);}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}}).reduce(reduceFunction).print();// 3.触发程序执行env.execute();} }运行结果 4.3 sum、min、max、minBy、maxBy 功能说明 KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行相应聚合操作。 min、minBy的区别              min聚合状态中保存的是第一个元素的非聚合字段          minBy聚合状态中保存的是当前元素的非聚合字段 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SumMinMaxMinByMaxBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, y, 140L)).keyBy(user - user.name);/** TODO max()、max(num)* 功能说明* 根据指定的字段做聚合操作* 怎样指定聚合字段* 当 value类型为 pojo时通过 max(字段名称) 来指定字段* 当 value类型为 tuple时通过 max(num) 来指定字段* 重点说明* 当 value类型为pojo时必须实现空参构造方法才能提取字段* *///userKeyedStream.max(id).print();//userKeyedStream.min(id).print();//userKeyedStream.sum(id).print();//userKeyedStream.maxBy(id).print();userKeyedStream.minBy(id).print();env.execute();} }5、物理分区算子 Flink提供了将数据重新分区的方法当任务发生数据倾斜时这个算子会很有用。 5.1 shuffle - 随机分区 功能说明 DataStream[T] → DataStream[T]         将元素随机地均匀分配到下游分区 Tips         因为是完全随机当输入相同时每次执行的结果可能会不同 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Shuffle {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 shuffle 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* shuffle 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).shuffle().print();env.execute();} }运行结果 5.2 rebalance - 轮询分区 功能说明 DataStream[T] → DataStream[T]         使用Round-Robin负载均衡算法将输入的数据平均的分配到下游分区中去。    代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Rebalance {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 rebalance 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* rebalance 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).rebalance().print();env.execute();} }运行结果 5.3 rescale - 重缩分区 功能说明 DataStream[T] → DataStream[T]         使用Round-Robin负载均衡算法将以分区为单位将输入的数据平均的分配到下游分区中去。 和rebalance的区别 rebalance将输入数据作为一个整体根据数据输入的顺序随机分发到下游分区(涉及到了网络传输)           rescale将以上游分区为单位随机的分配到下游分区中去 使用场景     当source算子为可并发数据源时(如kafka5个分区)设置5个Task来读取分别读取每个分区的数据     此时可以使用rescale来分发到下游实现负载均衡这样可以做到数据只在本地传输而不是网络传输 5.4 global - 全局分区 功能说明 DataStream[T] → DataStream[T]         将元素分发到下游的一个分区中去  5.5 broadcast - 广播分区 功能说明 DataStream[T] → DataStream[T]         将元素广播到下游的每个分区  Tips         数据被广播后会在下游算子的每个分区中都保留一份可以将数据进行重复处理 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Broadcast {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);userKeyedStream.sum(id).broadcast().print();env.execute();} }运行结果 5.6  自定义分区 功能说明 DataStream[T] → DataStream[T]         使用用户定义的 Partitioner 将元素分发到下游算子的分区中去 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PartitionCustom {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO PartitionerK* 功能说明* 自定义分区器根据输入的数据获取分区编号* 泛型说明* K key的数据类型* */PartitionerLong partitioner new PartitionerLong() {Overridepublic int partition(Long key, int numPartitions) {if (key 1L || key 2L) {return 0;} else if (key 3L || key 4L) {return 1;} else {return 2;}}};/** TODO KeySelectorIN, KEY* 功能说明* key提取器根据输入的数据获取key* 泛型说明* IN 输入数据类型* KEY 输出数据类型(key)* */KeySelectorFlinkUser, Long keySelector new KeySelectorFlinkUser, Long() {Overridepublic Long getKey(FlinkUser value) throws Exception {return value.id;}};userKeyedStream.sum(id).partitionCustom(partitioner, keySelector).print();env.execute();} }运行结果 6、分流 在处理数据的时候经常会将一条流或者一个表根据某些条件拆分成多条流或者多个表 flink中提供了分流的方式1、使用filter算子分流   2、使用侧输出流分流 6.1 使用filter算子分流 - 不推荐 这种分流方式的弊端 需要将原始流复制多份并对每一份做一次判断效率很低 (多次读取多次判断) 代码示例 // 通过 filter 分流public static void ByFilter() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);SingleOutputStreamOperatorString weiStream totalStream.filter(e - e.contains(魏));SingleOutputStreamOperatorString shuStream totalStream.filter(e - e.contains(蜀));SingleOutputStreamOperatorString wuStream totalStream.filter(e - e.contains(吴));weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();} 6.2 使用侧输出流分流 - 推荐 避免了使用filter算子的弊端指定source读取一次判断一次即可完成分流操作 代码示例 // 通过 侧输入流 分流public static void ByOutputTag() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);// 初始化侧输出流OutputTag weiOutputTag new OutputTag(wei, Types.STRING);OutputTag shuOutputTag new OutputTag(shu, Types.STRING);OutputTag wuOutputTag new OutputTag(wu, Types.STRING);// 通过 ProcessFunction向 侧输出流发送数据SingleOutputStreamOperatorString process totalStream.process(new ProcessFunctionString, String() {Overridepublic void processElement(String value, ProcessFunctionString, String.Context ctx, CollectorString out) throws Exception {// 往侧输出流中发送数据if (value.contains(魏)) {ctx.output(weiOutputTag, value);} else if (value.contains(蜀)) {ctx.output(shuOutputTag, value);} else if (value.contains(吴)) {ctx.output(wuOutputTag, value);}}});SideOutputDataStream weiStream process.getSideOutput(weiOutputTag);SideOutputDataStream shuStream process.getSideOutput(shuOutputTag);SideOutputDataStream wuStream process.getSideOutput(wuOutputTag);weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();} 7、合并流
http://www.huolong8.cn/news/183940/

相关文章:

  • 网站开发询价方案业之峰装饰公司简介
  • 深圳成交型网站建设石景山网站建设有哪些公司
  • 秦皇岛网站制作小程序开发开发平台 英文
  • 北京网站建设搜q.479185700定制营销型网站公司
  • 雁塔免费做网站验证码平台 wordpress
  • 保定网站制作软件网站改版怎样做301
  • 什么网站做顶置便宜温州网站制作网站
  • 怎么下载网站备案号网站开发网页前置开发
  • 怎么用eclipse做网站开发郑州网站建设乛汉狮网络
  • 网站建设 jz.woonl网站速度
  • 学校教育网站模板dede 网站目录
  • 做网站可以用什么数据库网站前端与后台必须同时做吗
  • 泸州百拓网站建设河北网站开发网站
  • 小程序建设吉林关键词优化的方法
  • 网站建设怎么找客源?免费推广网站入口
  • 网站源码怎么上传有没有做高仿手表的网站
  • 做游戏音频下载网站济南网站建设公司电子商务网站
  • 深圳定制家具厂佛山网站优化推广方案
  • 怎么做网站内链大型免费网页游戏排行榜
  • 清远市专业网站制作wordpress评论内网ip
  • 丹徒网站建设方案合理的网站结构
  • 长沙哪里有网站制作2021中国企业500强
  • 做钓鱼网站教程视频可信网站标准版
  • 做网站的公司现在还 赚钱吗义乌网图科技有限公司怎么样
  • 最全的ppt模板网站微网站背景图片
  • 怎么清空wordpress媒体库烟台网站排名seo
  • 建设网站都要学些什么问题建设银行分期手机网站
  • 网站发布教程视频教程萧山人才网手机版
  • 无锡网站制作公司报价昆明网站建设外包
  • 简述电子商务网站开发流程成都建站网址