跳转至

Scheduler

调度

在Spark里,与调度相关的程序位于spark-3.2.3/core/src/main/scala/org/apache/spark/scheduler/目录下。

DAG调度的过程

我们首先给出一个宏观的说法,其中的不同的名称会在后文进行解释。总的来说,调度由DAGScheduler控制,其通过RDD算子构建DAG,再基于RDD算子之间的依赖来切分所涉算子,最终得到一些Stage对象。每个Stage再基于Partitioner生成多个Task,每个Stage中的Task集合包装成一个TaskSet,生成一个TaskSetManager。这些TaskSetManager与其他的Pool被嵌套地放在Pool中,进行宏观的任务调度。[^spark]

submitJob

具体来说,DAGScheduler会为每个Job计算一个有向无环图,追踪哪些RDD和Stage输出可以被实现,找到运行最短的方式。之后,将Stage打包成Taskset提交给TaskScheduler。一个TaskSet中只包含可以在某个节点上独立运行的Task

Stage根据RDD图上的shuffle边界分割而成。像map()filter()这样的有着“窄”依赖的RDD操作,会被分到单个Stage中,但有着shuffle依赖的操作就需要被分到不同Stage上面。最终得到的每个Stage只与其他Stage有着依赖,而其内部的Task间不存在依赖,保证可以时运行。对这些任务进行的分割操作发生在RDD.compute()函数上。

除了产生Stage间的有向无环图,DAGScheduler还根据当前缓存状态决定在哪里运行哪个任务。之后其把任务交给低一级的TaskScheduler。在输出文件丢失时,它还要做错误处理,即重新提交之前的Stage。在Stage内部的错误会被交给TaskScheduler处理,在取消整个Stage之前会多次重试每一个Task。

为了从失败中恢复,相同的Stage可能需要运行多次.如果TaskScheduler 报告一个Task失败,因为来自前一个Stage的map输出文件已丢失,需要由DAGScheduler重新提交丢失的Stage. 这些通过CompletionEvent 伴随 FetchFailed, 或者ExecutorLost 事件被检测到. The DAGScheduler 会等待一定时间来看是否有其余的节点或 Task 也失败,然后为计算错误的Task所在的Stage重新提交。作为这一过程的一部分,我们还需要为旧的Stage重新创建Stage对象,这些Stage可能已经被清空。需要注意保证不同的任务正确地处在对应的Stage中。

此外,还有缓存追踪机制以及清空机制。DAGScheduler判断哪个RDD被缓存,记忆哪些shuffle map 的Stage已经产生输出文件以避免重新计算或重复运行shuffle的map阶段。当相关的Task结束时,DAGScheduler会清空Stage,以避免内存泄露。

与上有关的函数包括sumbitJob,submitMapStage,submitStage,submitMissingTasks,submitWaitingChildStage等。

调度算法

SchedulingAlgorithm.scala中描述,只有两种继承算法FIFOSchedulingAlgorithmFairSchedulingAlgorithm,其中的方法comparator,返回boolean值,用于在调度时进行排序。

以下以comparator(A,B)为例. - FIFO方法,先比较优先级,小的优先,如果同级,则比较提交时间(通过StageID判断,小的代表提交时间早),早的优先。可以判断,priority越小,优先级越高。 - Fair方法,如果A满足runningTasksminShare小,而B不满足,则先处理A,反之亦然。如果都满足,则比较runningTasks/minShare的比值,低的优先。如果都不满足,则比较runningTasks/weight的比值,低的优先。当这些比值相同时,比较name。总的来说,即资源使用率低的优先。

调度任务类

Job类是提交给调度器的最高层的工作对象。当用户启动一个操作,比如count()时,一个Job会通过submitJob被提交。每个Job中间层的数据需要多个Stage的执行来得到。

Stage类是Task的集合,这些Task计算Job中间层的结果。每个Task在相同RDD的分割下计算相同的函数,因此有着相同的shuffle依赖。Stage在shuffle边界上被分开。有两种Stage:用于最终执行的ResultStage,直接计算一个Spark操作(如count(), save());以及中间层的ShuffleMapStage,其结果用于下一个Stage的输入。当多个任务共用相同的RDD时,Stage常被在这些任务间共享。因此Stage间遵循拓扑排序的来依次执行。每个Stage有一个成员firstJobId,标识首个提交该StageJob。当使用FIFO调度时,这允许先计算或在失败时更快恢复早期JobStage。如果失败,单个Stage可以被多次重新执行。在这种情况下,Stage对象会跟踪多个StageInfo对象,传递给监听器或web UI。最新的StageInfo对象可以通过latestInfo访问。

