Distributed transactions What we've learnt so far - local transactions (ACID) - I(solation): serializability - concurrency control (2 phase locking) Serializability: - Any history of execution (for a group of transactions) has the same effect on the database and produces the same output as some serial execution schedule of those transctions. How is it related to linearizability? - Very similar, except linearizability is defined for abstract data types (FIFO queues, lists): each operation involves 1 data object. (transactions access >=1 data itmes). - Additionally, in linearizability, equivalent serial schedule must obey completion-to-issuance order. - We could demand the same for the equivalent serial schedule for transactions ==> strict serializability Distributed transactions: Why distribution? - Data is huge, must be spread across many machines - Scale performance out across machines - Replicate data to tolerate failures Why transactions? How? Review Mechanisms for local transaction: 2PL with REDO logging begin_tx read \ write - (2PL) ... / end_tx: During execution, buffer writes (do not modify in-memory/disk database state) - On write a=val, Wlock(a), tx.writes[a]=val - On read a, Rlock(a), tx.reads[a] = true At commit 1. Make a log record containing tx.writes, flush log record to disk 2. foreach x in tx.writes, database[x] = tx.writes[x] 3. foreach x in tx.reads and tx.writes, unlock x. In fact, since we are not directly modifying data, but only buffering writes, we can grab locks at commit time 0. wlock a, wlock b 1. log .... T1={a=1, b=1} (assume initially a=0, b=0) * why not directly write to main memory state? transaction could be aborted later (due to deadlock, due to users explicitly aborting etc.) no way to revert (if logs contains both REDO and UNDO information, then it'll be okay) * why flushing log record to disk first and then write to database state? (if reverse steps, then a crash in the middle leaves unrecoverable, unserializable state: e.g. a=1 but b=0.) How does it work in a distributed setting? begin_T1 a=1 b=1 ... end_T1: (at commit time, lock items in write_sets, persist corresponding transaction logs, unlock data items) Draw a graph of transaction coordinator, data servers. client --> transaction coordinator ---> server-a ---> server-b * how about log persistence at commit time? - a single logging server ==> scalability bottleneck - log modified "a" at server-a and modified "b" at server-b * A strawman protocol coordinator --> server-a: lock a, log a=1, write a=1 to database state, unlock a coordinator --> server-b: lock b, log b=1, write b=1 to database state, unlock b Problem (failure to commit) what is transaction cannot commit (e.g. deadlocks) Problem (failure): coordinator crashes after message to a (before message to b). A later transaction T2 sees a=1, but the information about b=1 is permanently lost! Problem (serializability violation) a's message to server A arrives read(a = 1) read(b = 0) b's message to server B arrives 2PC (Two-phase commit) coordinator --> server-a: prepare-T1: server-a lock a, logs a=1, coordinator --> server-b: prepare-T1: server-b lock b, logs b=1 coordinator --> server-a: commit-T1: write a=1 to database state, unlock a coordinator --> server-b: commit-T1: write b=1 to database state, unlock b Now if coordinator crashes after prepare-a before prepare-b, a recovery protocol should abort T1 (no other transactions can read a=1 since a is still locked) Now if coordinator crashes after commit-a before commit-b, a recovery protocol should send commit-T1 to b. How does the recovery protocol work? Two options: Option 1: Coordinator can unilaterially determine the commit status of a transaction e.g. Coordinator receives prepare-ok(T1) from server-a, but times out on server-b, Coordinator can abort T1 (even if server-b has successfully prepared T1). Coordinator durably logs its decision (e.g. to a Paxos RSM). Recovery protocol reads from coordinator's log to decide to commit or abort. Option 2: Coordinator can *not* unilaterially determine the commit status of a transaction If both server-a and server-b successfully prepared T1, then T1 must commit participants log must be durable against failure (e.g. log replicated via Paxos RSM) Recovery protocol must read all participating server's log to decide commit or abort. The Percolator paper (OSDI'10): What's the problem? * Maintain Google's crawled document collection * Not a simple collection of texts, transformation/computation is needed - eliminate duplicates - invert hyperlinks * Existing solution process an entire doc collection (newly crawled + old ones) in a batch. Ideally, should only need to process the new ones. The challenge: maintain invariants across servers insert_doc(doc) { content_table.put(doc.url, doc.contents); hash = md5sum(doc.contents); canonical_url = dups_table.get(hash); if (!canonical_url || canonical_url > doc.url) { dups_table.put(hash, doc.url); } } What can go wrong in the above code? * Failures example. - doc inserted, but does not appear in the collection of canonical urls * Concurrency example: - two concurrent inserts of nyt.com nytimes.com Why transactions help? Why do they need distributed transactions? Why does Percolator decide to support SI instead of serializability? - avoid read locks so that transactions can read lots and lots of data cheaply How to implement SI in a distributed setting? begin_tx() { tx.start_ts = get_timestamp() } read(key) { //issue key to the server responsible for key //At server versioned_data = db[key] v = highest version smaller than tx.start_ts in versioned_data return v.data } write(key, value) { tx.writes[key] = value } commit_tx(tx) { foreach w in tx.writes issue 2PC_prepare to server responsible for w if (all replies are okay) { tx.commit_ts = get_timestamp() foreach w in tx.writes issue 2PC_commit to server responsible for w } else { foreach w in tx.writes issue 2PC_abort to server responsible for w } } on_2PC_prepare(tx, w) { lock(w.key) if lock is unavabile return 2PC_vote_no; //check for write-write conflict if there exists db[key] whose version is greater than tx.start_ts return 2PC_vote_no; log(tx.id, w.key,w.data); return 2PC_vote_yes; } on_2PC_commit(tx, w) { w.version = tx.commit_ts; add w to db[key] unlock(w.key) } Percolator is built on top of BigTable (a versioned multi-coloumn key-value store). How is its design different from one outlined above? Let's look at Pseudocode for Perrcolator in Figure 6? * pre-write is similar to the prepare phase in 2P commit. - why line 32? (check for write-write conflict) - why line 34? (concurrent 2P commit on overlapping data) * commit() - can I move line 48 to after 43? - ignore line 53 for now How does Percolator handle failures? * data is replicated by BigTable, hence no logging is necessary (BigTable internally logs) * But, transaction coordiator (Percolator worker) can fail. Consequence: locks held during 2P commit are never cleaned up. The idea: if other workers notice that a lock is held for a long time, it cleans up. Challenge: what if B thinks A is dead but A is not dead and still in the process of committing? Percolator solution: - Every transaction corresponds to a primary lock. - All other locks in a transaction point to its primary lock. - Commit replaces the primary lock with a write record - Cleanup erases the primary lock if it's still there. - Access to primary lock is synchronized by the underlying BigTable single row transaction.