Parallel in-memory computation

MapReduce is designed for on-disk processing:
  Map(): read from disk, write intermediate values to disk
  Shuffle: read from mappers' disks, do "out-of-core" sort
  Reduce(): write final values to disk
Question: if the computation fits in the aggregate memory of all machines, can we do something faster?

Distributed shared memory (DSM)

Goal: can we make distributed programming similar to single-machine parallel computing?
  - Launch threads on a cluster of machines
  - Give them a shared address space, as is available on a single machine

Single-machine multi-core architecture:

  thread-1  thread-2  thread-3  ...  thread-n
     |         |         |              |
   cache     cache     cache          cache
      \         \        |            /
           global shared memory

DSM architecture:

  thread-1   thread-2   thread-3  ...  thread-n
     |          |          |              |
 local_mem  local_mem  local_mem      local_mem
      \          \         |            /
          distributed shared memory

Pro: the interface is indeed as simple as a local computer's.

Challenges of DSM: #1 performance vs. consistency model

Example application 1: add two huge arrays together

  float a[1LL<<40];
  float b[1LL<<40];
  float final[1LL<<40];
  launchThreads(1024, addChunk);

  void addChunk(thread_id idx) {
    long long start = (1LL << 30) * idx;   // 1LL: the product overflows 32-bit int
    for (long long i = 0; i < (1LL << 30); i++) {
      final[start+i] = a[start+i] + b[start+i];
    }
  }

DSM tracks memory at page granularity (say, a 2MB huge page).
(Draw a picture of 3 threads and 3 chunks of a, b, final.)

If we implement linearizability for memory, what's the performance of this program?
(Suppose thread i resides on the machine storing the chunks of a and b it reads.)
Each thread performs 1<<30 writes, and each write must go to the responsible server (to satisfy linearizability):
  0.1ms * (1<<30) ≈ 100,000 sec!!
(On a single machine, it takes only ~1 sec to add a billion numbers.)

The big idea: release consistency (RC)
  No one should read/write data without holding a lock!
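The back-of-the-envelope cost above can be checked in a few lines; the 0.1ms-per-remote-write figure is the one assumed in these notes:

```python
# Cost of 2^30 linearizable writes, each paying one round trip to the
# responsible server (latency figure taken from the notes above).
remote_write_latency = 0.1e-3   # 0.1 ms per remote write, in seconds
writes = 1 << 30                # one write per array element per thread
total_seconds = remote_write_latency * writes
print(round(total_seconds))     # ~107374 seconds, i.e. roughly 100,000 sec
```

Compare with ~1 second for the same billion additions done locally: a factor of about 10^5, which is what motivates caching writes and releasing them in bulk.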
RC: cache writes on a page, and only make them visible on release (or at a barrier).

Same example:

  void addChunk(thread_id idx) {
    acquire_lock(locks[idx]);
    long long start = (1LL << 30) * idx;
    for (long long i = 0; i < (1LL << 30); i++) {
      final[start+i] = a[start+i] + b[start+i];
    }
    release_lock(locks[idx]);
  }

Only ship the updates to the responsible server at the call to release_lock.

Challenges of DSM: #2 false sharing

In the previous example, what if the final array is not page-aligned?
Thread-1 and thread-2 may both want to write to the same 2MB page.

What happens under 1st-generation DSM? "Page bouncing" between the two threads:
  thread-1 wants to write page P, so it transfers P locally and becomes P's owner;
  thread-2 wants to write page P, so it transfers P locally and becomes P's owner;
  etc.

Idea: write diffs
  Thread-1 makes local modifications to the page.
  Thread-2 makes local modifications to the page.
  At barrier (release), diff against the original copy to merge.

Lazy release consistency (LRC): only send write diffs to the next acquirer of the released lock.
Lazier than RC in two ways:
  1. release does nothing, so work is deferred to a future acquire
  2. write diffs are sent only to the acquirer

Example 2 (laziness) (a1: acquire lock1, r1: release lock1)
  M0: a1  x=1  r1
  M1:     a2  y=1  r2
  M2:         a1  print x,y  r1

What does LRC do? M2 only asks the previous holder of lock 1 for write diffs.
M2 does not see M1's modification to y, even though y is on the same page.
What does RC do?

Q: what's the performance win from LRC?
  If you don't acquire the lock protecting an object, you don't see updates to it
  => if you use just some variables on a page, you don't see writes to the others
  => less network traffic

Example 3 (causality challenge)
  M0: a1  x=1  r1
  M1:     a1  a2  y=x  r2  r1
  M2:             a2  print x,y  r2

What's the potential problem here? Counter-intuitively, M2 might see y=1 but x=0.
This violates "causal consistency": if write W1 contributed to write W2, everyone sees W1 before W2.
How does TreadMarks ensure causality?
Number each machine's acquires/releases -- "interval" numbers.
Each machine tracks the highest write it has seen from each other machine in a vector timestamp (VT).
Tag each release with the current VT.
On acquire, tell the responsible server your VT; the difference indicates which writes need to be sent.
(Annotate the previous example.)

A more sophisticated application example: PageRank

Given a directed web graph, compute an "importance" metric for each page based on its connectivity.
Simulate a random walk on the web graph: at each time step, a surfer picks a random outlink to follow.
What's the probability of the surfer being at each page at convergence?

  Rank_i^{t+1} = 0.15 * (1/N) + 0.85 * \sum_{j : j links to i} Rank_j^{t} / out_degree(j)

How do we write such a program in TreadMarks?
For simplicity, assume the web graph is regular (each node has 50 outgoing links).

Solution #1:

  node_id graph[n][50]; // initialized to contain each node's incoming neighbors
  float curr[n];
  float next[n];

  for (iter = 0; iter < ITERATIONS; iter++) {
    launchThreads(1024, computePR, graph, curr, next);
    swap(curr, next);
  }

  void computePR(thread_id me) {
    // graph stores the incoming neighbor ids of each node
    int chunk = n / 1024;
    int start = chunk * me;
    for (int i = start; i < start + chunk; i++) {
      next[i] = 0.15 * (1.0 / n);
      for (int j = 0; j < 50; j++) {
        next[i] += 0.85 * curr[graph[i][j]] / 50.0;
      }
    }
  }

Problem with solution #1: lots of random reads => horrible performance; each random read takes 0.1ms.
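The update rule above can be sanity-checked with a toy single-machine version. This is a sketch, not TreadMarks code; the tiny 3-node cycle graph is a made-up example:

```python
def pagerank(incoming, out_degree, iterations=50, d=0.85):
    """incoming[i]: list of nodes linking to i; out_degree[j]: j's outlink count.
    Applies Rank_i = (1-d)/N + d * sum_j Rank_j / out_degree(j) repeatedly."""
    n = len(incoming)
    rank = [1.0 / n] * n
    for _ in range(iterations):
        rank = [(1 - d) / n +
                d * sum(rank[j] / out_degree[j] for j in incoming[i])
                for i in range(n)]
    return rank

# 3-node cycle: each node has exactly one outlink, so by symmetry every
# node's rank converges to 1/3, and ranks always sum to 1.
print(pagerank(incoming=[[2], [0], [1]], out_degree=[1, 1, 1]))
```

Note the sum of ranks is preserved by each iteration (the damping term redistributes 0.15 of the total mass uniformly), which is a useful invariant for testing the distributed versions below.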
Solution #2:

  node_id graph[n][50]; // initialized to contain each node's outgoing neighbors
  float curr[n];
  float next[n];

  for (iter = 0; iter < ITERATIONS; iter++) {
    launchThreads(1024, computePR, graph, curr, next);
    swap(curr, next);
  }

  void computePR(thread_id me) {
    // graph stores the outgoing neighbor ids of each node
    int chunk = n / 1024;
    int start = chunk * me;
    for (int i = start; i < start + chunk; i++) {
      for (int j = 0; j < 50; j++) {
        // acquire lock on graph[i][j]
        next[graph[i][j]] += 0.85 * curr[i] / 50.0;
        // release lock on graph[i][j]
      }
    }
  }

Problem with solution #2: the random writes incrementing PageRank need to be synchronized.

Solution #3:

  node_id graph[n][50]; // initialized to contain each node's outgoing neighbors
  float curr[n];
  float next[n];
  float tmp_rank[1024][n];

  for (iter = 0; iter < ITERATIONS; iter++) {
    launchThreads(1024, computePR);
    launchThreads(1024, aggTmpRank);
    swap(curr, next);
  }

  void computePR(thread_id me) {
    // graph stores the outgoing neighbor ids of each node;
    // each thread accumulates contributions into its own row of tmp_rank
    int chunk = n / 1024;
    int start = chunk * me;
    for (int i = start; i < start + chunk; i++) {
      for (int j = 0; j < 50; j++) {
        tmp_rank[me][graph[i][j]] += curr[i] / 50.0;
      }
    }
  }

  void aggTmpRank(thread_id me) {
    // sum each node's contributions across all 1024 threads' rows
    int chunk = n / 1024;
    int start = chunk * me;
    for (int i = start; i < start + chunk; i++) {
      next[i] = 0.15 * (1.0 / n);
      for (int j = 0; j < 1024; j++) {
        next[i] += 0.85 * tmp_rank[j][i];
      }
    }
  }

Problem with solution #3: large temporary state, i.e. tmp_rank[1024][n]. Swapping??!!

Compared to MapReduce, another problem common to all DSM systems:
  - fault tolerance relies on coarse-grained checkpointing

Spark: an offspring of MapReduce instead of DSM
  Borrows Dryad's ideas in its programming model (can express iteration and joins).
  New idea: gives the user control over the trade-off between fault tolerance and performance.

Abstraction of Resilient Distributed Datasets (RDDs): an RDD is a collection of partitions of records.
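A minimal sketch of the RDD idea, before looking at the real API: each RDD records its parent and the function deriving it (its lineage), transformations build lineage lazily, and a lost partition can be recomputed from the parent. All class and method names here are hypothetical, not Spark's actual API:

```python
# Toy RDD: lineage-based, lazily evaluated, single narrow dependency.
class RDD:
    def __init__(self, partitions=None, parent=None, fn=None):
        self._partitions = partitions   # materialized data, or None if derived
        self.parent = parent            # lineage: parent RDD
        self.fn = fn                    # lineage: per-partition derivation

    def map(self, f):                   # transformation: records lineage, no work yet
        return RDD(parent=self, fn=lambda part: [f(x) for x in part])

    def compute(self, i):               # (re)compute partition i from lineage
        if self._partitions is not None:
            return self._partitions[i]
        return self.fn(self.parent.compute(i))

    def collect(self):                  # action: forces computation of all partitions
        n = len(self._base()._partitions)
        return [x for i in range(n) for x in self.compute(i)]

    def _base(self):                    # walk lineage back to the materialized source
        return self if self.parent is None else self.parent._base()

base = RDD(partitions=[[1, 2], [3, 4]])
doubled = base.map(lambda x: 2 * x)     # no computation happens here
print(doubled.collect())                # [2, 4, 6, 8]
# "Losing" a derived partition is harmless: doubled.compute(0) re-derives it.
```

Real Spark adds wide dependencies, a partitioner, and placement hints (Table 3 of the paper), but this is the core of why recovery doesn't need replication.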
Two operations on RDDs:
  - Transformations: compute a new RDD from existing RDDs (flatMap, reduceByKey).
    The runtime is lazy -- it doesn't immediately compute.
  - Actions: where some effect is requested: a result to be stored, a specific value to be fetched, etc.
    Actions cause RDDs to materialize.

PageRank (from the paper):

  val links = spark.textFile(...).map(...).persist() // (URL, outlinks)
  var ranks = // RDD of (URL, rank) pairs
  for (i <- 1 to ITERATIONS) {
    // contribs contains, for each node, the rank contributions of its incoming neighbors
    val contribs = links.join(ranks).flatMap {
      case (url, (links, rank)) => links.map(dest => (dest, rank / links.size))
    }
    // Sum contributions by URL and get new ranks
    ranks = contribs.reduceByKey((x, y) => x + y)
                    .mapValues(sum => 0.15 / N + 0.85 * sum)
  }

What is an RDD? (Table 3, Section 4)
  - a list of partitions
  - a list of (parent RDD, wide/narrow dependency) pairs
  - a function to compute a partition
  - a partitioning scheme
  - computation placement hints
Each transformation takes (one or more) RDDs and outputs the transformed RDD.

Handling faults:
  When Spark computes, by default it generates only one copy of the result -- it doesn't replicate.
  Without replication, whether the data sits in RAM or on disk, a permanent node failure loses it.
  When some partition is lost and needs to be recomputed, the scheduler finds a way to recompute it
  (a fault can be detected using heartbeats).
  It will need to recompute all partitions the lost one depends on, until reaching a partition that is
  in RAM/disk or in replicated storage.
  If a dependency is wide, all partitions of that dependency are needed for the recomputation;
  if narrow, just one.
  So two mechanisms enable recovery from faults: lineage, and the policy of which partitions to persist.

The user can call persist on an RDD:
  - with the reliable flag, it will keep a copy on disk
  - with the replicate flag, it will write to stable storage (HDFS)
  - without flags, it will try to keep the RDD in RAM (spilling to disk when RAM is full)
Users manually checkpoint via calls to persist.
Q: why checkpoint?
(It's expensive, after all.)
A: a long lineage could cause a large recovery time; and when there are wide dependencies,
a single failure might require many partition re-computations.
So Spark handles a node failure by recomputing lineage up to partitions that can be read from
persisted collections in RAM/disk/replicated storage.

What happens when there isn't enough memory?
  - LRU (Least Recently Used) eviction on partitions
    - first on non-persisted partitions
    - then on persisted ones (they are still available on disk)
  - the user can control the order of eviction via a "persistence priority"
  - there is no reason not to discard non-persisted partitions (if they've already been used)
  - degrades to "almost" MapReduce behavior

In Figure 7, k-means on 100 Hadoop nodes takes 76-80 seconds.
In Figure 12, k-means on 25 Spark nodes (with no partitions allowed in memory) takes 68.8 seconds.
The difference could be because MapReduce writes to replicated storage after the reduce phase, while
Spark by default only spills to local disk: no network latency, and no I/O load on replicas.
There is no architectural reason why Spark would be slower than MapReduce.
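The eviction order above can be sketched as a victim-selection rule; the tuple layout and partition names here are made up for illustration:

```python
def choose_victim(partitions):
    """partitions: list of (name, last_used_time, persisted) tuples.
    Evict LRU among non-persisted partitions first (they must be recomputed
    from lineage if needed again); fall back to LRU among persisted ones,
    which can still be re-read from disk."""
    non_persisted = [p for p in partitions if not p[2]]
    pool = non_persisted if non_persisted else partitions
    return min(pool, key=lambda p: p[1])[0]

parts = [("A", 10, True), ("B", 5, False), ("C", 1, True)]
print(choose_victim(parts))  # "B": non-persisted beats the older persisted "C"
```

A "persistence priority" would add one more sort key ahead of recency within each class; this sketch omits it.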