当前位置: 首页 > news >正文

制作视频网站建设宁波受欢迎全网seo优化

制作视频网站建设,宁波受欢迎全网seo优化,kuake自助建站系统源码,百度指数数据来源http://ihoge.cn/2018/Spark%20Scheduler.html Apache Spark是一个开源的#xff0c;强大的分布式查询和处理引擎。它提供MapReduce的灵活性和可扩展性#xff0c;但速度明显更高。 Spark的核心是根据RDD来实现的#xff0c;Spark Scheduler则为Spark核心实现的重要一环…http://ihoge.cn/2018/Spark%20Scheduler.html Apache Spark是一个开源的强大的分布式查询和处理引擎。它提供MapReduce的灵活性和可扩展性但速度明显更高。 Spark的核心是根据RDD来实现的Spark Scheduler则为Spark核心实现的重要一环其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据根据RDD的依赖关系构建DAG基于DAG划分Stage将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理我们可以合理规划资源利用做到尽可能用最少的资源高效地完成任务计算。 分布式运行框架 Spark可以部署在多种资源管理平台例如Yarn、Mesos等Spark本身也实现了一个简易的资源管理机制称之为Standalone模式。以下所述均表示Spark-on-Yarn。Spark部署在Yarn上有两种运行模式分别为yarn-client和yarn-cluster模式它们的区别仅仅在于Spark Driver是运行在Client端还是ApplicationMater端。如下图所示为Spark部署在Yarn上以yarn-cluster模式运行的分布式计算框架。 其中蓝色部分是Spark里的概念包括Client、ApplicationMaster、Driver和Executor其中Client和ApplicationMaster主要是负责与Yarn进行交互Driver作为Spark应用程序的总控负责分发任务以及监控任务运行状态Executor负责执行任务并上报状态信息给Driver从逻辑上来看Executor是进程运行在其中的任务是线程所以说Spark的任务是线程级别的。通过下面的时序图可以更清晰地理解一个Spark应用程序从提交到运行的完整流程。 Client 提交一个Spark应用程序首先通过Client向ResourceManager请求启动一个Application同时检查是否有足够的资源满足Application的需求如果资源条件满足则准备ApplicationMaster的启动上下文交给ResourceManager并循环监控Application状态。 ResourceManager 当提交的资源队列中有资源时ResourceManager会在某个NodeManager上启动ApplicationMaster进程ApplicationMaster会单独启动Driver后台线程当Driver启动后ApplicationMaster会通过本地的RPC连接Driver并开始向ResourceManager申请Container资源运行Executor进程一个Executor对应与一个Container当ResourceManager返回Container资源则在对应的Container上启动Executor。 Driver Driver线程主要是初始化SparkContext对象准备运行所需的上下文然后一方面保持与ApplicationMaster的RPC连接通过ApplicationMaster申请资源另一方面根据用户业务逻辑开始调度任务将任务下发到已有的空闲Executor上。 Executor 当ResourceManager向ApplicationMaster返回Container资源时ApplicationMaster就尝试在对应的Container上启动Executor进程Executor进程起来后会向Driver注册注册成功后保持与Driver的心跳同时等待Driver分发任务当分发的任务执行完毕后将任务状态上报给Driver。 Driver把资源申请的逻辑给抽象出来以适配不同的资源管理系统所以才间接地通过ApplicationMaster去和Yarn打交道。 从上述时序图可知Client只管提交Application并监控Application的状态。对于Spark的任务调度主要是集中在两个方面: 资源申请和任务分发其主要是通过ApplicationMaster、Driver以及Executor之间来完成下面详细剖析Spark任务调度每个细节。 Spark任务调度总览 当Driver起来后Driver则会根据用户程序逻辑准备任务并根据Executor资源情况逐步分发任务。在详细阐述任务调度前首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念 Job是以Action方法为界遇到一个Action方法则触发一个JobStage是Job的子集以RDD宽依赖(即Shuffle)为界遇到Shuffle做一次划分Task是Stage的子集以并行度(分区数)来衡量分区数是多少则有多少个task。 Spark的任务调度总体来说分两路进行一路是Stage级的调度一路是Task级的调度总体调度流程如下图所示。 Spark RDD通过其Transactions操作形成了RDD血缘关系图即DAG最后通过Action的调用触发Job并调度执行。DAGScheduler负责Stage级的调度主要是将DAG切分成若干Stages并将每个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行调度过程中SchedulerBackend负责提供可用资源其中SchedulerBackend有多种实现分别对接不同的资源管理系统。有了上述感性的认识后下面这张图描述了Spark-On-Yarn模式下在任务调度期间ApplicationMaster、Driver以及Executor内部模块的交互过程。 Driver初始化SparkContext过程中会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通过ApplicationMaster申请资源并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息监控Executor的存活状况并通知到TaskScheduler。下面着重剖析DAGScheduler负责的Stage调度以及TaskScheduler负责的Task调度。 Stage级的调度 Spark的任务调度是从DAG切割开始主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算并交给DAGScheduler来提交下图是涉及到Job提交的相关方法调用流程图。 Job由最终的RDD和Action方法封装而成SparkContext将Job交给DAGScheduler提交它会根据RDD的血缘关系构成的DAG进行切分将一个Job划分为若干Stages具体划分策略是由最终的RDD不断通过依赖回溯判断父依赖是否是款依赖即以Shuffle为界划分Stage窄依赖的RDD之间被划分到同一个Stage中可以进行pipeline式的计算如上图紫色流程部分。划分的Stages分两类一类叫做ResultStage为DAG最下游的Stage由Action方法决定另一类叫做ShuffleMapStage为下游Stage准备数据下面看一个简单的例子WordCount。 Job由saveAsTextFile触发该Job由RDD-3和saveAsTextFile方法组成根据RDD之间的依赖关系从RDD-3开始回溯搜索直到没有依赖的RDD-0在回溯搜索过程中RDD-3依赖RDD-2并且是宽依赖所以在RDD-2和RDD-3之间划分StageRDD-3被划到最后一个Stage即ResultStage中RDD-2依赖RDD-1RDD-1依赖RDD-0这些依赖都是窄依赖所以将RDD-0、RDD-1和RDD-2划分到同一个Stage即ShuffleMapStage中实际执行的时候数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出其本质上是一个深度优先搜索算法。 一个Stage是否被提交需要判断它的父Stage是否执行只有在父Stage执行完毕才能提交当前Stage如果一个Stage没有父Stage那么从该Stage开始提交。Stage提交时会将Task信息分区信息以及方法等序列化并被打包成TaskSet交给TaskScheduler一个Partition对应一个Task另一方面监控Stage的运行状态只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务其他类型的Task失败会在TaskScheduler的调度过程中重试。 相对来说DAGScheduler做的事情较为简单仅仅是在Stage层面上划分DAG提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂下面详细阐述其细节。 Task级的调度 Spark Task的调度是由TaskScheduler来完成由前文可知DAGScheduler将Stage打包到TaskSet交给TaskSchedulerTaskScheduler会将其封装为TaskSetManager加入到调度队列中TaskSetManager负责监控管理同一个Stage中的TasksTaskScheduler就是以TaskSetManager为单元来调度任务。前面也提到TaskScheduler初始化后会启动SchedulerBackend它负责跟外界打交道接收Executor的注册信息并维护Executor的状态所以说SchedulerBackend是管“粮食”的同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行也就是说它会定期地“问”TaskScheduler“我有这么余量你要不要啊”TaskScheduler在SchedulerBackend“问”它的时候会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行大致方法调用流程如下图所示。 调度策略 前面讲到TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里然后再从任务队列里按照一定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上还是比较粗粒度的是面向TaskSetManager的。 TaskScheduler是以树的方式来管理任务队列树中的节点类型为Schdulable叶子节点为TaskSetManager非叶子节点为Pool下图是它们之间的继承关系。 TaskScheduler支持两种调度策略一种是FIFO也是默认的调度策略另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool表示树的根节点是Pool类型。如果是采用FIFO调度策略则直接简单地将TaskSetManager按照先来先到的方式入队出队时直接拿出最先进队的TaskSetManager其树结构大致如下图所示TaskSetManager保存在一个FIFO队列中。 在阐述FAIR调度策略前先贴一段使用FAIR调度策略的应用程序代码后面针对该代码逻辑来详细阐述FAIR调度的实现细节。 object MultiJobTest {// spark.scheduler.modeFAIRdef main(args: Array[String]): Unit {val spark SparkSession.builder().getOrCreate()val rdd spark.sparkContext.textFile(...).map(_.split(\\s)).map(x (x(0), x(1)))val jobExecutor Executors.newFixedThreadPool(2)jobExecutor.execute(new Runnable {override def run(): Unit {spark.sparkContext.setLocalProperty(spark.scheduler.pool, count-pool)val cnt rdd.groupByKey().count()println(sCount: $cnt)}})jobExecutor.execute(new Runnable {override def run(): Unit {spark.sparkContext.setLocalProperty(spark.scheduler.pool, take-pool)val data rdd.sortByKey().take(10)println(sData Samples: )data.foreach { x println(x.mkString(, )) }}})jobExecutor.shutdown()while (!jobExecutor.isTerminated) {}println(Done!)} }上述应用程序中使用两个线程分别调用了Action方法即有两个Job会并发提交但是不管怎样这两个Job被切分成若干TaskSet后终究会被交到TaskScheduler这里统一管理其调度树大致如下图所示。 在出队时则会对所有TaskSetManager排序具体排序过程是从根节点rootPool开始递归地去排序子节点最后合并到一个ArrayBuffer里代码逻辑如下。 def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] {var sortedTaskSetQueue new ArrayBuffer[TaskSetManager]val sortedSchedulableQueue schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)for (schedulable - sortedSchedulableQueue) {sortedTaskSetQueue schedulable.getSortedTaskSetQueue}sortedTaskSetQueue} 使用FAIR调度策略时上面代码中的taskSetSchedulingAlgorithm的类型为FairSchedulingAlgorithm排序过程的比较是基于Fair-share来比较的每个要排序的对象包含三个属性: runningTasks值正在运行的Task数、minShare值、weight值比较时会综合考量runningTasks值minShare以及weight值。如果A对象的runningTasks大于它的minShareB对象的runningTasks小于它的minShare那么B排在A前面如果A、B对象的runningTasks都小于它们的minShare那么就比较runningTasks与minShare的比值谁小谁排前面如果A、B对象的runningTasks都大于它们的minShare那么就比较runningTasks与weight的比值谁小谁排前面。整体上来说就是通过minShare和weight这两个参数控制比较过程可以做到不让资源被某些长时间Task给一直占了。 从调度队列中拿到TaskSetManager后那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskSchedulerTaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到TaskSetManager封装了一个Stage的所有Task并负责管理调度这些Task。 本地化调度 从调度队列中拿到TaskSetManager后那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskSchedulerTaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到TaskSetManager封装了一个Stage的所有Task并负责管理调度这些Task。 在TaskSetManager初始化过程中会对Tasks按照Locality级别进行分类Task的Locality有五种优先级由高到低顺序PROCESS_LOCAL(指定的Executor)NODE_LOCAL(指定的主机节点)NO_PREF(无所谓)RACK_LOCAL(指定的机架)ANY(满足不了Task的Locality就随便调度)。这五种Locality级别存在包含关系RACK_LOCAL包含NODE_LOCALNODE_LOCAL包含PROCESS_LOCAL然而ANY包含其他所有四种。初始化阶段在对Task分类时根据Task的preferredLocations判断它属于哪个Locality级别属于PROCESS_LOCAL的Task同时也会被加入到NODE_LOCAL、RACK_LOCAL类别中比如一个Task的preferredLocations指定了在Executor-2上执行那么它属于Executor-2对应的PROCESS_LOCAL类别同时也把他加入到Executor-2所在的主机对应的NODE_LOCAL类别Executor-2所在的主机的机架对应的RACK_LOCAL类别中以及ANY类别这样在调度执行时满足不了PROCESS_LOCAL就逐步退化到NODE_LOCALRACK_LOCALANY。 TaskSetManager在决定调度哪些Task时是通过上面流程图中的resourceOffer方法来实现为了尽可能地将Task调度到它的preferredLocations上它采用一种延迟调度算法。resourceOffer方法原型如下参数包括要调度任务的Executor Id、主机地址以及最大可容忍的Locality级别。 def resourceOffer(execId: String,host: String,maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] 延迟调度算法的大致流程如下图所示。 首先看是否存在execId对应的PROCESS_LOCAL类别的任务如果存在取出来调度否则根据当前时间判断是否超过了PROCESS_LOCAL类别最大容忍的延迟如果超过则退化到下一个级别NODE_LOCAL否则等待不调度。退化到下一个级别NODE_LOCAL后调度流程也类似看是否存在host对应的NODE_LOCAL类别的任务如果存在取出来调度否则根据当前时间判断是否超过了NODE_LOCAL类别最大容忍的延迟如果超过则退化到下一个级别RACK_LOCAL否则等待不调度以此类推…..。当不满足Locatity类别会选择等待直到下一轮调度重复上述流程如果你比较激进可以调大每个类别的最大容忍延迟时间如果不满足Locatity时就会等待多个调度周期直到满足或者超过延迟时间退化到下一个级别为止。 失败重试与黑名单机制 除了选择合适的Task调度运行外还需要监控Task的执行状态前面也提到与外部打交道的是SchedulerBackendTask被提交到Executor启动执行后Executor会将执行状态上报给SchedulerBackendSchedulerBackend则告诉TaskSchedulerTaskScheduler找到该Task对应的TaskSetManager并通知到该TaskSetManager这样TaskSetManager就知道Task的失败与成功状态对于失败的Task会记录它失败的次数如果失败次数还没有超过最大重试次数那么就把它放回待调度的Task池子中否则整个Application失败。 在记录Task失败次数过程中会记录它上一次失败所在的Executor Id和Host这样下次再调度这个Task时会使用黑名单机制避免它被调度到上一次失败的节点上起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host以及其对应的“黑暗”时间“黑暗”时间是指这段时间内不要再往这个节点上调度这个Task了。 推测式执行 TaskScheduler在启动SchedulerBackend后还会启动一个后台线程专门负责推测任务的调度推测任务是指对一个Task在不同的Executor上启动多个实例如果有Task实例运行成功则会干掉其他Executor上运行的实例。推测调度线程会每隔固定时间检查是否有Task需要推测执行如果有则会调用SchedulerBackend的reviveOffers去尝试拿资源运行推测任务。 检查是否有Task需要推测执行的逻辑最后会交到TaskSetManagerTaskSetManager采用基于统计的算法检查Task是否需要推测执行算法流程大致如下图所示。 TaskSetManager首先会统计成功的Task数当成功的Task数超过75%(可通过参数spark.speculation.quantile控制)时再统计所有成功的Tasks的运行时间得到一个中位数用这个中位数乘以1.5(可通过参数spark.speculation.multiplier控制)得到运行时间门限如果在运行的Tasks的运行时间超过这个门限则对它启用推测。算法逻辑较为简单其实就是对那些拖慢整体进度的Tasks启用推测以加速整个TaskSet即Stage的运行。 资源申请机制 在前文已经提过ApplicationMaster和SchedulerBackend起来后SchedulerBackend通过ApplicationMaster申请资源ApplicationMaster就是用来专门适配YARN申请Container资源的当申请到Container会在相应Container上启动Executor进程其他事情就交给SchedulerBackend。Spark早期版本只支持静态资源申请即一开始就指定用多少资源在整个Spark应用程序运行过程中资源都不能改变后来支持动态Executor申请用户不需要指定确切的Executor数量Spark会动态调整Executor的数量以达到资源利用的最大化。 静态资源申请 静态资源申请是用户在提交Spark应用程序时就要提前估计应用程序需要使用的资源包括Executor数(num_executor)、每个Executor上的core数(executor_cores)、每个Executor的内存(executor_memory)以及Driver的内存(driver_memory)。 在估计资源使用时应当首先了解这些资源是怎么用的。任务的并行度由分区数(Partitions)决定一个Stage有多少分区就会有多少Task。每个Task默认占用一个Core一个Executor上的所有core共享Executor上的内存一次并行运行的Task数等于num_executor*executor_cores如果分区数超过该值则需要运行多个轮次一般来说建议运行35轮较为合适否则考虑增加num_executor或executor_cores。由于一个Executor的所有tasks会共享内存executor_memory所以建议executor_cores不宜过大。executor_memory的设置则需要综合每个分区的数据量以及是否有缓存等逻辑。下图描绘了一个应用程序内部资源利用情况。 动态资源申请 动态资源申请目前只支持到Executor即可以不用指定num_executor通过参数spark.dynamicAllocation.enabled来控制。由于许多Spark应用程序一开始可能不需要那么多Executor或者其本身就不需要太多Executor所以不必一次性申请那么多Executor根据具体的任务数动态调整Executor的数量尽可能做到资源的不浪费。由于动态Executor的调整会导致Executor动态的添加与删除如果删除Executor其上面的中间Shuffle结果可能会丢失这就需要借助第三方的ShuffleService了如果Spark是部署在Yarn上则可以在Yarn上配置Spark的ShuffleService具体操作仅需做两点: 1.首先在yarn-site.xml中加上如下配置 propertynameyarn.nodemanager.aux-services/namevaluemapreduce_shuffle,spark_shuffle/value /property propertynameyarn.nodemanager.aux-services.spark_shuffle.class/namevalueorg.apache.spark.network.yarn.YarnShuffleService/value /property propertynamespark.shuffle.service.port/namevalue7337/value /property 2.将Spark ShuffleService jar包$SPARK_HOME/lib/spark-*-yarn-shuffle.jar拷贝到每台NodeManager的$HADOOP_HOME/share/hadoop/yarn/lib/下并重启所有的NodeManager。 当启用动态Executor申请时在SparkContext初始化过程中会实例化ExecutorAllocationManager它是被用来专门控制动态Executor申请逻辑的动态Executor申请是一种基于当前Task负载压力实现动态增删Executor的机制。一开始会按照参数spark.dynamicAllocation.initialExecutors设置的初始Executor数申请然后根据当前积压的Task数量逐步增长申请的Executor数如果当前有积压的Task那么取积压的Task数和spark.dynamicAllocation.maxExecutors中的最小值作为Executor数上限每次新增加申请的Executor为2的次方即第一次增加1第二次增加2第三次增加4…。另一方面如果一个Executor在一段时间内都没有Task运行则将其回收但是在Remove Executor时要保证最少的Executor数该值通过参数spark.dynamicAllocation.minExecutors来控制如果Executor上有Cache的数据则永远不会被Remove以保证中间数据不丢失。 结语 本文详细阐述了Spark的任务调度着重讨论Spark on Yarn的部署调度剖析了从应用程序提交到运行的全过程。Spark Schedule算是Spark中的一个大模块它负责任务下发与监控等基本上扮演了Spark大脑的角色。了解Spark Schedule有助于帮助我们清楚地认识Spark应用程序的运行轨迹同时在我们实现其他系统时也可以借鉴Spark的实现。 转载请注明出处本文永久链接http://sharkdtu.com/posts/spark-scheduler.html
http://www.huolong8.cn/news/354796/

