网站 平均加载时间,建设部网站监理注销查询,服装设计手绘,ppt代做长久已来#xff0c;SQL以其简单易用、开发效率高等优势一直是ETL的首选编程语言#xff0c;在构建数据仓库和数据湖的过程中发挥着不可替代的作用。Hive和Spark SQL也正是立足于这一点#xff0c;才在今天的大数据生态中牢牢占据着主力位置。在常规的Spark环境中#xff0…长久已来SQL以其简单易用、开发效率高等优势一直是ETL的首选编程语言在构建数据仓库和数据湖的过程中发挥着不可替代的作用。Hive和Spark SQL也正是立足于这一点才在今天的大数据生态中牢牢占据着主力位置。在常规的Spark环境中开发者可以使用spark-sql命令直接执行SQL文件这是一项看似平平无奇实则非常重要的功能一方面这一方式极大地降低了Spark的使用门槛用户只要会写SQL就可以使用Spark另一方面通过命令行驱动SQL文件的执行可以极大简化SQL作业的提交工作使得作业提交本身被“代码化”为大规模工程开发和自动化部署提供了便利。
但遗憾的是Amazon EMR Serverless 未能针对执行SQL文件提供原生支持用户只能在Scala/Python代码中嵌入SQL语句这对于倚重纯SQL开发数仓或数据湖的用户来说并不友好。为此我们专门开发了一组用于读取、解析和执行SQL文件的工具类借助这组工具类用户可以在 Amazon EMR Serverless 上直接执行SQL文件本文将详细介绍一下这一方案。
1. 方案设计
鉴于在Spark编程环境中执行SQL语句的方法是spark.sql(...)我们可以设计一个通用的作业类该类在启动时会根据传入的参数读取指定位置上的SQL文件然后拆分成单条SQL并调用spark.sql(...)执行。为了让作业类更加灵活和通用还可以引入通配符一次加载并执行多个SQL文件。此外ETL作业经常需要根据作业调度工具生成的时间参数去执行相应的批次这些参数同样会作用到SQL中所以作业类还应允许用户在SQL文件中嵌入自定义变量并在提交作业时以参数形式为自定义变量赋值。基于这种设计思路我们开发了一个项目实现了上述功能项目地址为
项目名称项目地址Amazon EMR Serverless Utilitieshttps://github.com/bluishglc/emr-serverless-utils
项目中的com.github.emr.serverless.SparkSqlJob类即为通用的SQL作业类该类接受两个可选参数分别是
参数说明取值示例–sql-files指定要执行的SQL文件路径支持Java文件系统通配符可指定多个文件一起执行s3://my-spark-sql-job/sqls/insert-into-*.sql–sql-params以K1V1,K2V2,...形式为SQL文件中定义的${K1},${K2},…形式的变量设值CUST_CITYNEW YORK,ORD_DATE2008-07-15
该方案具备如下特性
① 允许单一SQL文件包含多条SQL语句 ② 允许在SQL文件中使用${K1},${K2},…的形式定义变量并在执行作业时使用K1V1,K2V2,...形式的参数进行变量赋值 ③ 支持Java文件系统通配符可一次执行多个SQL文件
下面我们将分别在AWS控制台和命令行两种环境下介绍并演示如何使用该项目的工具类提交纯SQL作业。
2. 实操演示
2.1. 环境准备
在EMR Serverless上提交作业时需要准备一个“EMR Serverless Application”和一个“EMR Serverless Job Execution Role”其中后者应具有S3和Glue Data Catalog的读写权限。Application可以在EMR Serverless控制台EMR Studio上通过向导轻松创建全默认配置即可Execution Role可以使用 《CDC一键入湖当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》 一文第5节提供的脚本快速创建。
接下来要准备提交作业所需的Jar包和SQL文件。首先在S3上创建一个存储桶本文使用的桶取名my-spark-sql-job当您在自己的环境中操作时请注意替换桶名然后从 [ 此处 ] 下载编译好的 emr-serverless-utils.jar包并上传至s3://my-spark-sql-job/jars/目录下 在演示过程中还将使用到5个SQL示例文件从 [ 此处 ] 下载解压后上传至s3://my-spark-sql-job/sqls/目录下 2.2. 在控制台上提交纯SQL文件作业
2.2.1. 执行单一SQL文件
打开EMR Serverless的控制台EMR Studio在选定的EMR Serverless Application下提交一个如下的Job ① Script location设定为此前上传的Jar包路径 s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar ② Main class设定为 com.github.emr.serverless.SparkSqlJob ③ Script arguments设定为 [--sql-files,s3://my-spark-sql-job/sqls/drop-tables.sql]
至于其他选项无需特别设定保持默认配置即可对于在生产环境中部署的作业您可以结合自身作业的需要灵活配置例如Spark Driver/Executor的资源分配等。需要提醒的是通过控制台创建的作业默认会启用Glue Data Catalog即Additional settings - Metastore configuration - Use AWS Glue Data Catalog 默认是勾选的为了方便在Glue和Athena中检查SQL脚本的执行结果建议您不要修改此项默认配置。
上述配置描述了这样一项工作以s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar中的com.github.emr.serverless.SparkSqlJob作为主类提起一个Spark作业。其中[--sql-files,s3://my-spark-sql-job/sqls/drop-tables.sql]是传递给SparkSqlJob的参数用于告知作业所要执行的SQL文件位置。本次作业执行的SQL文件只有三条简单的DROP TABLE语句是一个基础示例用以展示工具类执行单一文件内多条SQL语句的能力。
2.2.2. 执行带自定义参数的SQL文件
接下来要演示的是工具类的第二项功能执行带自定义参数的SQL文件。新建或直接复制上一个作业在控制台上选定上一个作业依次点击 Actions - Clone job然后将“Script arguments”的值设定为
[--sql-files,s3://my-spark-sql-job/sqls/create-tables.sql,--sql-params,APP_S3_HOMEs3://my-spark-sql-job]如下图所示 这次的作业设定除了使用--sql-files参数指定了SQL文件外还通过--sql-params参数为SQL中出现的用户自定义变量进行了赋值。根据此前的介绍APP_S3_HOMEs3://my-spark-sql-job是一个“KeyValue”字符串其含义是将值s3://my-spark-sql-job赋予了变量APP_S3_HOMESQL中所有出现${APP_S3_HOME}的地方都将被s3://my-spark-sql-job所替代。查看create-tables.sql文件在建表语句的LOCATION部分可以发现自定义变量${APP_S3_HOME}
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION ${APP_S3_HOME}/data/orders/;当SparkSqlJob读取该SQL文件时会根据键值对字符串APP_S3_HOMEs3://my-spark-sql-job将SQL文件中所有的${APP_S3_HOME}替换为s3://my-spark-sql-job实际执行的SQL将变为
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION s3://my-spark-sql-job/data/orders/;提交作业并执行完毕后可登录Athena控制台查看数据表是否创建成功。
2.2.3. 使用通配符执行多个文件
有时候我们需要批量执行一个文件夹下的所有SQL文件或者使用通配符选择性的执行部分SQL文件SparkSqlJob使用了Java文件系统通配符来支持这类需求。下面的作业就演示了通配符的使用方法同样是新建或直接复制上一个作业然后将“Script arguments”的值设定为
[--sql-files,s3://my-spark-sql-job/sqls/insert-into-*.sql]如下图所示 这次作业的--sql-files参数使用了路径通配符insert-into-*.sql将同时匹配insert-into-orders.sql和insert-into-customers.sql两个SQL文件它们将分别向ORDERS和CUSTOMERS两张表插入多条记录。执行完毕后可以可登录Athena控制台查看数据表中是否有数据产生。
2.2.4. 一个复合示例
最后我们来提交一个更有代表性的复合示例文件通配符 用户自定义参数。再次新建或直接复制上一个作业然后将“Script arguments”的值设定为
[--sql-files,s3://my-spark-sql-job/sqls/select-*.sql,--sql-params,APP_S3_HOMEs3://my-spark-sql-job,CUST_CITYNEW YORK,ORD_DATE2008-07-15]如下图所示
![emr-serverless-snapshot-4.jpg-150.8kB][6]
本次作业的--sql-files参数使用路径通配符select-*.sql匹配select-tables.sql文件该文件中存在三个用户自定义变量分别是${APP_S3_HOME}、${CUST_CITY}、${ORD_DATE}
CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION ${APP_S3_HOME}/data/orders_customers/
AS SELECT... ...
WHEREC.CUST_CITY ${CUST_CITY} ANDO.ORD_DATE CAST(${ORD_DATE} AS DATE);--sql-params参数为这三个自定义变量设置了取值分别是APP_S3_HOMEs3://my-spark-sql-jobCUST_CITYNEW YORKORD_DATE2008-07-15于是上述SQL将被转化为如下内容去执行
CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION s3://my-spark-sql-job/data/orders_customers/
AS SELECT... ...
WHEREC.CUST_CITY NEW YORK ANDO.ORD_DATE CAST(2008-07-15 AS DATE);至此通过控制台提交纯SQL文件作业的所有功能演示完毕。
2.3. 通过命令行提交纯SQL文件作业
实际上很多EMR Serverless用户并不在控制台上提交自己的作业而是通过AWS CLI提交这种方式方式多见于工程代码或作业调度中。所以我们再来介绍一下如何通过命令行提交纯SQL文件作业。
本文使用命令行提交EMR Serverless作业的方式遵循了《最佳实践如何优雅地提交一个 Amazon EMR Serverless 作业》一文给出的最佳实践。首先登录一个安装了AWS CLI并配置有用户凭证的Linux环境建议使用Amazon Linux2先使用命令sudo yum -y install jq安装操作json文件的命令行工具jq后续脚本会使用到它然后完成如下前期准备工作
① 创建或选择一个作业专属工作目录和S3存储桶 ② 创建或选择一个EMR Serverless Execution Role ③ 创建或选择一个EMR Serverless Application
接下来将所有环境相关变量悉数导出请根据您的AWS账号和本地环境替换命令行中的相应值
export APP_NAMEchange-to-your-app-name
export APP_S3_HOMEchange-to-your-app-s3-home
export APP_LOCAL_HOMEchange-to-your-app-local-home
export EMR_SERVERLESS_APP_IDchange-to-your-application-id
export EMR_SERVERLESS_EXECUTION_ROLE_ARNchange-to-your-execution-role-arn以下是一份示例
export APP_NAMEmy-spark-sql-job
export APP_S3_HOMEs3://my-spark-sql-job
export APP_LOCAL_HOME/home/ec2-user/my-spark-sql-job
export EMR_SERVERLESS_APP_ID00fbfel40ee59k09
export EMR_SERVERLESS_EXECUTION_ROLE_ARNarn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN《最佳实践如何优雅地提交一个 Amazon EMR Serverless 作业》一文提供了多个操作Job的通用脚本都非常实用本文也会直接复用这些脚本但是由于我们需要多次提交且每次的参数又有所不同为了便于使用和简化行文我们将原文中的部分脚本封装为一个Shell函数取名为submit-spark-sql-job
submit-spark-sql-job() {sqlFiles$1sqlParams$2cat EOF $APP_LOCAL_HOME/start-job-run.json
{name:my-spark-sql-job,applicationId:$EMR_SERVERLESS_APP_ID,executionRoleArn:$EMR_SERVERLESS_EXECUTION_ROLE_ARN,jobDriver:{sparkSubmit:{entryPoint:$APP_S3_HOME/jars/emr-serverless-utils-1.0.jar,entryPointArguments:[$([[ -n $sqlFiles ]] echo \--sql-files\, \$sqlFiles\)$([[ -n $sqlParams ]] echo ,\--sql-params\, \$sqlParams\)],sparkSubmitParameters:--class com.github.emr.serverless.SparkSqlJob --conf spark.hadoop.hive.metastore.client.factory.classcom.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory}},configurationOverrides:{monitoringConfiguration:{s3MonitoringConfiguration:{logUri:$APP_S3_HOME/logs}}}
}
EOFjq . $APP_LOCAL_HOME/start-job-run.jsonexport EMR_SERVERLESS_JOB_RUN_ID$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name my-spark-sql-job \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)now$(date %s)secwhile true; dojobStatus$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ $jobStatus PENDING ] || [ $jobStatus SCHEDULED ] || [ $jobStatus RUNNING ]; thenfor i in {0..5}; doecho -ne \E[33;5m The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now %H:%M:%S) ] ....\r\E[0msleep 1doneelseprintf The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\nbreakfidone
}该函数接受两个位置参数
① 第一位置上的参数用于指定SQL文件路径其值会传递给SparkSqlJob的--sql-files ② 第二位置上的参数用于指定SQL文件中的用户自定义变量其值会传递给SparkSqlJob的--sql-params
函数中使用的Jar包和SQL文件与《2.1. 环境准备》一节准备的Jar包和SQL文件一致所以使用脚本提交作业前同样需要完成2.1节的环境准备工作。接下来我们就使用该函数完成与2.2节一样的操作。
2.3.1. 执行单一SQL文件
本节操作与2.2.1节完全一致只是改用了命令行方式实现命令如下
submit-spark-sql-job $APP_S3_HOME/sqls/drop-tables.sql2.3.2. 执行带自定义参数的SQL文件
本节操作与2.2.2节完全一致只是改用了命令行方式实现命令如下
submit-spark-sql-job $APP_S3_HOME/sqls/create-tables.sql APP_S3_HOME$APP_S3_HOME2.3.3. 使用通配符执行多个文件
本节操作与2.2.3节完全一致只是改用了命令行方式实现命令如下
submit-spark-sql-job $APP_S3_HOME/sqls/insert-into-*.sql2.3.4. 一个复合示例
本节操作与2.2.4节完全一致只是改用了命令行方式实现命令如下
submit-spark-sql-job $APP_S3_HOME/sqls/select-tables.sql APP_S3_HOME$APP_S3_HOME,CUST_CITYNEW YORK,ORD_DATE2008-07-153. 在源代码中调用工具类
尽管在Spark编程环境中可以使用spark.sql(...)形式直接执行SQL语句但是从前文示例中可以看出 emr-serverless-utils 提供的SQL文件执行能力更便捷也更强大一些所以最后我们简单介绍一下如何在源代码中调用相关的工具类获得上述SQL文件的处理能力。具体做法非常简单你只需要
① 将emr-serverless-utils-1.0.jar加载到你的类路径中 ② 声明隐式类型转换 ③ 在spark上直接调用execSqlFile() # 初始化SparkSession及其他操作
...# 声明隐式类型转换
import com.github.emr.serverless.SparkSqlSupport._# 在spark上直接调用execSqlFile()
spark.execSqlFile(s3://YOUR/XXX.sql)# 在spark上直接调用execSqlFile()
spark.execSqlFile(s3://YOUR/XXX.sql, K1V1,K2V2,...)# 其他操作
...