Parallel in-memory computation

Dataflow (MapReduce, Dryad, Spark) provides a "functional programming"-like interface to users.
* All data are immutable (once generated, they are never changed)
* Operators transform one collection into another
+ This interface is well suited for batch processing
- Cannot handle certain types of computation (e.g., asynchronous updates)
- Not the most efficient (no support for in-place modification)

Question: are there other computation models?

Distributed shared memory (DSM)

Goal: can we make distributed programming look like single-machine parallel computing?
- Launch threads on a cluster of machines
- Give them a shared address space, as is available on a single machine

Single-machine multi-core architecture:

    thread-1   thread-2   thread-3   ...   thread-n
       |          |          |               |
     cache      cache      cache           cache
         \         \         |             /
               global shared memory

DSM architecture:

    thread-1    thread-2    thread-3   ...   thread-n
       |           |           |               |
    local_mem   local_mem   local_mem       local_mem
         \          \          |              /
              distributed shared memory

Pro: the interface really is as simple as on a single machine.

Challenges of DSM: #1 performance vs. consistency model

Example application 1: add two huge arrays together

    float a[1LL<<40];
    float b[1LL<<40];
    float final[1LL<<40];

    launchThreads(1024, addChunk);

    void addChunk(thread_id idx) {
        long long start = (1LL << 30) * idx;
        for (long long i = 0; i < (1LL << 30); i++) {
            final[start+i] = a[start+i] + b[start+i];
        }
    }

DSM tracks memory at page granularity (say, 2MB huge pages).
(Draw picture of 3 threads and the 3 chunks of a, b, final they touch.)

If we implement linearizability for memory, what's the performance of this program?
(Suppose thread i resides on the machine storing the chunks of a and b it reads.)
Each thread performs 1<<30 writes, and each write must go to the responsible server (according to linearizability):
    0.1ms * (1<<30) ≈ 100,000 sec!!
(On a single machine, it only takes ~1 sec to add 1 billion numbers.)

The big idea: release consistency (RC)

Insight: in correctly written programs, no thread should read/write shared data w/o holding a lock!
RC: cache writes to a page locally, and only make them visible on release (or at a barrier).

Same example:

    void addChunk(thread_id idx) {
        acquire_lock(locks[idx]);
        long long start = (1LL << 30) * idx;
        for (long long i = 0; i < (1LL << 30); i++) {
            final[start+i] = a[start+i] + b[start+i];
        }
        release_lock(locks[idx]);
    }

Updates are shipped to the responsible server only at the call to release_lock.

Challenges of DSM: #2 False sharing

In the previous example, what if the final array is not page-aligned?
Then thread-1 and thread-2 may both want to write the same 2MB page.

What happens under 1st-generation DSM? "Page bouncing" between the two threads:
- thread-1 wants to write page P: it transfers P locally and becomes P's owner
- thread-2 wants to write page P: it transfers P locally and becomes P's owner
- etc.
(A sketch of this single-owner protocol appears at the end of these notes.)

Idea: write diffs
- Thread-1 makes local modifications to its copy of the page
- Thread-2 makes local modifications to its copy of the page
- At the barrier (release), each diffs against the original copy so the changes can be merged
(A sketch of the twin-and-diff mechanism appears at the end of these notes.)

Lazy release consistency (LRC): only send write diffs to the next acquirer of the released lock.
Lazier than RC in two ways:
1. release does nothing, so the work is deferred to a future acquire
2. write diffs are sent only to the acquirer

Example 2 (laziness)   (a1: acquire lock 1, r1: release lock 1)

    M0: a1  x=1  r1
    M1: a2  y=1  r2
    M2: a1  print x,y  r1

What does LRC do?
- M2 only asks the previous holder of lock 1 for write diffs
- M2 does not see M1's modification to y, even though x and y may live on the same page
What does RC do?

Q: what's the performance win from LRC?
A: if you don't acquire the lock protecting an object, you don't see updates to it
   => if you use just some variables on a page, you don't receive writes to the others
   => less network traffic

Example 3 (causality challenge)

    M0: a1  x=1  r1
    M1: a1  a2  y=x  r2  r1
    M2: a2  print x,y  r2

What's the potential problem here?
It is counter-intuitive that M2 might see y=1 but x=0.
That is a violation of "causal consistency": if write W1 contributed to write W2, everyone sees W1 before W2.

How does TreadMarks ensure causality?
- Number each machine's acquires/releases -- "interval" numbers
- Each machine tracks the highest interval whose writes it has seen, for each other machine, in a "vector timestamp" (VT)
- Tag each release with the current VT
- On acquire, tell the responsible server your VT; the difference indicates which writes (diffs) need to be sent
(Annotate the previous example. A sketch of the VT comparison appears at the end of these notes.)

A more sophisticated application example: PageRank

Given a directed web graph, compute an "importance" metric for each page based on its connectivity.
Simulate a random walk on the web graph: at each time step, a surfer picks a random outlink to follow.
What's the probability of the surfer being at each page at convergence?

    Rank_i^{t+1} = 0.15 * (1/N) + 0.85 * \sum_{j: j links to i} Rank_j^{t} / outdegree_j

How do we write such a program in TreadMarks?
For simplicity, assume the web graph is regular (each node has exactly 50 incoming and 50 outgoing links).

Solution #1 (pull-based):

    node_id graph[n][50];  // initialized to contain each node's incoming neighbors
    float curr[n];
    float next[n];

    for (iter = 0; iter < ITERATIONS; iter++) {
        launchThreads(1024, computePR);   // assume this waits for all threads to finish
        swap(curr, next);
    }

    void computePR(thread_id me) {
        // graph stores the incoming neighbor ids of each node
        long long start = (n/1024) * me;
        for (long long i = start; i < start + n/1024; i++) {
            next[i] = 0.15 * (1.0/n);
            for (int j = 0; j < 50; j++) {
                next[i] += 0.85 * curr[graph[i][j]] / 50.0;
            }
        }
    }

Problem with solution 1: lots of random reads (curr[graph[i][j]]) ==> horrible performance, since each random remote read takes 0.1ms.

Solution #2 (push-based):

    node_id graph[n][50];  // initialized to contain each node's outgoing neighbors
    float curr[n];
    float next[n];

    for (iter = 0; iter < ITERATIONS; iter++) {
        launchThreads(1024, computePR);
        swap(curr, next);
    }

    void computePR(thread_id me) {
        // graph stores the outgoing neighbor ids of each node
        long long start = (n/1024) * me;
        for (long long i = start; i < start + n/1024; i++) {
            next[i] = 0.15 * (1.0/n);
        }
        // (barrier: all of next[] must be initialized before anyone pushes into it)
        for (long long i = start; i < start + n/1024; i++) {
            for (int j = 0; j < 50; j++) {
                // acquire the lock protecting graph[i][j]
                next[graph[i][j]] += 0.85 * curr[i] / 50.0;
                // release the lock protecting graph[i][j]
            }
        }
    }

Problem with solution 2: the random writes that increment PageRank values need to be synchronized (a lock acquire/release around each remote write).

Solution #3 (push into per-thread buffers, then aggregate):

    node_id graph[n][50];  // initialized to contain each node's outgoing neighbors
    float curr[n];
    float next[n];
    float tmp_rank[1024][n];

    for (iter = 0; iter < ITERATIONS; iter++) {
        launchThreads(1024, computePR);    // all computePR threads finish first,
        launchThreads(1024, aggTmpRank);   // then the aggregation threads run
        swap(curr, next);
    }

    void computePR(thread_id me) {
        // graph stores the outgoing neighbor ids of each node
        for (long long i = 0; i < n; i++) {
            tmp_rank[me][i] = 0;           // each thread zeroes and owns its own row
        }
        long long start = (n/1024) * me;
        for (long long i = start; i < start + n/1024; i++) {
            for (int j = 0; j < 50; j++) {
                tmp_rank[me][graph[i][j]] += curr[i] / 50.0;   // private row: no locks needed
            }
        }
    }

    void aggTmpRank(thread_id me) {
        // sum the 1024 per-thread contributions for each node in my range
        long long start = (n/1024) * me;
        for (long long i = start; i < start + n/1024; i++) {
            next[i] = 0.15 * (1.0/n);
            for (int j = 0; j < 1024; j++) {
                next[i] += 0.85 * tmp_rank[j][i];
            }
        }
    }

Problem with solution 3: large temporary state, i.e., tmp_rank[1024][n] -- it may not even fit in memory (swapping??!!).

Compared to MapReduce, another problem common to all DSM systems:
- failure tolerance relies on coarse-grained checkpointing
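Appendix: three small C sketches of the mechanisms discussed above. Every type and function name in them (dsm_page, make_twin, send_write_diffs, etc.) is invented for illustration; none of it is TreadMarks' actual data structures or API.

Sketch 1: single-owner "page bouncing" under 1st-generation DSM. Two machines take turns writing different bytes of the same 2MB page, and every single write moves the whole page; this is a toy model that only counts the traffic.

    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>

    #define PAGE_SIZE (2 * 1024 * 1024)   /* 2MB huge page */

    /* 1st-generation DSM: each page has a single owner; a writer must first
       fetch the whole page and take ownership before modifying any byte. */
    typedef struct {
        int owner;                    /* machine id currently owning the page */
        uint8_t data[PAGE_SIZE];
    } dsm_page;

    static long long bytes_transferred = 0;

    /* stand-in for the real page-transfer RPC: here we only count the traffic */
    static void fetch_page_and_ownership(dsm_page *p, int me) {
        bytes_transferred += PAGE_SIZE;   /* a full 2MB moves every time */
        p->owner = me;
    }

    static void dsm_write_byte(dsm_page *p, int me, size_t off, uint8_t v) {
        if (p->owner != me) {
            /* when machine 1 and machine 2 alternate writes to this page, this
               branch fires on every write: the page "bounces" between them */
            fetch_page_and_ownership(p, me);
        }
        p->data[off] = v;
    }

    int main(void) {
        static dsm_page p = { .owner = 0 };
        for (int i = 0; i < 1000; i++) {         /* two machines interleave writes */
            dsm_write_byte(&p, 1, i, 1);
            dsm_write_byte(&p, 2, PAGE_SIZE - 1 - i, 2);
        }
        printf("%lld bytes moved for 2000 one-byte writes\n", bytes_transferred);
        return 0;
    }

Even though the two machines never touch the same byte, 2000 one-byte writes move roughly 4GB of pages: exactly the false-sharing cost write diffs are meant to avoid.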
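Sketch 2: the twin-and-diff mechanism behind write diffs. A writer keeps a private copy ("twin") of the page at its first write; at release/barrier time only the changed bytes are shipped, so writers of disjoint bytes on the same page can be merged without bouncing ownership.

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    #define PAGE_SIZE (2 * 1024 * 1024)   /* 2MB huge page */

    /* one recorded modification: offset within the page plus the new byte */
    typedef struct {
        size_t  offset;
        uint8_t value;
    } diff_entry;

    /* On the first write fault for a page, keep a private copy ("twin"). */
    uint8_t *make_twin(const uint8_t *page) {
        uint8_t *twin = malloc(PAGE_SIZE);
        memcpy(twin, page, PAGE_SIZE);
        return twin;
    }

    /* At release/barrier time, compare the page against its twin and record only
       the bytes that changed.  Returns the number of entries written to out[],
       which must have room for PAGE_SIZE entries in the worst case. */
    size_t compute_diff(const uint8_t *page, const uint8_t *twin, diff_entry *out) {
        size_t n = 0;
        for (size_t i = 0; i < PAGE_SIZE; i++) {
            if (page[i] != twin[i]) {
                out[n].offset = i;
                out[n].value  = page[i];
                n++;
            }
        }
        return n;
    }

    /* The receiver applies a diff to its own copy of the page.  Two threads that
       wrote disjoint bytes of the same page can both have their diffs applied,
       so the page never has to bounce between them. */
    void apply_diff(uint8_t *page, const diff_entry *diff, size_t n) {
        for (size_t i = 0; i < n; i++)
            page[diff[i].offset] = diff[i].value;
    }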
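Sketch 3: the vector-timestamp comparison done at acquire time, annotated with Example 3. This is simplified: real TreadMarks tracks intervals per page and fetches diffs lazily, whereas here the releaser eagerly forwards every interval the acquirer is missing, just to show how the VT difference identifies them.

    #include <stdio.h>

    #define NUM_MACHINES 3    /* M0, M1, M2 from Example 3 */

    /* vt[m] = highest interval of machine m whose writes this machine has seen
       (an "interval" is the span between consecutive acquires/releases on m). */
    typedef struct {
        int vt[NUM_MACHINES];
    } vector_timestamp;

    /* stand-in for shipping the write diffs made during machine m's interval i */
    static void send_write_diffs(int m, int i) {
        printf("forward write diffs of M%d, interval %d\n", m, i);
    }

    /* On acquire, the acquirer sends its VT to the previous releaser of the lock.
       The releaser compares component-wise: every interval the acquirer has not
       yet seen is forwarded (with its write diffs) before the acquire completes.
       This preserves causality in Example 3: when M2 acquires lock 2 from M1,
       M1's VT already covers M0's write of x, so x=1 travels along with y=1. */
    static void on_acquire(const vector_timestamp *releaser,
                           const vector_timestamp *acquirer) {
        for (int m = 0; m < NUM_MACHINES; m++)
            for (int i = acquirer->vt[m] + 1; i <= releaser->vt[m]; i++)
                send_write_diffs(m, i);
    }

    int main(void) {
        /* M1 releases lock 2 having seen M0's interval 1 (x=1) and having done
           its own interval 1 (y=x); M2 has seen nothing yet. */
        vector_timestamp m1_at_release = { .vt = {1, 1, 0} };
        vector_timestamp m2_at_acquire = { .vt = {0, 0, 0} };
        on_acquire(&m1_at_release, &m2_at_acquire);
        return 0;
    }

Running it forwards both M0's interval (containing x=1) and M1's interval (containing y=x) to M2, so M2 can never observe y=1 together with x=0.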