当前位置: 首页 > news >正文

东莞专业网站设计专业服务正规少儿编程排名

东莞专业网站设计专业服务,正规少儿编程排名,中国最新军事新闻头条,网站建设客户去哪找引言 典型的Spark作业读取位于OSS的Parquet外表时#xff0c;源端的并发度#xff08;task/partition#xff09;如何确定#xff1f;特别是在做TPCH测试时有一些疑问#xff0c;如源端扫描文件的并发度是如何确定的#xff1f;是否一个parquet文件对应一个partition源端的并发度task/partition如何确定特别是在做TPCH测试时有一些疑问如源端扫描文件的并发度是如何确定的是否一个parquet文件对应一个partition多个parquet文件对应一个partition还是一个parquet文件对应多个partition本文将从源码角度进行分析进而解答这些疑问。 分析 数据源读取对应的物理执行节点为FileSourceScanExec读取数据代码块如下 lazy val inputRDD: RDD[InternalRow] {val readFile: (PartitionedFile) Iterator[InternalRow] relation.fileFormat.buildReaderWithPartitionValues(sparkSession relation.sparkSession,dataSchema relation.dataSchema,partitionSchema relation.partitionSchema,requiredSchema requiredSchema,filters pushedDownFilters,options relation.options,hadoopConf relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))val readRDD if (bucketedScan) {createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,relation)} else {createReadRDD(readFile, dynamicallySelectedPartitions, relation)}sendDriverMetrics()readRDD} 主要关注非bucket的处理对于非bucket的扫描调用createReadRDD方法定义如下 /*** Create an RDD for non-bucketed reads.* The bucketed variant of this function is [[createBucketedReadRDD]].** param readFile a function to read each (part of a) file.* param selectedPartitions Hive-style partition that are part of the read.* param fsRelation [[HadoopFsRelation]] associated with the read.*/private def createReadRDD(readFile: (PartitionedFile) Iterator[InternalRow],selectedPartitions: Array[PartitionDirectory],fsRelation: HadoopFsRelation): RDD[InternalRow] {// 文件打开开销每次打开文件最少需要读取的字节 val openCostInBytes fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes// 最大切分分片大小val maxSplitBytes FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)logInfo(sPlanning scan with bin packing, max size: $maxSplitBytes bytes, sopen cost is considered as scanning $openCostInBytes bytes.)// Filter files with bucket pruning if possibleval bucketingEnabled fsRelation.sparkSession.sessionState.conf.bucketingEnabledval shouldProcess: Path Boolean optionalBucketSet match {case Some(bucketSet) if bucketingEnabled // Do not prune the file if bucket file name is invalidfilePath BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)case _ _ true}// 对分区下文件进行切分并按照从大到小进行排序val splitFiles selectedPartitions.flatMap { partition partition.files.flatMap { file // getPath() is very expensive so we only want to call it once in this block:val filePath file.getPathif (shouldProcess(filePath)) {// 文件是否可splitparquet/orc/avro均可被splitval isSplitable relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)// 切分文件PartitionedFileUtil.splitFiles(sparkSession relation.sparkSession,file file,filePath filePath,isSplitable isSplitable,maxSplitBytes maxSplitBytes,partitionValues partition.values)} else {Seq.empty}}}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)val partitions FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)new FileScanRDD(fsRelation.sparkSession, readFile, partitions)} 可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要其核心逻辑如下 def maxSplitBytes(sparkSession: SparkSession,selectedPartitions: Seq[PartitionDirectory]): Long {// 读取文件时打包成最大的partition大小默认为128MB对应一个block大小val defaultMaxSplitBytes sparkSession.sessionState.conf.filesMaxPartitionBytes// 打开每个文件的开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// 建议的不保证最小分割文件分区数默认未设置从leafNodeDefaultParallelism获取// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism - SparkContext#defaultParallelism// - TaskSchedulerImpl#defaultParallelism - CoarseGrainedSchedulerBackend#defaultParallelism// - 总共多少核max(executor core总和, 2)最少为2val minPartitionNum sparkSession.sessionState.conf.filesMinPartitionNum.getOrElse(sparkSession.leafNodeDefaultParallelism)// 总共读取的大小val totalBytes selectedPartitions.flatMap(_.files.map(_.getLen openCostInBytes)).sum// 单core读取的大小val bytesPerCore totalBytes / minPartitionNum// 计算大小不会超过设置的128MBMath.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))} 对于PartitionedFileUtil#splitFiles其核心逻辑如下较为简单直接按照最大切分大小切分大文件来进行分片 def splitFiles(sparkSession: SparkSession,file: FileStatus,filePath: Path,isSplitable: Boolean,maxSplitBytes: Long,partitionValues: InternalRow): Seq[PartitionedFile] {if (isSplitable) {// 切分为多个分片(0L until file.getLen by maxSplitBytes).map { offset val remaining file.getLen - offsetval size if (remaining maxSplitBytes) maxSplitBytes else remainingval hosts getBlockHosts(getBlockLocations(file), offset, size)PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)}} else {Seq(getPartitionedFile(file, filePath, partitionValues))}} 在获取到Seq[PartitionedFile]列表后还并没有完成对文件的切分还需要调用FilePartition#getFilePartitions做最后的处理方法核心逻辑如下 def getFilePartitions(sparkSession: SparkSession,partitionedFiles: Seq[PartitionedFile],maxSplitBytes: Long): Seq[FilePartition] {val partitions new ArrayBuffer[FilePartition]val currentFiles new ArrayBuffer[PartitionedFile]var currentSize 0L/** Close the current partition and move to the next. */def closePartition(): Unit {if (currentFiles.nonEmpty) {// Copy to a new Array.// 重新生成一个新的PartitionFileval newPartition FilePartition(partitions.size, currentFiles.toArray)partitions newPartition}currentFiles.clear()currentSize 0}// 打开文件开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// Assign files to partitions using Next Fit DecreasingpartitionedFiles.foreach { file if (currentSize file.length maxSplitBytes) {// 如果累加的文件大小大于的最大切分大小则关闭该分区表示完成一个Task读取的数据切分closePartition()}// Add the given file to the current partition.currentSize file.length openCostInBytescurrentFiles file}// 最后关闭一次分区文件可能较小closePartition()partitions.toSeq} 可以看到经过这一步后会把一些小文件做合并生成maxSplitBytes大小的PartitionFile这样可以避免拉起太多task读取太多小的文件。 生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度也即最后Spark生成的Task个数 override protected def getPartitions: Array[RDDPartition] filePartitions.toArray 整体流程图如下图所示 拆分、合并过程如下图所示 实战 对于TPCH 10G生成的customer parquet表 https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?pathrt_spark_test%2Fcustomer-parquet%2F 共8个Parquet文件总文件大小为113.918MB Spark作业配置如下executor只有1core conf spark.driver.resourceSpecsmall; conf spark.executor.instances1; conf spark.executor.resourceSpecsmall; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(1, 2) 2 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 2 72.959MB maxSplitBytes 72.959MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 得到maxSplitBytes为72.959MB从日志中也可看到对应大小 经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007)再次经过合并后得到3个FilePartitioned分别对应 FilePartitioned 1: 00000, 00001, 00002FilePartitioned 2: 00003, 00004, 00006FilePartitioned 3: 00005, 00007 即总共会生成3个Task 从Spark UI查看确实生成3个Task 从日志查看也是生成3个Task 变更Spark作业配置5个executor共10core conf spark.driver.resourceSpecsmall; conf spark.executor.instances5; conf spark.executor.resourceSpecmedium; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(10, 2) 10 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 10 14.5918MB maxSplitBytes 14.5918MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 查看日志 此时可以看到14.5918MB会对源文件进行切分会对00001, 00002,00003,00004,00005,00006进行切分切分成两份00007由于小于14.5918MB因此不会进行切分经过PartitionedFileUtil#splitFiles后总共存在7 * 2 1 15个PartitionedFile 00000(0 - 14.5918MB), 00000(14.5918MB - 15.698MB)00001(0 - 14.5918MB), 00001(14.5918MB - 15.632MB)00002(0 - 14.5918MB), 00002(14.5918MB - 15.629MB)00003(0 - 14.5918MB), 00003(14.5918MB - 15.624MB)00004(0 - 14.5918MB), 00004(14.5918MB - 15.617MB)00005(0 - 14.5918MB), 00005(14.5918MB - 15.536MB)00006(0 - 14.5918MB), 00006(14.5918MB - 15.539MB)00007(0 - 4.634MB) 经过排序后得到如下以及合并后得到10个FilePartitioned分别对应 FilePartitioned 1: 00000(0 - 14.5918MB)FilePartitioned 2: 00001(0 - 14.5918MB)FilePartitioned 3: 00002(0 - 14.5918MB)FilePartitioned 4: 00003(0 - 14.5918MB)FilePartitioned 5: 00004(0 - 14.5918MB)FilePartitioned 6: 00005(0 - 14.5918MB)FilePartitioned 7: 00006(0 - 14.5918MB)FilePartitioned 8: 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB)FilePartitioned 9: 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)FilePartitioned 10: 00004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB) 即总共会生成10个Task 通过Spark UI也可查看到生成了10个Task 查看日志000004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB)在同一个Task中 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB) 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)在同一个Task中 总结 通过源码可知Spark对于源端Partition切分会考虑到分区下所有文件大小以及打开每个文件的开销同时会涉及对大文件的切分以及小文件的合并最后得到一个相对合理的Partition。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.yutouwan.com/news/386089/

