当前位置: 首页 > news >正文

东莞专业网站设计专业服务正规少儿编程排名

东莞专业网站设计专业服务,正规少儿编程排名,中国最新军事新闻头条,网站建设客户去哪找引言 典型的Spark作业读取位于OSS的Parquet外表时#xff0c;源端的并发度#xff08;task/partition#xff09;如何确定#xff1f;特别是在做TPCH测试时有一些疑问#xff0c;如源端扫描文件的并发度是如何确定的#xff1f;是否一个parquet文件对应一个partition源端的并发度task/partition如何确定特别是在做TPCH测试时有一些疑问如源端扫描文件的并发度是如何确定的是否一个parquet文件对应一个partition多个parquet文件对应一个partition还是一个parquet文件对应多个partition本文将从源码角度进行分析进而解答这些疑问。 分析 数据源读取对应的物理执行节点为FileSourceScanExec读取数据代码块如下 lazy val inputRDD: RDD[InternalRow] {val readFile: (PartitionedFile) Iterator[InternalRow] relation.fileFormat.buildReaderWithPartitionValues(sparkSession relation.sparkSession,dataSchema relation.dataSchema,partitionSchema relation.partitionSchema,requiredSchema requiredSchema,filters pushedDownFilters,options relation.options,hadoopConf relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))val readRDD if (bucketedScan) {createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,relation)} else {createReadRDD(readFile, dynamicallySelectedPartitions, relation)}sendDriverMetrics()readRDD} 主要关注非bucket的处理对于非bucket的扫描调用createReadRDD方法定义如下 /*** Create an RDD for non-bucketed reads.* The bucketed variant of this function is [[createBucketedReadRDD]].** param readFile a function to read each (part of a) file.* param selectedPartitions Hive-style partition that are part of the read.* param fsRelation [[HadoopFsRelation]] associated with the read.*/private def createReadRDD(readFile: (PartitionedFile) Iterator[InternalRow],selectedPartitions: Array[PartitionDirectory],fsRelation: HadoopFsRelation): RDD[InternalRow] {// 文件打开开销每次打开文件最少需要读取的字节 val openCostInBytes fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes// 最大切分分片大小val maxSplitBytes FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)logInfo(sPlanning scan with bin packing, max size: $maxSplitBytes bytes, sopen cost is considered as scanning $openCostInBytes bytes.)// Filter files with bucket pruning if possibleval bucketingEnabled fsRelation.sparkSession.sessionState.conf.bucketingEnabledval shouldProcess: Path Boolean optionalBucketSet match {case Some(bucketSet) if bucketingEnabled // Do not prune the file if bucket file name is invalidfilePath BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)case _ _ true}// 对分区下文件进行切分并按照从大到小进行排序val splitFiles selectedPartitions.flatMap { partition partition.files.flatMap { file // getPath() is very expensive so we only want to call it once in this block:val filePath file.getPathif (shouldProcess(filePath)) {// 文件是否可splitparquet/orc/avro均可被splitval isSplitable relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)// 切分文件PartitionedFileUtil.splitFiles(sparkSession relation.sparkSession,file file,filePath filePath,isSplitable isSplitable,maxSplitBytes maxSplitBytes,partitionValues partition.values)} else {Seq.empty}}}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)val partitions FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)new FileScanRDD(fsRelation.sparkSession, readFile, partitions)} 可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要其核心逻辑如下 def maxSplitBytes(sparkSession: SparkSession,selectedPartitions: Seq[PartitionDirectory]): Long {// 读取文件时打包成最大的partition大小默认为128MB对应一个block大小val defaultMaxSplitBytes sparkSession.sessionState.conf.filesMaxPartitionBytes// 打开每个文件的开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// 建议的不保证最小分割文件分区数默认未设置从leafNodeDefaultParallelism获取// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism - SparkContext#defaultParallelism// - TaskSchedulerImpl#defaultParallelism - CoarseGrainedSchedulerBackend#defaultParallelism// - 总共多少核max(executor core总和, 2)最少为2val minPartitionNum sparkSession.sessionState.conf.filesMinPartitionNum.getOrElse(sparkSession.leafNodeDefaultParallelism)// 总共读取的大小val totalBytes selectedPartitions.flatMap(_.files.map(_.getLen openCostInBytes)).sum// 单core读取的大小val bytesPerCore totalBytes / minPartitionNum// 计算大小不会超过设置的128MBMath.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))} 对于PartitionedFileUtil#splitFiles其核心逻辑如下较为简单直接按照最大切分大小切分大文件来进行分片 def splitFiles(sparkSession: SparkSession,file: FileStatus,filePath: Path,isSplitable: Boolean,maxSplitBytes: Long,partitionValues: InternalRow): Seq[PartitionedFile] {if (isSplitable) {// 切分为多个分片(0L until file.getLen by maxSplitBytes).map { offset val remaining file.getLen - offsetval size if (remaining maxSplitBytes) maxSplitBytes else remainingval hosts getBlockHosts(getBlockLocations(file), offset, size)PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)}} else {Seq(getPartitionedFile(file, filePath, partitionValues))}} 在获取到Seq[PartitionedFile]列表后还并没有完成对文件的切分还需要调用FilePartition#getFilePartitions做最后的处理方法核心逻辑如下 def getFilePartitions(sparkSession: SparkSession,partitionedFiles: Seq[PartitionedFile],maxSplitBytes: Long): Seq[FilePartition] {val partitions new ArrayBuffer[FilePartition]val currentFiles new ArrayBuffer[PartitionedFile]var currentSize 0L/** Close the current partition and move to the next. */def closePartition(): Unit {if (currentFiles.nonEmpty) {// Copy to a new Array.// 重新生成一个新的PartitionFileval newPartition FilePartition(partitions.size, currentFiles.toArray)partitions newPartition}currentFiles.clear()currentSize 0}// 打开文件开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// Assign files to partitions using Next Fit DecreasingpartitionedFiles.foreach { file if (currentSize file.length maxSplitBytes) {// 如果累加的文件大小大于的最大切分大小则关闭该分区表示完成一个Task读取的数据切分closePartition()}// Add the given file to the current partition.currentSize file.length openCostInBytescurrentFiles file}// 最后关闭一次分区文件可能较小closePartition()partitions.toSeq} 可以看到经过这一步后会把一些小文件做合并生成maxSplitBytes大小的PartitionFile这样可以避免拉起太多task读取太多小的文件。 生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度也即最后Spark生成的Task个数 override protected def getPartitions: Array[RDDPartition] filePartitions.toArray 整体流程图如下图所示 拆分、合并过程如下图所示 实战 对于TPCH 10G生成的customer parquet表 https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?pathrt_spark_test%2Fcustomer-parquet%2F 共8个Parquet文件总文件大小为113.918MB Spark作业配置如下executor只有1core conf spark.driver.resourceSpecsmall; conf spark.executor.instances1; conf spark.executor.resourceSpecsmall; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(1, 2) 2 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 2 72.959MB maxSplitBytes 72.959MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 得到maxSplitBytes为72.959MB从日志中也可看到对应大小 经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007)再次经过合并后得到3个FilePartitioned分别对应 FilePartitioned 1: 00000, 00001, 00002FilePartitioned 2: 00003, 00004, 00006FilePartitioned 3: 00005, 00007 即总共会生成3个Task 从Spark UI查看确实生成3个Task 从日志查看也是生成3个Task 变更Spark作业配置5个executor共10core conf spark.driver.resourceSpecsmall; conf spark.executor.instances5; conf spark.executor.resourceSpecmedium; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(10, 2) 10 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 10 14.5918MB maxSplitBytes 14.5918MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 查看日志 此时可以看到14.5918MB会对源文件进行切分会对00001, 00002,00003,00004,00005,00006进行切分切分成两份00007由于小于14.5918MB因此不会进行切分经过PartitionedFileUtil#splitFiles后总共存在7 * 2 1 15个PartitionedFile 00000(0 - 14.5918MB), 00000(14.5918MB - 15.698MB)00001(0 - 14.5918MB), 00001(14.5918MB - 15.632MB)00002(0 - 14.5918MB), 00002(14.5918MB - 15.629MB)00003(0 - 14.5918MB), 00003(14.5918MB - 15.624MB)00004(0 - 14.5918MB), 00004(14.5918MB - 15.617MB)00005(0 - 14.5918MB), 00005(14.5918MB - 15.536MB)00006(0 - 14.5918MB), 00006(14.5918MB - 15.539MB)00007(0 - 4.634MB) 经过排序后得到如下以及合并后得到10个FilePartitioned分别对应 FilePartitioned 1: 00000(0 - 14.5918MB)FilePartitioned 2: 00001(0 - 14.5918MB)FilePartitioned 3: 00002(0 - 14.5918MB)FilePartitioned 4: 00003(0 - 14.5918MB)FilePartitioned 5: 00004(0 - 14.5918MB)FilePartitioned 6: 00005(0 - 14.5918MB)FilePartitioned 7: 00006(0 - 14.5918MB)FilePartitioned 8: 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB)FilePartitioned 9: 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)FilePartitioned 10: 00004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB) 即总共会生成10个Task 通过Spark UI也可查看到生成了10个Task 查看日志000004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB)在同一个Task中 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB) 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)在同一个Task中 总结 通过源码可知Spark对于源端Partition切分会考虑到分区下所有文件大小以及打开每个文件的开销同时会涉及对大文件的切分以及小文件的合并最后得到一个相对合理的Partition。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.huolong8.cn/news/386089/