相关文章:

  • 茂名模板建站定制网站flash网站设计作品
  • 旅游网站建设的利益软件开发的八个流程
  • 中美今天最新消息池州市网站建设优化
  • 想自己做淘宝有什么网站商务网站建设论文总结
  • 网站手机端怎么做做视频网站怎么挣钱
  • 仿制网站软件怎么设置网站的关键字
  • 怎么免费建设个人博客网站蒙自建设网站
  • 制作公司网站有哪些好处网站建设项目体会
  • 网站建设需要多大的空间欧阳娜娜自创品牌
  • 网站后台栏目根据什么做的网站排名提升工具
  • 鲁中网站取消网站的通知
  • 河南建网站 优帮云宜昌网站推广优化技巧
  • 南通网站建设找哪家建网站怎么挣钱的
  • 深圳市多语言网站建设公司做效果图的网站有哪些
  • 网站自动弹窗代码网络架构配置
  • 网站怎么 备案pc网站做app京东
  • 织梦做的网站有点慢做网站虚拟主机怎么选择
  • 网站建设硬件设置网站风格
  • 网站备案喷绘襄阳市建设公司网站
  • 做网站兼容ie做微信网站价格
  • 网站推广销售腾讯会员被告怎么办青海seo关键词排名优化工具
  • 搞笑网站源代码利用论坛推广网站
  • 汽车网站开发背景杭州网站设计公司联系亿企邦
  • 网站 根目录 虚拟目录石家庄网站建设方案优化
  • 如何做网站么网络优化初学者难吗
  • 免费数据库网站wordpress本地登陆
  • 学做吃的网站网站分站作用
  • php是做网站的吗推广比较好的网站
  • 网站开发 js深圳外贸建站模版
  • 做网站外包厦门网站开发公司找哪家