Task类,任务本身,包含了任务的一些信息,如TaskIdindexattemptNumber等。其由executor执行。每个Task会在单个机器上运行。

TaskSet类,任务集,是Task的集合。

ShuffleMapTaskResultTask分别继承了Task类,对应之前的ShuffleMapStageResultStage。 - ShuffleMapTask是shuffle map任务,其partitionId是该任务所在的分区,mapId是该任务的map id,mapIndex是该任务的map index,mapStatus是该任务的map状态。 - ResultTask是结果任务,其partitionId是该任务所在的分区,resultId是该任务的结果id,resultIndex是该任务的结果index,resultStatus是该任务的结果状态。

Schedulable类,其中一些成员变量比较重要,列举如下:[^schedulable] - weight,用于控制不同该类之间的权重,初始为1,而如果一个类有双倍的权重,则会获得双倍的资源。当设置很高的权重时,无论是否有活动,都会优先获得资源。 - minShare,用于控制最少分配的资源(比如CPU核数),公平调度器倾向于满足所有活动类的最小要求。默认为0。 - runningTasks,当前在运行的任务数量。

PoolTaskSetManager分别继承了Schedulable类。 - TaskSetManager是任务集管理器,其Tasks存放了任务 - Pool是调度池,其中存在schedulableQueue存放调度队列,这是一个嵌套结构,即调度池里可能同时存在子调度池和任务集管理器。举例如下图Pool 调度池中的一些方法均是递归式的操作,如果是Pool类,则继续递归,如果是TaskSetManager则调用对应的操作方法

因此,我们有如下的过程:

graph LR
Job --1 to N --> Stage --1 to N--> Task --N to 1 --> TaskSet --1 to 1 --> TaskSetManager

向源码添加内容时的注意事项

DAGScheduler源码中的注释提示如下: - 当有关的job结束之后,所有的数据结构应该被清空,以避免在长时间运行程序中的状态无限增加。 - 添加新数据结构时,更新DAGSchedulerSuite.assertDataStructuresEmpty函数,这有助于找到内存泄露。

[^schedulable]: Apache. Job Scheduling. Apache Spark Documents. [EB/OL]. [2023-04-20]. https://spark.apache.org/docs/latest/job-scheduling.html

[^spark]: IWBS. Spark. CSDN. [EB/OL]. [2023-04-20]. https://blog.csdn.net/asd491310/category_7797537.html


调度

调度是分布式计算框架中非常重要的一个部分,为了介绍这一部分,我们先介绍一下Spark中不同的调度任务对象的定义。

当用户启动一个操作,比如count()时,一个Job会通过submitJob被提交。每个Job中间层的数据需要多个Stage的执行来得到。

Stage类是Task的集合,这些Task计算Job中间层的结果。每个Task有着相同的shuffle依赖。Stage在shuffle边界上被分开。有两种Stage:用于最终执行的ResultStage;以及中间层的ShuffleMapStage,其结果用于下一个Stage的输入。

Task类,任务本身,包含了任务的一些信息,如TaskIdindexattemptNumber等。其由executor执行。每个Task会在单个机器上运行。

TaskSet类,任务集,是Task的集合。

ShuffleMapTaskResultTask分别继承了Task类,对应之前的ShuffleMapStageResultStage

Schedulable类,其中一些成员变量比较重要,列举如下:[^schedulable] - weight,用于控制不同该类之间的权重,初始为1,而如果一个类有双倍的权重,则会获得双倍的资源。当设置很高的权重时,无论是否有活动,都会优先获得资源。 - minShare,用于控制最少分配的资源(比如CPU核数),公平调度器倾向于满足所有活动类的最小要求。默认为0。 - runningTasks,当前在运行的任务数量。

PoolTaskSetManager分别继承了Schedulable类。 - TaskSetManager是任务集管理器,其Tasks存放了任务 - Pool是调度池,其中存在schedulableQueue存放调度队列,这是一个嵌套结构,即调度池里可能同时存在子调度池和任务集管理器。举例如下图Pool 调度池中的一些方法均是递归式的操作,如果是Pool类,则继续递归,如果是TaskSetManager则调用对应的操作方法