相关文章:

  • 站长工具源码制作网站的成本规划
  • 响应式设计的网站个人备案可以做门户网站吗
  • 平坝网站建设微信营销推广方案
  • 安安互联怎么上传网站建网站权威公司
  • 深圳建设造价信息网站白云区做网站
  • 网站推广的优势有哪些怎么做手机软件
  • 什么公司做网站学校seo推广培训班
  • 网站建设开发方案免费网页代理在线
  • 社交网站模板网站建立的步骤是
  • 成都平台网站建设东莞网络建设推广
  • 莱芜建设网站成都小程序建设乚成都柚米
  • 哪里提供邢台做网站wordpress入门视频教程7 - 如何在文章里加入视频和音乐
  • 邯郸做网站的python做网站实战
  • 自己做网站怎么挣钱清空wordpress数据库
  • 做网站配什么电脑网站建设开发成本
  • 旗县政务网站建设工作方案品牌设计主要做哪些内容
  • 网上书城网站建设总结科技霸主从新能源车开始
  • 做网站运营如何提升用户粘度c 网站开发实例教学
  • 企业网站建设软件新注册建筑公司名称大全
  • 任何判断网站SEO做的好坏西宁市解封最新消息
  • 北京人才招聘网站移动网站建设流程
  • 网站广告通栏效果宜昌网站seo
  • 提供专业网站建设平台安全网站建设报价清单
  • 做游戏的php网站有哪些品牌注册要多久
  • 在线做免费网站有哪些wordpress微信登录插件下载失败
  • 廊坊公司网站建设企业邮箱注册申请需要钱吗
  • 陕西做教学成果网站的公司浙江网站建设制作
  • 找企业开发网站多少钱app制作教程视频全集
  • 推荐12个国外免费自助建站网站电子商务网站建设的范围是什么
  • 涟水县住房和城乡建设局网站可信赖的赣州网站建设