Spark Code Analysis - DAGScheduler

13 Feb 2015

DAG, a directed acyclic graph, is a directed graph with no directed cycles.

In Spark, the DAGScheduler manages user-submitted jobs, the stages of each job, and the tasks of each stage. A job is submitted as a final RDD together with its dependency tree. A stage is a group of transformations from parent RDDs to child RDDs that can run without a shuffle. A task is the operation on a single partition within a stage.
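To make the job/stage/task relationship concrete, here is a minimal Python sketch (not Spark code). It assumes a purely linear lineage and that a new stage starts wherever a dependency needs a shuffle (the "wide" kind described below). The names split_into_stages and count_tasks, and the sample RDD names, are hypothetical.

```python
def split_into_stages(lineage):
    """Group a linear lineage into stages.

    lineage is a list of (rdd_name, dep_on_previous) ordered from the source
    RDD to the final RDD; dep_on_previous is None for the source, otherwise
    "narrow" or "wide". A wide dependency starts a new stage.
    """
    stages = []
    for name, dep in lineage:
        if dep == "narrow" and stages:
            stages[-1].append(name)   # pipelined into the current stage
        else:
            stages.append([name])     # source RDD or shuffle boundary
    return stages


def count_tasks(stages, partitions):
    """Assume one task per partition of the last RDD in each stage."""
    return [partitions[stage[-1]] for stage in stages]


# Hypothetical word-count-like lineage.
lineage = [("lines", None), ("words", "narrow"),
           ("counts", "wide"), ("frequent", "narrow")]
partitions = {"lines": 4, "words": 4, "counts": 2, "frequent": 2}

stages = split_into_stages(lineage)
tasks = count_tasks(stages, partitions)
```

Under these assumptions the job has two stages, and the number of tasks in each stage equals the partition count of that stage's last RDD.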

Based on the task dependency tree, DAGScheduler finally submits the missing tasks to the TaskScheduler. In DAGScheduler, event processing runs in a background thread. This thread handles user requests and executor events in the function onReceive (at the bottom of the file).
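The background-thread pattern can be sketched in a few lines of Python. This is only an illustration of the idea, not Spark's implementation: the class EventLoop, its methods, and the tuple-shaped events are assumptions made for the example, and the handlers just record which function would have been called.

```python
import queue
import threading


class EventLoop:
    """Toy event loop: one background thread drains a queue and dispatches."""

    _STOP = object()  # sentinel that tells the thread to exit

    def __init__(self):
        self._events = queue.Queue()
        self.handled = []  # record of dispatched events, for inspection
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def post(self, event):
        # Called by "users" or "executors"; returns immediately.
        self._events.put(event)

    def stop(self):
        self._events.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            event = self._events.get()  # blocks until an event arrives
            if event is self._STOP:
                break
            self.on_receive(event)

    def on_receive(self, event):
        # Dispatch on the event kind, in the spirit of onReceive.
        kind, payload = event
        if kind == "JobSubmitted":
            self.handled.append(("handleJobSubmitted", payload))
        elif kind == "CompletionEvent":
            self.handled.append(("handleTaskCompletion", payload))
        else:
            self.handled.append(("ignored", payload))
```

Because a single thread consumes the queue, events are handled one at a time in arrival order, so the handlers need no locking among themselves.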

In Spark, two types of dependency are defined (quoted from the NSDI 2012 paper):

We found it both sufficient and useful to classify dependencies into two types: narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD, wide dependencies, where multiple child partitions may depend on it.

This distinction is useful for two reasons. First, narrow dependencies allow for pipelined execution on one cluster node, which can compute all the parent partitions. For example, one can apply a map followed by a filter on an element-by-element basis. In contrast, wide dependencies require data from all parent partitions to be available and to be shuffled across the nodes using a MapReduce-like operation. Second, recovery after a node failure is more efficient with a narrow dependency, as only the lost parent partitions need to be recomputed, and they can be recomputed in parallel on different nodes. In contrast, in a lineage graph with wide dependencies, a single failed node might cause the loss of some partition from all the ancestors of an RDD, requiring a complete re-execution.
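The definition quoted above can be checked mechanically. The following Python sketch assumes a dependency is described as a mapping from each child partition to the parent partitions it reads; the function name classify_dependency and the sample mappings are hypothetical and are not part of Spark's API.

```python
from collections import Counter


def classify_dependency(child_to_parents):
    """Return "narrow" or "wide" for one parent/child RDD pair.

    child_to_parents maps a child partition index to the list of parent
    partition indices it reads. The dependency is narrow when every parent
    partition is used by at most one child partition.
    """
    uses = Counter()
    for parents in child_to_parents.values():
        for p in set(parents):  # count each child once per parent partition
            uses[p] += 1
    return "narrow" if all(n <= 1 for n in uses.values()) else "wide"


# One-to-one, as in a map or filter.
map_like = {0: [0], 1: [1], 2: [2]}
# Several parents per child, but no parent is shared: still narrow.
coalesce_like = {0: [0, 1], 1: [2, 3]}
# Every child reads every parent, as after a shuffle.
shuffle_like = {0: [0, 1], 1: [0, 1]}

kinds = [classify_dependency(d)
         for d in (map_like, coalesce_like, shuffle_like)]
```

The second case shows that "narrow" does not mean one-to-one: a child partition may read several parent partitions as long as no parent partition is shared between children.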

A brief summary of the function call diagram of DAGScheduler:

[Figure: Spark DAGScheduler function call diagram]

handleJobSubmitted and handleTaskCompletion handle the most important events processed by DAGScheduler.

After all the dependency calculation (and failure handling), the DAGScheduler submits the missing tasks to the TaskScheduler through the function submitMissingTasks.
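The interplay between submitting a stage, finding its missing parents, and finally submitting tasks can be sketched as a small simulation. This is a simplified Python model under stated assumptions, not the actual Spark logic: a stage is "available" once it has run, every stage succeeds, and the Stage class, run_job, and the bookkeeping lists are hypothetical. Appending to submitted stands in for the call to submitMissingTasks.

```python
class Stage:
    def __init__(self, name, parents=()):
        self.name = name
        self.parents = list(parents)
        self.available = False  # True once the stage's output exists


def get_missing_parent_stages(stage):
    return [p for p in stage.parents if not p.available]


def submit_stage(stage, waiting, running, submitted):
    if stage in waiting or stage in running:
        return  # already being handled
    missing = get_missing_parent_stages(stage)
    if not missing:
        submitted.append(stage.name)  # stands in for submitMissingTasks
        running.append(stage)
    else:
        for parent in missing:
            submit_stage(parent, waiting, running, submitted)
        waiting.append(stage)  # retried after a parent finishes


def run_job(final_stage):
    waiting, running, submitted = [], [], []
    submit_stage(final_stage, waiting, running, submitted)
    while running:
        done = running.pop(0)
        done.available = True  # pretend every task of the stage completed
        # In the spirit of submitWaitingStages: retry the blocked stages.
        retry = list(waiting)
        waiting.clear()
        for stage in retry:
            submit_stage(stage, waiting, running, submitted)
    return submitted


# Hypothetical stage graph: D depends on C, which depends on A and B.
a, b = Stage("A"), Stage("B")
c = Stage("C", [a, b])
d = Stage("D", [c])
order = run_job(d)
```

In this model a stage only reaches the task-submission step once all of its parents are available, so parents always appear before children in the submission order.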

LaTeX source code to draw the image above:

% spark-dagscheduler.tex
% pdflatex -shell-escape spark-dagscheduler.tex
\documentclass[border=10pt,varwidth,convert]{standalone}
\usepackage{tikz}
\usetikzlibrary{shapes, backgrounds}
\usetikzlibrary{decorations.pathmorphing} % for snake lines
\usetikzlibrary{matrix} % for block alignment
\usetikzlibrary{arrows} % for arrow heads
\usetikzlibrary{calc} % for manipulation of coordinates
\usetikzlibrary{positioning,fit}
\usetikzlibrary{decorations.markings} % for vecArrow
\usetikzlibrary{patterns}
\pagecolor{olive!50!yellow!50!white}
\begin{document}

\newcommand{\callr}[2]{
\draw [->] (#1.east) to [out=0,in=0] (#2.east);
}
\newcommand{\calll}[2]{
\draw [->] (#1.west) to [out=180,in=180] (#2.west);
}
\newcommand{\functionbase}{
\node[] (function) {};
}
\newcommand{\function}[2]{
\node[below right = #1 of function] (#2) {#2};
}

\begin{tikzpicture}
\functionbase
\function{0cm and 0cm}{handleJobSubmitted};
\function{1cm and 0cm}{submitStage};
\function{2cm and 0cm}{getMissingParentStages};
\function{3cm and 0cm}{getParentStages};
\function{4cm and 0cm}{getShuffleMapStage};
\function{5cm and 0cm}{newOrUsedStage};
\function{6cm and 0cm}{newStage};
\function{7cm and 0cm}{submitWaitingStages};
\function{8cm and 0cm}{submitMissingTasks};
\function{9cm and 0cm}{handleTaskCompletion};

\calll{submitWaitingStages}{submitStage}
\calll{handleJobSubmitted}{submitStage}
\callr{handleJobSubmitted}{getMissingParentStages}
\callr{handleJobSubmitted}{newStage}
\callr{handleJobSubmitted}{submitWaitingStages}
\callr{newOrUsedStage}{newStage}
\calll{newStage}{getParentStages}
\callr{getParentStages}{getShuffleMapStage}
\calll{getShuffleMapStage}{newOrUsedStage}
\callr{submitStage}{getMissingParentStages}
\callr{handleTaskCompletion}{getMissingParentStages}
\callr{handleTaskCompletion}{submitWaitingStages}
\calll{handleTaskCompletion}{submitStage}
\callr{handleTaskCompletion}{submitMissingTasks}
\callr{submitStage}{submitMissingTasks}
\calll{getMissingParentStages}{getShuffleMapStage}

\end{tikzpicture}

\end{document}