Distributed Programming Models -- Graph-based Why graph-based model: * many computations work on graphs. Example graphs: web graph, follow graph, friendship graph, user-like-movies graph Example computation: PageRank, connected-component, shortest path The Push-based graph model (Pregel): - User-defined vertex program (function) takes as input a "combined" message it can "SendMsg" to its out-neighbors: - User-defined combiner to aggregate multiple messages to the same vertex class VertexProgram { vector outNbrs() { //return list of my outgoing nbrs } void sendMsg(node_id nbr, Message msg) { //... } virtual doVertex(Message msg) { //user-defined vertex function } virtual Message combine(Message accum, Message m1) { } node_id my_id_; } class AppVertexProgram : VertexProgram { doVertex() combine(..., ...) //one can add other app state } Execution model: * Bulk synchronous execution while (some node is active) { foreach vertex in Graph { if vertex.active { doVertex() } } global_barrier; } * Asynchronous execution At each machine: while (some node is active) { lock outgoing neighbors doVertex unlock } //no global barrier Exercise: write two programs in this interface PageRank ConnectedComponent Performance: (draw pic of Figure 4, PowerGraph) Partition-by-vertex: randomly assign each vertex to one of the p machines Communication overhead: # of edges cut across machines + straightforward to implement - load-imbalance (nodes w/ large in-degrees or more out-degrees) - most edges got cut across machines --> lots of communication PageRank/CC communication cost: Num_edges per iteration PowerGraph's idea: partition-by-edge How to implement this feature? * First, a node is not aware of all its outgoing neighbors, thus, - users cannot manually loop over outgoing neighbors in doVertex solution: factor out message sending in a separate user-defined function - virtual Message scatter(node_id nbr) { } // while (some node is active) { parallel foreach vertex v in Graph { if v.active { v.doVertex() foreach u of vertex.out_nbrs v.scatter(u) } } global_barrier; } * Second, since scatter for the same (source) vertex needs to run on different machines ==> mirror vertex state // while (some node is active) { parallel foreach vertex v in Graph { if v.active { v.doVertex() foreach u of vertex.out_nbrs v.scatter(u) //messages sent to master of each vertex replica } } global_barrier; parallel foreach mirror of a vertex { synchronize mirror with master } global_barrier; } Communication overhead: # of total vertex replicas * The claim: Communication of random partition-by-edge < Communication of random partition-by-vertex for power-law graphs In the paper, they proposed a greedy assignment strategy to minimize vertex replication Look at Figure (9) Fan-in versus fan-out: The intuition why vertex cut is better (for large fan-out): A node with lots of out-degrees to randomly chosen other nodes Under partition-by-vertex, communication cost \approx out-degree Under partition-by-edge, essentially, send this node's state to all other machines, run scatter at those machines communication cost \approx #_of_machines Question: Is Graph-model a fundamental abstraction or one that can be built on top another fundamental abstraction? It's possible to build a graph-model using a distributed data flow engine? see "GraphX: Graph Processing in a Distributed Dataflow Framework" OSDI 2014