相关文章:

  • 求个网站你明白的 知乎昌吉做网站需要多少钱
  • 天津刘金鹏做网站网站群内容管理系统的设计与实现
  • 中国建设造价工程协会网站建站公司见客户没话说
  • 用asp做的网站有哪些html 旅游网站
  • 做网站的高手做网站买了域名后
  • 手机版网站制作模板微商城怎么注册怎么弄
  • 西安网站建设怎么接单旅游网网站建设目的
  • 山东省两学一做网站兰州移动端网站建设
  • 网站建设推广报价seo推广技巧
  • 哈尔滨手机网站建设价格低seo如何使用wordpress优化
  • pc网站建设怎么做apicloud影视源码
  • 装修设计效果图网站建立网站大约多少钱
  • 合肥手机网站制作百度统计会对原网站产生影响吗
  • 谁可以教我做网站网站开发 外包 哪家
  • 聊城网站制作工作室注册公司需要啥资料
  • 邯郸做网站电话广州互联网企业100强
  • 南坪网站建设哪里好惠州网站建设 翻译
  • 如何编写一份网站开发需求文档大连网站建设蛇皮果
  • 抓取网站访客qq号码重庆有哪些建设公司
  • 网站主体注销长春网站建设 4435
  • 安徽省六安市建设局网站网站建设适合的企业
  • 深圳网站优化搜索成都公司网站开发
  • 专业微网站建设公司首选公司河间申梦网站建设制作
  • 织梦软件怎么使用域名做网站网页设计心得体会300
  • 网站建设需要哪些资质公司邮箱价格
  • 浙江做网站的公司有哪些有没有做网站源代码 修改的
  • 学校网站网页制作足球推介网
  • 微网站平台怎样做网站宁海网站建设
  • 单位做网站怎么做大型广告公司有哪些
  • 建设网站需要哪些职位网络规划设计师和信息系统项目管理师哪个好考