设计素材网站会员哪个最好,杭州互联网大厂,做网站和做微商城有什么区别,网站定制的销售情况

一、Flink 框架介绍　Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。Apache Spark 掀开了内存计算的先河，以内存作为赌注，赢得了内存计算的飞速发展。但是在其火热的同时，开发人员发现，在 Spark …
一、Flink 框架介绍

Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。Apache Spark 掀开了内存计算的先河，以内存作为赌注，赢得了内存计算的飞速发展。但是在其火热的同时，开发人员发现，Spark 等计算框架普遍存在的缺点和不足依然没有完全解决，而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而凸显得更加明显：

- 数据精准一次性处理（Exactly-Once）
- 乱序数据、迟到数据
- 低延迟、高吞吐、准确性
- 容错性

Apache Flink 是一个框架和分布式处理引擎，用于对无界和有界数据流进行有状态计算。在 Spark 火热的同时，Flink 也在默默地发展自己，并尝试着解决其他计算框架的问题。慢慢地，随着这些问题的解决，Flink 被绝大多数程序员所熟知并得到大力推广。阿里公司在 2015 年改进 Flink 并创建了内部分支 Blink，目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。

二、框架集成

2.1 创建 Maven 项目

依赖：

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven POM for the Flink -> Elasticsearch 7 demo project.
     Reconstructed: the scrape stripped all angle brackets and quotes. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lun.es</groupId>
    <artifactId>flink-elasticsearch</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Compile for Java 8, matching Flink 1.12's baseline. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- Elasticsearch 7 connector (note: only published for Scala 2.11 here). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

功能实现：

package com.xmx.es;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class FlinkElasticsearchSinkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.socketTextStream(localhost, 9999);ListHttpHost httpHosts new ArrayList();httpHosts.add(new HttpHost(127.0.0.1, 9200, http));//httpHosts.add(new HttpHost(10.2.3.1, 9200, http));// use a ElasticsearchSink.Builder to create an ElasticsearchSinkElasticsearchSink.BuilderString esSinkBuilder new ElasticsearchSink.Builder(httpHosts,new ElasticsearchSinkFunctionString() {public IndexRequest createIndexRequest(String element) {MapString, String json new HashMap();json.put(data, element);return Requests.indexRequest().index(my-index)//.type(my-type).source(json);}Overridepublic void process(String element, RuntimeContext ctx, RequestIndexer indexer) {indexer.add(createIndexRequest(element));}});// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be bufferedesSinkBuilder.setBulkFlushMaxActions(1);// provide a RestClientFactory for custom configuration on the internally createdREST client// esSinkBuilder.setRestClientFactory(// restClientBuilder - {// restClientBuilder.setDefaultHeaders(...)// restClientBuilder.setMaxRetryTimeoutMillis(...)// restClientBuilder.setPathPrefix(...)// restClientBuilder.setHttpClientConfigCallback(...)// }// );source.addSink(esSinkBuilder.build());env.execute(flink-es);}
}