Contents

1. Collection Flow
2. Project Architecture
3. The log4j.properties file under the resources directory
4. Dependencies
5. ODS Layer: OdsApp
6. Environment Entry Class: CreateEnvUtil
7. Kafka Utility Class: KafkaUtil
8. Starting the Cluster Project

The ODS layer reads data from MySQL, splits it into fact data and dimension data, applies a different ETL treatment to each type, and sends the results to Kafka.
Code

1. Collection Flow

2. Project Architecture

3. The log4j.properties file under the resources directory
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
4. Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.tms.realtime</groupId>
    <artifactId>tms-realtime</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.17.0</flink.version>
        <hadoop.version>3.3.4</hadoop.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files.
                                     Without it, the factory classes of connector and format dependencies
                                     would overwrite each other in the shaded JAR. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
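Since the shade plugin above is bound to the package phase, a regular Maven build is enough to produce the shaded JAR that gets submitted to the cluster later. A minimal sketch (the JAR name follows the artifactId and version declared in this pom):

mvn clean package -DskipTests
# the shaded JAR should appear as target/tms-realtime-1.0-SNAPSHOT.jar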
5. ODS Layer: OdsApp
package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * ODS data collection
 */
public class OdsApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Get the stream execution environment and configure checkpointing
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // TODO 2. Use Flink CDC to read fact data from MySQL and write it to Kafka
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdSourceName = "ods_app_dwd_source";
        mysqlToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);

        // TODO 3. Use Flink CDC to read dimension data from MySQL and write it to Kafka
        String realtimeOption = "realtime_dim";
        String realtimeServerId = "6040";
        String realtimeSourceName = "ods_app_realtimeDim_source";
        mysqlToKafka(realtimeOption, realtimeServerId, realtimeSourceName, env, args);

        env.execute();
    }

    public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {
        // TODO 1. Use Flink CDC to read data from MySQL
        MySqlSource<String> mysqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);
        SingleOutputStreamOperator<String> strDS = env
                .fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1) // parallelism is set to 1 to prevent out-of-order records
                .uid(option + sourceName);

        // TODO 2. Simple ETL
        SingleOutputStreamOperator<String> processDS = strDS.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                try {
                    // Convert the JSON string into a JSON object
                    JSONObject jsonObj = JSON.parseObject(jsonStr);
                    // Keep records whose "after" field is not null and that are not deletes
                    if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
                        // To avoid ambiguity, rename the ts_ms field to ts
                        Long tsMs = jsonObj.getLong("ts_ms");
                        jsonObj.put("ts", tsMs);
                        jsonObj.remove("ts_ms"); // remove the original ts_ms field
                        // Convert the JSON object back to a string before emitting it downstream
                        out.collect(jsonObj.toJSONString());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    Log.error("The data received from Flink CDC is not valid JSON");
                }
            }
        }).setParallelism(1); // prevent out-of-order records

        // TODO 3. Key by the primary key (the id field under "after") to avoid out-of-order records
        KeyedStream<String, String> keyedDS = processDS.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String jsonStr) throws Exception {
                // Parse the record and use after.id as the key
                JSONObject jsonObj = JSON.parseObject(jsonStr);
                return jsonObj.getJSONObject("after").getString("id");
            }
        });

        // TODO 4. Write the data to the Kafka topic
        keyedDS
                .sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}
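For orientation, the records that processElement receives from JsonDebeziumDeserializationSchema are Debezium-style JSON change events. A simplified, made-up example of a record that would pass the filter is shown below; a real record carries every column of the changed row under after (only id is shown here) plus source and transaction metadata blocks, which are omitted:

{
  "before": null,
  "after": { "id": "1001" },
  "op": "c",
  "ts_ms": 1690000000000
}

Delete events (op = "d") and records without an after object are dropped; everything else is re-emitted with ts_ms renamed to ts and then keyed by after.id.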
6. Environment Entry Class: CreateEnvUtil
package com.atguigu.tms.realtime.utils;

import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

/**
 * Get the execution environment.
 * Flink CDC reads from MySQL by disguising itself as a MySQL replica.
 */
public class CreateEnvUtil {
    // Get the stream execution environment
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // TODO 1. Basic environment preparation
        // 1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2. Checkpoint settings
        // 2.1 Enable checkpointing
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Set the checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 Keep externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Set the minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 Set the restart strategy
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 Set the state backend
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 Set the user for HDFS access
        // Read command-line arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);

        return env;
    }

    // Get a MySqlSource
    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
        int mysqlPort = Integer.valueOf(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "root");
        option = parameterTool.get("start-up-options", option);
        // serverId identifies this CDC client to the MySQL server
        serverId = parameterTool.get("server-id", serverId);

        // Create a configuration map so that Decimal values are parsed in numeric format
        HashMap config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // Pass the configuration map to the JSON deserialization schema used to build the MySqlSource
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);

        // The source may carry fact or dimension data; the option flag decides how each type is handled
        switch (option) {
            // Fact data
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                // Only these 4 fact tables are read
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest()) // read only new changes from the latest binlog position
                        .serverId(serverId)
                        .build();
            // Dimension data
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                // 15 dimension tables are read
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial()) // take an initial snapshot on first start, then keep reading the latest binlog
                        .serverId(serverId)
                        .build();
            case "config_dim":
                return builder
                        .databaseList("tms_config")
                        .tableList("tms_config.tms_config_dim")
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
        }
        Log.error("Unsupported option type");
        return null;
    }
}
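Because every connection setting above falls back to a ParameterTool default (hadoop102, port 3306, root/root, HDFS user atguigu), all of them can be overridden on the command line when the job is submitted. A sketch of such a submission, assuming the shaded JAR built from the pom above and OdsApp as the entry class (values shown are just the defaults the code already uses):

flink run -c com.atguigu.tms.realtime.app.ods.OdsApp tms-realtime-1.0-SNAPSHOT.jar \
  --mysql-hostname hadoop102 \
  --mysql-port 3306 \
  --mysql-username root \
  --mysql-passwd root \
  --hadoop-user-name atguigu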
7. Kafka Utility Class: KafkaUtil
package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.io.IOException;

/**
 * Utility class for working with Kafka
 */
public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    // Get a KafkaSink; transIdPrefix is the prefix used for Kafka transactional ids
    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // args carries the command-line arguments, so configuration can be passed in from outside the program.
        // Wrap the command-line arguments in a ParameterTool object
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Read the "topic" key from the command line; the method parameter topic serves as the default
        // when the command line does not specify one
        topic = parameterTool.get("topic", topic);
        // If the command line did not supply a topic and the default is null, throw an exception
        if (topic == null) {
            throw new IllegalArgumentException("The topic name must not be null: no command-line argument was given and there is no default value!");
        }

        // Read the "bootstrap-servers" key from the command line, with a default value
        String bootstrapServers = parameterTool.get("bootstrap-servers", KAFKA_SERVER);
        // Read the "transaction-timeout" key from the command line, with a default value
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();
        return kafkaSink;
    }

    // Convenience overload: only topic and args are required; the transactional-id prefix defaults to topic + "_trans"
    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);
    }
}
8. Starting the Cluster Project

Start a Kafka consumer, then run the Java application.
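To watch the data arrive, a console consumer can be attached to the tms_ods topic before the job starts; a sketch using the stock Kafka CLI (the script location depends on your Kafka installation):

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods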