Flink is an excellent big data real-time computing framework and a must-have skill at many companies doing big data development. In this post I will use Flink's batch processing to implement the introductory WordCount example.
Step 1:
Create a new Maven project in IDEA and configure your Maven environment.
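If you prefer the command line to IDEA, a Maven quickstart skeleton can also be generated as below (the groupId and artifactId here are placeholders, not part of the original tutorial; pick your own):

mvn archetype:generate -DgroupId=com.example -DartifactId=flink-wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false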
Step 2:
Add the following dependencies and configuration to the pom file:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Flink version -->
    <flink.version>1.13.0</flink.version>
    <!-- JDK version -->
    <java.version>1.8</java.version>
    <slf4j.version>1.7.30</slf4j.version>
    <!-- Scala binary version -->
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.6.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>java</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <classpathScope>test</classpathScope>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
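Because the pom configures the exec-maven-plugin, the job can also be launched from the command line once the BatchWordCount class from step 5 exists; assuming it sits in the default package as in this example, one way to run it is:

mvn compile exec:java -Dexec.mainClass=BatchWordCount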
Step 3:
Configure log4j: create a file named log4j.properties under resources with the following content.
# Output pattern : date [thread] priority category - message
log4j.rootLogger=WARN,CONSOLE,RollingFile

# CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

# RollingFile
log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingFile.File=logs/signserver.log
log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

# Project default level
log4j.logger.com.ntko.sign=debug
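To check that logging is wired up, here is a minimal sketch (the class name and messages are placeholders, not part of the original tutorial) that logs through SLF4J, which the pom routes to log4j:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogCheck {
    private static final Logger LOG = LoggerFactory.getLogger(LogCheck.class);

    public static void main(String[] args) {
        // With rootLogger=WARN this message appears on the console and in logs/signserver.log
        LOG.warn("log4j configuration loaded");
        // Info messages are filtered out at the WARN level
        LOG.info("this message is suppressed");
    }
}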
Step 4:
Create an input directory, and inside it create a words.txt file with the following content:
hello world
hello flink
hello java
Step 5:
Create a BatchWordCount class with the following code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1: Create the execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 2: Read the data from the file
        DataSource<String> dataSource = executionEnvironment.readTextFile("input/words.txt");
        // Split each line into words and convert them to 2-tuples
        FlatMapOperator<String, Tuple2<String, Long>> returns = dataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    // Split one line of text into words
                    String[] words = line.split(" ");
                    // Emit each word as a (word, 1) tuple
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = returns.groupBy(0);
        // Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
        sum.print();
    }
}
Step 6:
Output: you can see that flink appears once, world appears once, hello appears three times, and java appears once.
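For reference, print() on the aggregated DataSet writes one tuple per line, so with the words.txt above the console output should contain the following four lines (the order may differ between runs):

(flink,1)
(world,1)
(hello,3)
(java,1)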