因此,我们有如下的过程:

graph LR
Job --1 to N --> Stage --1 to N--> Task --N to 1 --> TaskSet --1 to 1 --> TaskSetManager

我们首先给出一个宏观的说法,其中的不同的名称会在后文进行解释。总的来说,调度由DAGScheduler控制,其通过RDD算子构建DAG,再基于RDD算子之间的依赖来切分所涉算子,最终得到一些Stage对象。每个Stage再基于Partitioner生成多个Task,每个Stage中的Task集合包装成一个TaskSet,生成一个TaskSetManager。这些TaskSetManager与其他的Pool被嵌套地放在Pool中,进行宏观的任务调度。[^Spark]

submitJob

具体来说,DAGScheduler会为每个Job计算一个有向无环图,追踪哪些RDD和Stage输出可以被实现,找到运行最短的方式。之后,将Stage打包成Taskset提交给TaskScheduler。一个TaskSet中只包含可以在某个节点上独立运行的Task

Stage根据RDD图上的shuffle边界分割而成。像map()filter()这样的有着“窄”依赖的RDD操作,会被分到单个Stage中,但有着shuffle依赖的操作就需要被分到不同Stage上面。最终得到的每个Stage只与其他Stage有着依赖,而其内部的Task间不存在依赖,保证可以时运行。对这些任务进行的分割操作发生在RDD.compute()函数上。

除了产生Stage间的有向无环图,DAGScheduler还根据当前缓存状态决定在哪里运行哪个任务。之后其把任务交给低一级的TaskScheduler。在输出文件丢失时,它还要做错误处理,即重新提交之前的Stage。在Stage内部的错误会被交给TaskScheduler处理,在取消整个Stage之前会多次重试每一个Task。

为了从失败中恢复,相同的Stage可能需要运行多次.如果TaskScheduler 报告一个Task失败,因为来自前一个Stage的map输出文件已丢失,需要由DAGScheduler重新提交丢失的Stage. 这些通过CompletionEvent 伴随 FetchFailed, 或者ExecutorLost 事件被检测到. The DAGScheduler 会等待一定时间来看是否有其余的节点或 Task 也失败,然后为计算错误的Task所在的Stage重新提交。作为这一过程的一部分,我们还需要为旧的Stage重新创建Stage对象,这些Stage可能已经被清空。需要注意保证不同的任务正确地处在对应的Stage中。

此外,还有缓存追踪机制以及清空机制。DAGScheduler判断哪个RDD被缓存,记忆哪些shuffle map 的Stage已经产生输出文件以避免重新计算或重复运行shuffle的map阶段。当相关的Task结束时,DAGScheduler会清空Stage,以避免内存泄露。

与上有关的函数包括sumbitJob,submitMapStage,submitStage,submitMissingTasks,submitWaitingChildStage等。

调度算法

SchedulingAlgorithm.scala中描述,只有两种继承算法FIFOSchedulingAlgorithmFairSchedulingAlgorithm,其中的方法comparator,返回boolean值,用于在调度时进行排序。

以下以comparator(A,B)为例. - FIFO方法,先比较优先级,小的优先,如果同级,则比较提交时间(通过StageID判断,小的代表提交时间早),早的优先。可以判断,priority越小,优先级越高。 - Fair方法,如果A满足runningTasksminShare小,而B不满足,则先处理A,反之亦然。如果都满足,则比较runningTasks/minShare的比值,低的优先。如果都不满足,则比较runningTasks/weight的比值,低的优先。当这些比值相同时,比较name。总的来说,即资源使用率低的优先。

调度任务类

调度部分可以改进的内容

添加调度算法,如调研报告内提到的利用协程方法实现随时将当前在进行处理的批数据暂停,切换到需要低延迟的流数据上去,在处理完流数据之后,再切换回批数据,在保证了流数据的低延迟的同时兼顾批数据的处理。[^neptune]

向调度部分添加内容时的注意事项

DAGScheduler源码中的注释提示如下: - 当有关的job结束之后,所有的数据结构应该被清空,以避免在长时间运行程序中的状态无限增加。 - 添加新数据结构时,更新DAGSchedulerSuite.assertDataStructuresEmpty函数,这有助于找到内存泄露。