当前位置: 首页 > news >正文

网站的运营和维护uniapp商城源码

网站的运营和维护,uniapp商城源码,深圳建设集团有限公司地址,徐州网站开发怎样Flink DataStream API的基本使用 文章目录 前言1. 基本使用方法2. 核心示例代码3. 完成工程代码pom.xmlWordCountExample测试验证 4. Stream 执行环境5. 参考文档 前言 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流是一个持续生成数据的数据源#xff0…Flink DataStream API的基本使用 文章目录 前言1. 基本使用方法2. 核心示例代码3. 完成工程代码pom.xmlWordCountExample测试验证 4. Stream 执行环境5. 参考文档 前言 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流是一个持续生成数据的数据源它没有明确的结束点例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流是一个具有明确开始和结束点的数据集例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用其中所有数据都已经可用并可以一次性处理。 Flink的DataStream API提供了一套丰富的操作符如map、filter、reduce、aggregations、windowing、join等以支持各种复杂的数据处理和分析需求。此外DataStream API还提供了容错保证能确保在发生故障时应用程序能从最近的检查点checkpoint恢复从而实现精确一次exactly-once的处理语义。 1. 基本使用方法 创建执行环境: 每一个Flink程序都需要创建一个StreamExecutionEnvironment执行环境它可以被用来设置参数和创建从外部系统读取数据的流。 final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();创建数据流: 你可以从各种数据源中创建数据流如本地集合文件socket等。下面的代码是从本地集合创建数据流的示例 DataStreamString dataStream env.fromElements(hello, flink);转换操作: Flink提供了丰富的转换操作如mapfilterreduce等。以下代码首先将每个字符串映射为其长度然后过滤出长度大于5的元素 DataStreamInteger transformedStream dataStream.map(s - s.length()).filter(l - l 5);数据输出: Flink支持将数据流输出到各种存储系统如文件socket数据库等。下面的代码将数据流输出到标准输出 transformedStream.print();执行程序: 将上述所有步骤放在main函数中并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的只有在调用execute方法时才会真正开始执行。 env.execute(Flink Basic API Usage);2. 核心示例代码 使用Flink DataStream API构建一个实时Word Count程序它会从一个socket端口读取文本数据统计每个单词的出现次数并将结果输出到标准输出。 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流从socket接收数据需要在本地启动一个端口为9000的socket服务器DataStreamString textStream env.socketTextStream(localhost, 9000);// 3. 转换操作DataStreamTuple2String, Integer wordCountStream textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute(Socket Word Count Example);}// 自定义一个FlatMapFunction将输入的每一行文本切分为单词并输出为Tuple2第一个元素是单词第二个元素是计数初始值为1public static class LineSplitter implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String line, CollectorTuple2String, Integer out) {for (String word : line.split( )) {out.collect(new Tuple2(word, 1));}}} }3. 完成工程代码 下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码包括Pom.xml文件和所有Java类。 pom.xml project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.example/groupIdartifactIdflink-wordcount-example/artifactIdversion1.0-SNAPSHOT/versionpackagingjar/packagingpropertiesflink.version1.13.2/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion${flink.version}/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.8.1/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/plugin/plugins/build /projectWordCountExample import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流从socket接收数据需要在本地启动一个端口为9000的socket服务器DataStreamString textStream env.socketTextStream(localhost, 9000);// 3. 转换操作DataStreamTuple2String, Integer wordCountStream textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute(Socket Word Count Example);}// 自定义一个FlatMapFunction将输入的每一行文本切分为单词并输出为Tuple2第一个元素是单词第二个元素是计数初始值为1public static class LineSplitter implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String line, CollectorTuple2String, Integer out) {for (String word : line.split( )) {out.collect(new Tuple2(word, 1));}}} }现在你可以使用Maven编译并运行这个程序。在启动程序之前你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后你可以输入文本行Flink程序会统计每个单词出现的次数并实时打印结果。 测试验证 用py在本地启动一个socket服务器监听9000端口 python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。 import socketserver_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((localhost, 9000)) server_socket.listen(1)print(Waiting for connection...) client_socket, client_address server_socket.accept() print(Connected to:, client_address)while True:data input(Enter text: )client_socket.sendall(data.encode())运行Flink程序和Python socket服务器然后在Python程序中输入文本 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。 4. Stream 执行环境 开发学习过程中不需要关注。每个 Flink 应用都需要有执行环境在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。 DataStream API 将你的应用构建为一个 job graph并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。 注意如果没有调用 execute()应用就不会运行。 Flink runtime: client, job manager, task managers 此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。 5. 参考文档 https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/
http://www.yutouwan.com/news/466750/

相关文章:

  • 网站做响应式还是移动端qq邮箱 wordpress
  • 合适的网站建设的公司怎么找投资公司取名字大全
  • 医院网站建设计划wordpress做游戏网站
  • 软装公司网站建设网站建设业务好做吗
  • 顺义做网站公司模板建站3000是不是贵了
  • 微信制作网站设计网站开发编程
  • 广东省建设网官网旺道seo推广效果怎么样
  • 子网站怎么建设湛江正规网站制作方案
  • 德化住房和城乡建设网站网站建设模块一项目三
  • 一学一做看视频网站有哪些内容深圳做网站得外包公司有哪些
  • 公司域名注册后怎么建设网站凡科网站做的好不好
  • 医疗手机网站好乐买的网站推广方式
  • 用视频做网站背景小程序appld
  • 网站建设比赛方案东莞市建设规划局网站
  • 山东省建设厅职业资格注册中心网站做网站怎么这么贵
  • 开发定制网站公司南昌的网站设计
  • 柳城企业网站制作哪家好学广告设计要学什么软件
  • 四平网站建设电话中国建设银行官网站黄金部王毅
  • 网站开发代码 免责声明wordpress还是dede
  • 长沙精品网站建设公司网站建设用到的工具
  • 亿省心网站托管做网站是要云空间吗
  • 安康网站开发网站结构设计怎么写
  • 互联网网站文化上海学校网站建设
  • 整个网站的关键词工程承包合同范本免费
  • 网站建设的开发程序政务类网站建设
  • 做网站是用myecli辽宁城乡建设集团 网站
  • 上海网站建设公司价格广告设计图片素材免费
  • 营销型网站设计公司经典营销型网站
  • 邢台做网站哪个网络公司好阿里云建站可不可以备案
  • 烟台学校网站建设网站出现404