

Table of Contents

1. Collection Flow
2. Project Architecture
3. log4j.properties under the resources directory
4. Dependencies
5. ODS Layer: OdsApp
6. Environment Entry Class: CreateEnvUtil
7. Kafka Utility Class: KafkaUtil
8. Starting the Cluster and Running the Project

This layer reads data from MySQL. The data is split into fact data and dimension data, each type goes through its own ETL processing, and the result is sent to Kafka. The code is given below.

1. Collection Flow

Business data lands in MySQL, Flink CDC captures the binlog changes, and this ODS job forwards them to the Kafka topic tms_ods. (Shown as a diagram in the original post.)

2. Project Architecture

(Shown as a diagram in the original post.)

3. log4j.properties under the resources directory

```properties
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
```

4. Dependencies

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.atguigu.tms.realtime</groupId>
  <artifactId>tms-realtime</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.17.0</flink.version>
    <hadoop.version>3.3.4</hadoop.version>
    <flink-cdc.version>2.3.0</flink-cdc.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-reload4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-to-slf4j</artifactId>
      <version>2.14.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.4.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-reload4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.20</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>3.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.3.2</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise this may cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers combine.children="append">
                <!-- The service transformer is needed to merge META-INF/services files;
                     without it the factory files of connector and format dependencies overwrite each other. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
5. ODS Layer: OdsApp

```java
package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * ODS data collection
 */
public class OdsApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Get the stream execution environment and configure checkpointing
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // TODO 2. Read fact data from MySQL with Flink CDC and write it to Kafka
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdSourceName = "ods_app_dwd_source";
        mysqlToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);

        // TODO 3. Read dimension data from MySQL with Flink CDC and write it to Kafka
        String realtimeOption = "realtime_dim";
        String realtimeServerId = "6040";
        String realtimeSourceName = "ods_app_realtimeDim_source";
        mysqlToKafka(realtimeOption, realtimeServerId, realtimeSourceName, env, args);

        env.execute();
    }

    public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {
        // TODO 1. Read data from MySQL with Flink CDC
        MySqlSource<String> mysqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);
        SingleOutputStreamOperator<String> strDS = env
                .fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)   // parallelism 1 to prevent out-of-order records
                .uid(option + sourceName);

        // TODO 2. Simple ETL
        SingleOutputStreamOperator<String> processDS = strDS.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                        try {
                            // Parse the JSON string into a JSON object
                            JSONObject jsonObj = JSON.parseObject(jsonStr);
                            // Keep the record only if "after" is not null and the operation is not a delete
                            if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
                                // Rename ts_ms to ts to avoid ambiguity
                                Long tsMs = jsonObj.getLong("ts_ms");
                                jsonObj.put("ts", tsMs);
                                jsonObj.remove("ts_ms");   // drop the original ts_ms field
                                // Convert the JSON object back to a string before emitting it downstream
                                out.collect(jsonObj.toJSONString());
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            Log.error("The record received from Flink CDC is not valid JSON");
                        }
                    }
                }
        ).setParallelism(1);   // keep parallelism 1 to prevent out-of-order records

        // TODO 3. Key by the primary key (the id field under "after") to avoid out-of-order records
        KeyedStream<String, String> keyedDS = processDS.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String jsonStr) throws Exception {
                // Parse the record and use after.id as the key
                JSONObject jsonObj = JSON.parseObject(jsonStr);
                return jsonObj.getJSONObject("after").getString("id");
            }
        });

        // TODO 4. Write the data to the Kafka topic
        keyedDS
                .sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}
```
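The ETL step is easier to follow with a concrete record. The snippet below is a small standalone sketch, not part of the project: it applies the same filter and field rename to a hand-written change record. The record follows the Debezium JSON envelope (before/after/op/ts_ms) that the CDC source emits, but the field names inside "after" and all values are made up for illustration.

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class OdsEtlSketch {
    public static void main(String[] args) {
        // A hand-written change record in the Debezium JSON envelope; values are illustrative only.
        String jsonStr = "{\"before\":null,"
                + "\"after\":{\"id\":\"1001\",\"status\":\"60010\"},"
                + "\"op\":\"c\",\"ts_ms\":1690000000000}";

        JSONObject jsonObj = JSON.parseObject(jsonStr);
        // Same condition as OdsApp: keep the record only if "after" exists and it is not a delete.
        if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
            // Rename ts_ms to ts, exactly as the ProcessFunction does.
            jsonObj.put("ts", jsonObj.getLong("ts_ms"));
            jsonObj.remove("ts_ms");
            // Prints something like: {"op":"c","after":{"id":"1001","status":"60010"},"ts":1690000000000}
            System.out.println(jsonObj.toJSONString());
        }
    }
}
```

Delete events ("op":"d") carry their data in "before" rather than "after", which is why the filter above drops them from the ODS stream.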
6. Environment Entry Class: CreateEnvUtil

```java
package com.atguigu.tms.realtime.utils;

import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

/**
 * Creates the execution environment and the CDC sources.
 * Flink CDC can read MySQL because it disguises itself as a MySQL replica.
 */
public class CreateEnvUtil {

    // Get the stream execution environment
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // TODO 1. Basic environment
        // 1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2. Checkpoint settings
        // 2.1 Enable checkpointing
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 Keep checkpoints after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 Restart strategy
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 State backend
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 User that operates on HDFS
        // Read the command-line parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);

        return env;
    }

    // Get the MySqlSource
    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
        int mysqlPort = Integer.valueOf(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "root");
        option = parameterTool.get("start-up-options", option);
        // serverId identifies this client on the MySQL server
        serverId = parameterTool.get("server-id", serverId);

        // Configuration map: parse Decimal columns in numeric format
        HashMap config = new HashMap();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // Pass the configuration map to the JSON deserialization schema used to build the MySqlSource
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);

        // The source may read fact or dimension tables; the option flag decides which tables and how they are read
        switch (option) {
            // Fact data
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                // Only these 4 fact tables are read
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())   // read only new changes from the latest binlog position
                        .serverId(serverId)
                        .build();
            // Dimension data
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                // 15 dimension tables are read
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())   // take an initial snapshot of the monitored tables on first start, then keep reading the latest binlog
                        .serverId(serverId)
                        .build();
            case "config_dim":
                return builder
                        .databaseList("tms_config")
                        .tableList("tms_config.tms_config_dim")
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
        }
        Log.error("Unsupported option type");
        return null;
    }
}
```
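Every configurable value above is read through ParameterTool.fromArgs, so the hard-coded defaults (hostname, port, user, server id, and so on) can be overridden from the command line when the job is submitted. The fragment below is only an illustrative sketch of how that lookup behaves; the argument values are made up.

```java
import org.apache.flink.api.java.utils.ParameterTool;

public class ParamSketch {
    public static void main(String[] args) {
        // Simulated command-line arguments; in practice they are appended after the main class
        // when the job is submitted. The values here are examples only.
        String[] fakeArgs = {
                "--mysql-hostname", "hadoop103",
                "--hadoop-user-name", "atguigu"
        };
        ParameterTool tool = ParameterTool.fromArgs(fakeArgs);

        // Key present on the command line: the passed value wins over the default.
        System.out.println(tool.get("mysql-hostname", "hadoop102")); // hadoop103
        // Key absent: the second argument is used as the default, exactly as in CreateEnvUtil.
        System.out.println(tool.get("mysql-port", "3306"));          // 3306
    }
}
```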
7. Kafka Utility Class: KafkaUtil

```java
package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.io.IOException;

/**
 * Utility class for working with Kafka
 */
public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    // Build a KafkaSink; transIdPrefix is the prefix of the transactional id
    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // args carries external configuration: when the program is started from the command line,
        // parameters passed there override the defaults used below.
        // Wrap the command-line arguments in a ParameterTool
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Read the "topic" option; the method parameter acts as the default value,
        // so the default is used when no topic is given on the command line
        topic = parameterTool.get("topic", topic);
        // If neither the command line nor the default provides a topic, fail fast
        if (topic == null) {
            throw new IllegalArgumentException("Topic name must not be null: no command-line value and no default!");
        }

        // Read the "bootstrap-servers" option with a default
        String bootstrapServers = parameterTool.get("bootstrap-servers", KAFKA_SERVER);
        // Read the "transaction-timeout" option with a default
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();
        return kafkaSink;
    }

    // Convenience overload: only the topic and args need to be passed
    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);
    }
}
```
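Only the producer side is shown here, but the import list (KafkaSource, OffsetsInitializer, OffsetResetStrategy, DeserializationSchema) suggests the full project also has a consumer-side helper for the layers that read tms_ods back out of Kafka. The sketch below is my own minimal reconstruction of such a method, not the original code; it reuses the imports and the KAFKA_SERVER constant already present in KafkaUtil, and the method and group-id names are assumptions. Because the sink writes with EXACTLY_ONCE Kafka transactions, a downstream consumer should read with isolation.level=read_committed.

```java
// Hypothetical source-side counterpart to getKafkaSink; names and defaults are assumptions.
public static KafkaSource<String> getKafkaSource(String topic, String groupId, String[] args) {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    topic = parameterTool.get("topic", topic);
    String bootstrapServers = parameterTool.get("bootstrap-servers", KAFKA_SERVER);

    return KafkaSource.<String>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(groupId)
            // Start from committed offsets, falling back to the earliest offset if none exist yet.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            // Only read records from committed Kafka transactions, matching the EXACTLY_ONCE sink above.
            .setProperty("isolation.level", "read_committed")
            // String deserializer matching the string records that getKafkaSink writes.
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
}
```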
8. Starting the Cluster and Running the Project

With the cluster up, start a Kafka consumer on the tms_ods topic to watch the output, then launch the Java application; the MySQL changes captured by Flink CDC will start appearing in the topic.