Introduction
When a typical Spark job reads a Parquet external table stored on OSS, how is the source-side parallelism (tasks/partitions) determined? This question comes up in particular when running TPC-H tests: how is the parallelism of the source file scan decided? Does one Parquet file map to one partition, do multiple Parquet files map to one partition, or can one Parquet file map to multiple partitions? This article analyzes the source code to answer these questions.
Analysis
The physical execution node that reads a data source is FileSourceScanExec. The code that reads the data is as follows:
lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD
}
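As a quick way to see this node in action, the minimal sketch below prints the physical plan of a Parquet read; the plan contains a "FileScan parquet" entry, which is the rendered form of FileSourceScanExec. The OSS path and object name here are placeholders, not the table used later in this article.

import org.apache.spark.sql.SparkSession

object ExplainParquetScan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explain-parquet-scan").getOrCreate()
    // The printed physical plan contains a "FileScan parquet ..." node,
    // i.e. the FileSourceScanExec discussed above.
    spark.read.parquet("oss://your-bucket/path/to/table").explain()
    spark.stop()
  }
}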
Here we focus on the non-bucketed path. For a non-bucketed scan, the createReadRDD method is called; it is defined as follows:
/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partition that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */
private def createReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  // File open cost: the minimum number of bytes assumed to be read each time a file is opened
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  // Maximum split size
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  // Filter files with bucket pruning if possible
  val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
  val shouldProcess: Path => Boolean = optionalBucketSet match {
    case Some(bucketSet) if bucketingEnabled =>
      // Do not prune the file if bucket file name is invalid
      filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
    case _ =>
      _ => true
  }

  // Split the files under each partition and sort them by size in descending order
  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      // getPath() is very expensive so we only want to call it once in this block:
      val filePath = file.getPath

      if (shouldProcess(filePath)) {
        // Whether the file is splittable; parquet/orc/avro are all splittable
        val isSplitable = relation.fileFormat.isSplitable(
          relation.sparkSession, relation.options, filePath)
        // Split the file
        PartitionedFileUtil.splitFiles(
          sparkSession = relation.sparkSession,
          file = file,
          filePath = filePath,
          isSplitable = isSplitable,
          maxSplitBytes = maxSplitBytes,
          partitionValues = partition.values
        )
      } else {
        Seq.empty
      }
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
As can be seen, the maximum split size maxSplitBytes determines how many splits the files will later be cut into. Its core logic is as follows:
def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  // Maximum size of a partition when packing files for reading; defaults to 128MB, i.e. one block
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  // Cost of opening each file; defaults to 4MB
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Suggested (not guaranteed) minimum number of split file partitions; unset by default,
  // in which case it falls back to leafNodeDefaultParallelism.
  // Call chain: SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
  //   -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism
  //   -> max(total executor cores, 2), i.e. at least 2
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  // Total bytes to read
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  // Bytes per core
  val bytesPerCore = totalBytes / minPartitionNum
  // The computed size never exceeds the configured 128MB
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
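To estimate this value for your own tables without running a job, here is a small pure-Scala sketch of the same formula. estimateMaxSplitBytes and its parameter names are illustrative, not Spark APIs; the defaults mirror the configuration defaults mentioned above.

object MaxSplitBytesEstimator {
  // Mirrors the formula in FilePartition#maxSplitBytes (all sizes in bytes).
  def estimateMaxSplitBytes(
      fileSizes: Seq[Long],
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024, // spark.sql.files.maxPartitionBytes
      openCostInBytes: Long = 4L * 1024 * 1024,        // spark.sql.files.openCostInBytes
      minPartitionNum: Long = 2L                       // spark.sql.files.minPartitionNum, or max(total cores, 2)
  ): Long = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / minPartitionNum
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // Eight files of ~14MB read with the default minimum of 2 partitions:
    // bytesPerCore stays well below 128MB, so it becomes the split size.
    val sizes = Seq.fill(8)(14L * 1024 * 1024)
    println(estimateMaxSplitBytes(sizes))
  }
}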
The core logic of PartitionedFileUtil#splitFiles is fairly simple: large files are sliced directly according to the maximum split size:
def splitFiles(
    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (isSplitable) {
    // Split into multiple slices
    (0L until file.getLen by maxSplitBytes).map { offset =>
      val remaining = file.getLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      val hosts = getBlockHosts(getBlockLocations(file), offset, size)
      PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
    }
  } else {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  }
}
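The offset arithmetic is easy to check by hand. The toy sketch below (plain Scala, no Spark; the numbers come from the experiment later in this article) slices a 15.698MB file with maxSplitBytes = 14.5918MB into a full slice plus a ~1.1MB tail.

object SliceDemo {
  def main(args: Array[String]): Unit = {
    val fileLen = (15.698 * 1024 * 1024).toLong
    val maxSplitBytes = (14.5918 * 1024 * 1024).toLong
    // Same iteration as splitFiles: one slice per maxSplitBytes step.
    val slices = (0L until fileLen by maxSplitBytes).map { offset =>
      val size = math.min(maxSplitBytes, fileLen - offset)
      (offset, size)
    }
    slices.foreach { case (offset, size) => println(s"offset=$offset, size=$size bytes") }
  }
}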
Obtaining the Seq[PartitionedFile] list does not yet complete the splitting; FilePartition#getFilePartitions still has to be called for the final step. Its core logic is as follows:
def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      // Build a new FilePartition
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // File open cost, 4MB by default
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using "Next Fit Decreasing"
  partitionedFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      // If the accumulated size plus this file exceeds the maximum split size, close the
      // current partition: the data read by one task is now complete
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  // Close the last partition; its files may be small
  closePartition()
  partitions.toSeq
}
As this step shows, small files are packed together into FilePartitions of roughly maxSplitBytes, which avoids launching too many tasks just to read lots of small files.
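For readers who want to experiment with the packing behaviour without a Spark cluster, here is a dependency-free sketch of the same Next Fit Decreasing scheme (PackSlices and pack are made-up names): slice sizes are sorted in descending order and packed greedily, with openCostInBytes charged per slice.

import scala.collection.mutable.ArrayBuffer

object PackSlices {
  // Returns one Seq of slice sizes per resulting FilePartition-like group.
  def pack(sliceSizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = ArrayBuffer.empty[Seq[Long]]
    val current = ArrayBuffer.empty[Long]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toSeq
      current.clear()
      currentSize = 0L
    }

    // Next Fit Decreasing: largest slices first, close the bin when it would overflow.
    sliceSizes.sortBy(-_).foreach { size =>
      if (currentSize + size > maxSplitBytes) closePartition()
      currentSize += size + openCostInBytes
      current += size
    }
    closePartition()
    partitions.toSeq
  }
}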
The parallelism of the resulting FileScanRDD (new FileScanRDD(fsRelation.sparkSession, readFile, partitions)) is the length of partitions, which is also the number of tasks Spark finally launches:
override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray
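In practice, the scan task count can therefore be steered through three session settings. The snippet below is a minimal sketch with example values only (spark.sql.files.minPartitionNum requires Spark 3.1+).

import org.apache.spark.sql.SparkSession

object TunedScanSession {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("scan-parallelism-demo")
      .config("spark.sql.files.maxPartitionBytes", "134217728") // 128MB: upper bound of a scan partition
      .config("spark.sql.files.openCostInBytes", "4194304")     // 4MB: assumed cost of opening a file
      .config("spark.sql.files.minPartitionNum", "16")          // suggested minimum number of scan partitions
      .getOrCreate()
    // Lowering maxPartitionBytes or raising minPartitionNum produces more, smaller scan tasks;
    // the opposite produces fewer, larger ones.
    spark.stop()
  }
}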
The overall flow, as well as the splitting and merging process, is illustrated in the figures below.

Hands-On
Take the customer Parquet table generated for TPC-H 10G:
https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?pathrt_spark_test%2Fcustomer-parquet%2F
There are 8 Parquet files in total, with a combined size of 113.918MB. The Spark job is configured as follows (each executor has only 1 core):
conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;
Plugging the numbers into the formulas above:
defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 72.959MB
So maxSplitBytes is 72.959MB, which also matches the size shown in the logs. After sorting, the file order is (00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007); after merging, 3 FilePartitions are produced:
FilePartition 1: 00000, 00001, 00002
FilePartition 2: 00003, 00004, 00006
FilePartition 3: 00005, 00007
That is, 3 tasks are generated in total.
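The result can be reproduced offline with a short simulation. The sketch below is illustrative code, not Spark itself; the file sizes (in MB) are the approximate per-file sizes of this customer table as listed in the next experiment. Packing the 8 unsplit files with maxSplitBytes = 72.959MB prints 3 partitions whose contents match the grouping above.

import scala.collection.mutable.ArrayBuffer

object CustomerScanSimulation {
  def main(args: Array[String]): Unit = {
    val openCost = 4.0      // MB
    val maxSplit = 72.959   // MB
    // Approximate sizes (MB) of customer parquet files 00000..00007; none exceeds maxSplit,
    // so no file is split and the packing alone decides the task count.
    val files = Seq(15.698, 15.632, 15.629, 15.624, 15.617, 15.536, 15.539, 4.634)

    val partitions = ArrayBuffer.empty[Seq[Double]]
    val current = ArrayBuffer.empty[Double]
    var currentSize = 0.0
    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toSeq
      current.clear()
      currentSize = 0.0
    }
    // Next Fit Decreasing over the sorted sizes, as in FilePartition#getFilePartitions.
    files.sortBy(-_).foreach { size =>
      if (currentSize + size > maxSplit) closePartition()
      currentSize += size + openCost
      current += size
    }
    closePartition()

    println(s"number of partitions = ${partitions.size}") // 3
    partitions.zipWithIndex.foreach { case (p, i) => println(s"partition ${i + 1}: $p") }
  }
}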
The Spark UI confirms that 3 tasks are generated, and the logs show the same. Now change the Spark job configuration to 5 executors with 10 cores in total:
conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;
Plugging the numbers into the formulas again:
defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 14.5918MB
From the logs we can see that with maxSplitBytes = 14.5918MB the source files are now split: 00000, 00001, 00002, 00003, 00004, 00005 and 00006 are each cut into two slices, while 00007 is smaller than 14.5918MB and is left intact. After PartitionedFileUtil#splitFiles there are 7 * 2 + 1 = 15 PartitionedFiles in total:
00000(0 - 14.5918MB), 00000(14.5918MB - 15.698MB)
00001(0 - 14.5918MB), 00001(14.5918MB - 15.632MB)
00002(0 - 14.5918MB), 00002(14.5918MB - 15.629MB)
00003(0 - 14.5918MB), 00003(14.5918MB - 15.624MB)
00004(0 - 14.5918MB), 00004(14.5918MB - 15.617MB)
00005(0 - 14.5918MB), 00005(14.5918MB - 15.536MB)
00006(0 - 14.5918MB), 00006(14.5918MB - 15.539MB)
00007(0 - 4.634MB)
After sorting and merging, 10 FilePartitions are produced:
FilePartition 1: 00000(0 - 14.5918MB)
FilePartition 2: 00001(0 - 14.5918MB)
FilePartition 3: 00002(0 - 14.5918MB)
FilePartition 4: 00003(0 - 14.5918MB)
FilePartition 5: 00004(0 - 14.5918MB)
FilePartition 6: 00005(0 - 14.5918MB)
FilePartition 7: 00006(0 - 14.5918MB)
FilePartition 8: 00007(0 - 4.634MB), 00000(14.5918MB - 15.698MB)
FilePartition 9: 00001(14.5918MB - 15.632MB), 00002(14.5918MB - 15.629MB), 00003(14.5918MB - 15.624MB)
FilePartition 10: 00004(14.5918MB - 15.617MB), 00005(14.5918MB - 15.536MB), 00006(14.5918MB - 15.539MB)
That is, 10 tasks are generated in total.
The Spark UI also shows that 10 tasks are generated. The logs confirm that 00004(14.5918MB - 15.617MB), 00005(14.5918MB - 15.536MB) and 00006(14.5918MB - 15.539MB) are read by the same task; that 00007(0 - 4.634MB) and 00000(14.5918MB - 15.698MB) share a task; and that 00001(14.5918MB - 15.632MB), 00002(14.5918MB - 15.629MB) and 00003(14.5918MB - 15.624MB) share a task.

Summary
From the source code we can see that when Spark splits source-side partitions, it takes into account both the sizes of all files under the partition and the cost of opening each file. Large files are split and small files are merged, which ultimately yields partitions of a reasonable size.
Original link
This article is original content from Alibaba Cloud and may not be reproduced without permission.