GraphX: Graph Processing in a Distributed Dataflow Framework

07 Jul 2015

Notes on the paper "GraphX: Graph Processing in a Distributed Dataflow Framework", published at OSDI 2014.

Graph processing systems, such as Pregel and PowerGraph, typically outperform general-purpose distributed dataflow frameworks like Hadoop MapReduce by orders of magnitude. However, large graphs still often need to be processed inside general-purpose distributed dataflow frameworks.

GraphX achieves an order of magnitude performance gain over the base dataflow framework and matches the performance of specialized graph processing systems while enabling a wider range of computation.

Pregel Graph-Parallel Abstraction

Pregel expresses a graph algorithm as a vertex program. This programming interface lets the system execute the vertices concurrently. The PageRank example is expressed as follows:

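// Pseudocode: PR, OutNbrs, NumLinks, send_msg, converged and voteToHalt
// are assumed to be provided by the Pregel runtime.
def PageRank(v: Id, msgs: List[Double]): Unit = {
  // Compute the message sum (a Double, since the messages are Doubles)
  var msgSum = 0.0
  for (m <- msgs) { msgSum += m }
  // Update the PageRank
  PR(v) = 0.15 + 0.85 * msgSum
  // Broadcast messages with new PR
  for (j <- OutNbrs(v)) {
    val msg = PR(v) / NumLinks(v)
    send_msg(to = j, msg = msg)
  }
  // Check for termination
  if (converged(PR(v))) voteToHalt(v)
}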

The vertex program for the vertex v begins by summing the messages encoding the weighted PageRank of neighboring vertices. The PageRank is updated using the resulting sum and is then broadcast to its neighbors (weighted by the number of links). Finally, the vertex program assesses whether it has converged (locally) and votes to halt.
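To make the superstep structure concrete, here is a minimal single-machine sketch of the same update rule in plain Scala collections. It is not the Pregel or GraphX API: the object name `PregelPageRankSketch`, the adjacency-map input, and the fixed superstep count (used instead of `voteToHalt`) are assumptions made for illustration.

```scala
object PregelPageRankSketch {
  type Id = Int

  // Runs a fixed number of synchronous supersteps.
  // `outNbrs` must contain every vertex as a key (hypothetical input shape).
  def run(outNbrs: Map[Id, Seq[Id]], supersteps: Int): Map[Id, Double] = {
    var ranks: Map[Id, Double] = outNbrs.keys.map(v => v -> 1.0).toMap
    for (_ <- 1 to supersteps) {
      // Each vertex sends rank / out-degree along every out-edge.
      val msgs: Seq[(Id, Double)] = for {
        (v, nbrs) <- outNbrs.toSeq
        j <- nbrs
      } yield (j, ranks(v) / nbrs.size)
      // Deliver: sum the messages addressed to each vertex.
      val inbox: Map[Id, Double] =
        msgs.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).sum }
      // Vertex program: same update rule as the pseudocode above.
      ranks = ranks.map { case (v, _) =>
        v -> (0.15 + 0.85 * inbox.getOrElse(v, 0.0))
      }
    }
    ranks
  }
}
```

Calling `PregelPageRankSketch.run(Map(0 -> Seq(1), 1 -> Seq(2), 2 -> Seq(0)), 5)` runs five supersteps on a three-vertex cycle.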

Graph System Optimizations

The restrictions imposed by the graph-parallel abstraction along with the sparse graph structure enable a range of important system optimizations.

The GAS Decomposition

The GAS decomposition splits vertex programs into three data-parallel stages: Gather, Apply, and Scatter.

The GAS decomposition leads to a pull-based model of message computation. It enables vertex-cut partitioning, improved work balance, serial edge iteration, and reduced data movement, but it prohibits direct communication between vertices.
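As a rough illustration of the three stages, the sketch below runs one GAS iteration of PageRank on an in-memory edge list. The names `GasSketch`, `step`, and `tol` are hypothetical, and modelling the scatter stage as "activate the out-neighbors of vertices whose rank changed" is one common reading, not something the paper prescribes.

```scala
object GasSketch {
  type Id = Int
  final case class Edge(src: Id, dst: Id)

  // One GAS iteration. `ranks` must contain every vertex that appears in `edges`.
  // Returns the new ranks and the set of vertices activated for the next round.
  def step(edges: Seq[Edge], ranks: Map[Id, Double], tol: Double): (Map[Id, Double], Set[Id]) = {
    val outDeg: Map[Id, Int] =
      edges.groupBy(_.src).map { case (v, es) => v -> es.size }

    // Gather: pull a contribution over each in-edge and combine with a sum.
    val gathered: Map[Id, Double] = edges
      .map(e => (e.dst, ranks(e.src) / outDeg(e.src)))
      .groupBy(_._1)
      .map { case (v, cs) => v -> cs.map(_._2).sum }

    // Apply: update each vertex from its gathered value only.
    val next: Map[Id, Double] = ranks.map { case (v, _) =>
      v -> (0.15 + 0.85 * gathered.getOrElse(v, 0.0))
    }

    // Scatter: vertices whose value changed activate their out-neighbors.
    val changed: Set[Id] =
      next.filter { case (v, r) => math.abs(r - ranks(v)) > tol }.keySet
    val activated: Set[Id] = edges.filter(e => changed(e.src)).map(_.dst).toSet

    (next, activated)
  }
}
```

Because gather only needs a commutative, associative combiner (here a sum), the per-edge work can be split across machines, which is what makes vertex-cut partitioning possible.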

Graph Partitioning

Graph-partitioning algorithms aim to minimize communication and balance computation. Vertex-cut partitioning, which assigns edges to machines and lets a vertex span several of them, performs well on many large natural graphs.
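A small sketch can show what a vertex cut does: every edge lands in exactly one partition, while a vertex may be replicated. The hash function, the object name `VertexCutSketch`, and the replication-factor helper are assumptions for illustration, not a partitioner from any particular system.

```scala
object VertexCutSketch {
  type Id = Long
  final case class Edge(src: Id, dst: Id)

  // Assign each edge to one partition using a simple deterministic hash
  // of its endpoints (illustrative choice only).
  def assign(edges: Seq[Edge], numParts: Int): Map[Int, Seq[Edge]] =
    edges.groupBy(e => Math.floorMod(e.src * 31 + e.dst, numParts.toLong).toInt)

  // Replication factor: average number of partitions in which a vertex appears.
  def replicationFactor(parts: Map[Int, Seq[Edge]]): Double = {
    val copies: Seq[(Id, Int)] = (for {
      (p, es) <- parts.toSeq
      e <- es
      v <- Seq(e.src, e.dst)
    } yield (v, p)).distinct
    copies.size.toDouble / copies.map(_._1).distinct.size
  }
}
```

A lower replication factor means fewer vertex copies to keep in sync, so it is a reasonable proxy for communication cost when comparing partitioners.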

Mirror Vertices

Mirror vertices are used to combine messages sent to the same remote machine: a single message is sent to the mirror, which forwards it to the neighbors on that machine.
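The saving can be sketched by counting network messages for one sending vertex with and without a mirror. The placement function `machineOf` and the object name `MirrorSketch` are hypothetical.

```scala
object MirrorSketch {
  type Id = Int

  // Without mirroring: one network message per neighbor on another machine.
  def naiveMessages(home: Int, nbrs: Seq[Id], machineOf: Id => Int): Int =
    nbrs.count(n => machineOf(n) != home)

  // With mirroring: one network message per remote machine that hosts at
  // least one neighbor; the mirror there forwards the value locally.
  def mirroredMessages(home: Int, nbrs: Seq[Id], machineOf: Id => Int): Int =
    nbrs.map(machineOf).filter(_ != home).distinct.size
}
```

The gap between the two counts grows with vertex degree, which is why the technique matters most for high-degree vertices.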

Active Vertices

This optimization tracks active vertices and eliminates data movement and unnecessary computation for vertices that have converged.
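As an illustration, the sketch below propagates the minimum vertex id through a graph and keeps an explicit active set, so only vertices whose label changed in the previous superstep send messages. The algorithm choice (minimum-label propagation) and the names `ActiveSetSketch` and `minLabel` are assumptions; the paper does not tie the optimization to this example.

```scala
object ActiveSetSketch {
  type Id = Int

  // `nbrs` is a symmetric adjacency map containing every vertex as a key.
  // Returns the final labels and the total number of messages sent.
  def minLabel(nbrs: Map[Id, Seq[Id]]): (Map[Id, Id], Int) = {
    var labels: Map[Id, Id] = nbrs.keys.map(v => v -> v).toMap
    var active: Set[Id] = nbrs.keySet
    var sent = 0
    while (active.nonEmpty) {
      // Only active vertices send; converged vertices do no work.
      val msgs: Seq[(Id, Id)] =
        for { v <- active.toSeq; j <- nbrs(v) } yield (j, labels(v))
      sent += msgs.size
      val best: Map[Id, Id] =
        msgs.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }
      // A vertex is active next round only if its label improved.
      val improved = best.filter { case (v, l) => l < labels(v) }
      labels = labels ++ improved
      active = improved.keySet
    }
    (labels, sent)
  }
}
```

The loop ends when the active set is empty, which plays the role of every vertex having voted to halt.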

Graph Computation as Dataflow Ops.

  1. In the join stage, vertex and edge properties are joined to form the triplets view, consisting of each edge and its corresponding source and destination vertex properties.
  2. In the group-by stage, the triplets are grouped by source or destination vertex to construct the neighborhood of each vertex and compute aggregates.

These two stages capture the GAS decomposition. On top of triplets, we can also implement the Pregel abstraction by iteratively composing the join and group-by stages with data-parallel map stages.
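The two stages can be sketched with ordinary Scala collections standing in for distributed collections. The types `Edge` and `Triplet` and the functions `triplets` and `aggregate` are hypothetical names for this sketch and are not the GraphX API.

```scala
object TripletsSketch {
  type Id = Int
  final case class Edge[E](src: Id, dst: Id, attr: E)
  final case class Triplet[V, E](src: Id, srcAttr: V, dst: Id, dstAttr: V, attr: E)

  // Join stage: attach source and destination vertex properties to each edge.
  // Every edge endpoint must be present in `vertices`.
  def triplets[V, E](vertices: Map[Id, V], edges: Seq[Edge[E]]): Seq[Triplet[V, E]] =
    edges.map(e => Triplet(e.src, vertices(e.src), e.dst, vertices(e.dst), e.attr))

  // Group-by stage: map each triplet to a (target vertex, message) pair,
  // then reduce the messages per vertex with `combine`.
  def aggregate[V, E, M](ts: Seq[Triplet[V, E]])(
      toMsg: Triplet[V, E] => (Id, M))(combine: (M, M) => M): Map[Id, M] =
    ts.map(toMsg)
      .groupBy(_._1)
      .map { case (v, ms) => v -> ms.map(_._2).reduce(combine) }
}
```

Grouping by `t.dst` builds in-neighborhood aggregates and grouping by `t.src` builds out-neighborhood aggregates; a Pregel-style loop would alternate this pair of stages with a map over the vertices.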

Distributed Graph Representation

[Figure: distributed graph representation in GraphX (graphx_partition)]

The vertex collection is hash-partitioned by the vertex ids. To support frequent joins across vertex collections, vertices are stored in a local hash index within each partition.

The edge collection is horizontally partitioned by a user-defined partition function. GraphX enables vertex-cut partitioning, which minimizes communication in natural graphs such as social networks and web graphs.
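A single-process sketch of this layout follows, with a `Vector` of partitions standing in for machines. The names `GraphLayoutSketch`, `partitionVertices`, `lookup`, and `partitionEdges` are hypothetical, and the modulo hash is only one possible choice.

```scala
object GraphLayoutSketch {
  type Id = Long
  final case class Edge(src: Id, dst: Id)

  // Vertex collection: hash-partitioned by id; each partition keeps its
  // vertices in a local hash index (here an immutable Map).
  def partitionVertices[V](vs: Seq[(Id, V)], n: Int): Vector[Map[Id, V]] = {
    val byPart = vs.groupBy { case (id, _) => Math.floorMod(id, n.toLong).toInt }
    Vector.tabulate(n)(p => byPart.getOrElse(p, Seq.empty[(Id, V)]).toMap)
  }

  // Join-side lookup: hash to the owning partition, then probe its index.
  def lookup[V](parts: Vector[Map[Id, V]], id: Id): Option[V] =
    parts(Math.floorMod(id, parts.size.toLong).toInt).get(id)

  // Edge collection: partitioned by a caller-supplied function.
  def partitionEdges(es: Seq[Edge], n: Int)(partOf: Edge => Int): Vector[Seq[Edge]] = {
    val byPart = es.groupBy(e => Math.floorMod(partOf(e), n))
    Vector.tabulate(n)(p => byPart.getOrElse(p, Seq.empty[Edge]))
  }
}
```

Passing a different `partOf` function changes how edges are spread without touching the vertex layout, which is the flexibility a user-defined partition function provides.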