Spanner: Google's Globally-Distributed Database

Recap: 2PL + 2PC

  tx_begin(tx) { }

  tx_read(tx, key) {
    On_server(key) {
      RLock(key)
      val <- db[key]
    }
    tx.reads.add(key)
    return val
  }

  tx_write(tx, key, val) {
    tx.writes.add((key, val))   // buffer writes
  }

  tx_commit(tx) {
    // 2PC prepare
    foreach w in tx.writes {
      On_server(w.key) {
        WLock(w.key)
        log(tx.id, w.key, w.val)
      }
    }
    // if 2PC successfully prepared all tx.writes, do the following 2PC commit
    foreach w in tx.writes {
      On_server(w.key) {
        db[w.key] = w.val
        Unlock(w.key)
      }
    }
    foreach r in tx.reads {
      On_server(r.key) {
        Unlock(r.key)
      }
    }
  }

  This is identical to a local database's 2PL procedure, except that operations are
  performed on different servers (On_server(...)) and the commit procedure requires a
  two-phase-commit protocol.
  Note: I'm assuming the client acts as the transaction coordinator. How to recover
  from various coordinator/server failures is missing from this pseudocode.

Now let's discuss Spanner.

What are Spanner's big ideas?
  sharding data w/ Paxos replication (possibly across the wide area)
  semantics? (see paper question)
    strictly serializable (external consistency, linearizability)
    transactions using 2PL + 2PC
    efficient read-only transactions
    strictly serializable reads using TrueTime

Wide-area synchronous replication via Paxos
  goal: survive single-site disasters, replicate data at sites closer to users
  What's the theoretical performance bound of Paxos?
    if two sites are x ms apart, what's the latency of an op?
    - writes: 1 round trip w/ the leader + bulk prepare ==> at least x ms
    - reads occur at the leader using leases (need not contact a majority)
  Actual performance? (Table 3; experiments run on datacenters <1 ms apart)
    pretend we're just measuring Paxos: writes go through Paxos agreement, reads go to any replica
    latency:
      why doesn't write latency go up w/ more replicas?
      why does the std dev of latency go down w/ more replicas?
      read-only ops are a *lot* faster since there's no Paxos agreement + they use the closest replica
    throughput:
      why does read throughput go up w/ the # of replicas?
      why doesn't write throughput go up?
      does write throughput seem to be going down?
    what can we conclude from Table 3? is the system fast? slow?
      * use sites close to each other
      * write apps to tolerate delays
        may make many slow writes, but issue them in parallel

How does Paxos work with 2PL + 2PC?
  in the pseudocode, change On_server(key) --> On_PaxosGroup(key)   // issued to the Paxos leader

Optimized read-only transactions (no more 2PL)
  why?
    read-only transactions tend to be the majority
    read-only transactions can also be expensive
      (e.g. a tx generating a summary report might read many, many items)
    - better not to let read-only transactions grab locks (and block write transactions)
  Borrow an idea from snapshot isolation:
    assign each r/w transaction a commit timestamp
    keep multiple versions of each data item
    a read-only transaction reads the latest version smaller than its start_ts
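  To make the multi-version idea concrete, here is a minimal sketch of a versioned
  key-value store in Go (the MVStore/SnapshotRead names are hypothetical, not
  Spanner's actual storage layer): each write installs a new version tagged with its
  commit timestamp, and a snapshot read returns the latest version below the reader's
  start_ts.

    package main

    import (
        "fmt"
        "sort"
    )

    type version struct {
        ts  uint64 // commit timestamp of the write that installed this version
        val string
    }

    type MVStore struct {
        versions map[string][]version // per-key version list, sorted by ts ascending
    }

    func NewMVStore() *MVStore {
        return &MVStore{versions: make(map[string][]version)}
    }

    // Write installs a new version of key at commit timestamp ts.
    func (s *MVStore) Write(key, val string, ts uint64) {
        vs := append(s.versions[key], version{ts: ts, val: val})
        sort.Slice(vs, func(i, j int) bool { return vs[i].ts < vs[j].ts })
        s.versions[key] = vs
    }

    // SnapshotRead returns the latest version of key with ts < startTS,
    // i.e. what a read-only transaction with that start_ts would observe.
    func (s *MVStore) SnapshotRead(key string, startTS uint64) (string, bool) {
        vs := s.versions[key]
        for i := len(vs) - 1; i >= 0; i-- {
            if vs[i].ts < startTS {
                return vs[i].val, true
            }
        }
        return "", false // no version visible at this snapshot
    }

    func main() {
        s := NewMVStore()
        s.Write("a", "0", 5)
        s.Write("a", "1", 10)
        v, _ := s.SnapshotRead("a", 7) // start_ts = 7 sees the version committed at ts = 5
        fmt.Println(v)                 // prints "0"
    }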
  // rwtx_*: read-write transaction logic
  rwtx_begin(tx) { }
  rwtx_read(tx, key) { ... }         // 2PL read as before, via On_PaxosGroup(key)
  rwtx_write(tx, key, val) { ... }   // buffer writes as before

  rwtx_commit(tx) {
    // 2PC prepare
    foreach w in tx.writes {
      On_PaxosGroup(w.key) {
        WLock(w.key)
        log(tx.id, w.key, w.value)
      }
    }
    // if 2PC successfully prepared all tx.writes, do the following 2PC commit
    tx.commit_ts = Oracle.get_timestamp()
    foreach w in tx.writes {
      On_PaxosGroup(w.key) {
        db[w.key].add(w.value, tx.commit_ts)   // install a new version
        Unlock(w.key)
      }
    }
    foreach r in tx.reads {
      On_PaxosGroup(r.key) {
        Unlock(r.key)
      }
    }
  }

  // rotx_*: read-only transaction logic
  rotx_begin(tx) {
    tx.start_ts = Oracle.get_timestamp()
  }

  rotx_read(tx, key) {
    On_PaxosGroup(key) {
      if exists WLock(key) whose proposed commit_ts is smaller than tx.start_ts, wait
        // a conflicting read-write transaction is committing; must block
      val = latest version of db[key] smaller than tx.start_ts
    }
    return val
  }

How to implement the time oracle?

  Approach #1: use a single timestamp server (as in Percolator [OSDI '10]), but:
    - Percolator works within a single data center; Spanner works across many
      geographically separate data centers. Obtaining a timestamp from a timestamp
      server in a remote data center is slow.

  Approach #2: obtain timestamps from each server's local clock, but clocks disagree:
    - rw-transaction: a=1, b=1, commit timestamp = 10. Finished.
        |___________commit_ts=10___db[a]=1 db[b]=1_____|
    - ro-transaction: a? b?
        |___start_ts = 5____a=0, b=0___|    // starts after the rw-tx finished, yet its
                                            // skewed clock yields start_ts = 5, so it
                                            // reads stale data: not linearizable
        |___start_ts = 11___a=1, b=??___|

How to decentralize timestamping? Spanner's solution: TrueTime (section 3)
  there is an actual "absolute" time t_abs, but server clocks are typically off by
  some variable amount
  assume the error has a known bound, so now() yields an interval [earliest, latest]
    earliest and latest are ordinary scalar times
    t_abs is highly likely to lie between earliest and latest
  TrueTime is implemented by synchronizing with time masters (GPS clocks & atomic clocks)

  rwtx_commit(tx) {
    // 2PC prepare
    foreach w in tx.writes {
      On_PaxosGroup(w.key) {
        WLock(w.key)
        log(tx.id, w.key, w.value)
        reply with a timestamp   // larger than all previously prepared transactions
      }
    }
    // if 2PC successfully prepared all tx.writes, do the following 2PC commit
    tx.commit_ts = max(timestamps of 2PC-prepare replies, TT.now().latest)
    wait till TT.after(tx.commit_ts)   // "commit wait"
    // 2PC commit ... omitted
  }

  rotx_begin(tx) {
    tx.start_ts = TT.now().latest
  }

What if we omit obtaining timestamps in 2PC-prepare?
      |____a=1____|                 (commit_ts = 20)
         |______a=2__________|      (commit_ts = 18, chosen by a different coordinator)
  the write of a=2 finishes later but is assigned a smaller commit timestamp, so
  timestamp order no longer matches the order in which the writes committed

What happens if we omit 'wait till TT.after(tx.commit_ts)'?
  suppose the time error bound is 10 ms
  obtain the commit timestamp at t_abs = 12 ms: TT.now() = [10, 20], so commit_ts = 20
      |______x______________________|
      |      |                      |
     10   t_abs=12                 20
  finish committing at t_abs = 15 ms, return to the client
  at t_abs = 16 ms, another client issues a ro-transaction and obtains start_ts = 18
      |_____________________x_______|
      |                     |       |
      8                t_abs=16    18
  start_ts = 18 < commit_ts = 20, so the ro-transaction misses a write that finished
  before it started   // violating linearizability
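A minimal sketch of what a TrueTime-style interface and the commit wait might look
like in Go. The fixed error bound eps and the use of the local clock are assumptions
for illustration only; real TrueTime derives its uncertainty from GPS/atomic-clock
time masters, and the TrueTime/TTInterval names below are not the real API.

  package main

  import (
      "fmt"
      "time"
  )

  // TTInterval is the result of TT.now(): t_abs is highly likely to lie in
  // [Earliest, Latest].
  type TTInterval struct {
      Earliest time.Time
      Latest   time.Time
  }

  // TrueTime is a toy stand-in: it widens the local clock reading by a fixed
  // assumed error bound eps on each side.
  type TrueTime struct {
      eps time.Duration
  }

  func (tt TrueTime) Now() TTInterval {
      t := time.Now()
      return TTInterval{Earliest: t.Add(-tt.eps), Latest: t.Add(tt.eps)}
  }

  // After reports whether t has definitely passed, i.e. t_abs > t.
  func (tt TrueTime) After(t time.Time) bool {
      return tt.Now().Earliest.After(t)
  }

  // CommitWait implements "wait till TT.after(tx.commit_ts)": block until the
  // commit timestamp is guaranteed to be in the past, so any transaction that
  // starts afterwards obtains a larger timestamp.
  func (tt TrueTime) CommitWait(commitTS time.Time) {
      for !tt.After(commitTS) {
          time.Sleep(time.Millisecond)
      }
  }

  func main() {
      tt := TrueTime{eps: 5 * time.Millisecond}
      commitTS := tt.Now().Latest // as in rwtx_commit above
      tt.CommitWait(commitTS)
      fmt.Println("commit_ts is now definitely in the past; safe to release locks")
  }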
Spanner's other optimization: allow reads from any replica, not just the Paxos leader
  Why was it important (so far) for a read-only tx to read from the Paxos leader?
    -- so it is guaranteed to see the active write-lock and wait accordingly
  When can one read from any replica? Two requirements:
    1. the replica is up to date
       (so when it returns the latest version below tx.start_ts, that version is the correct one)
    2. there is no ongoing 2PC that could affect the current read-only tx
       (that's why we block when a WLock exists in the old implementation)
  Requirement 1 is relatively easy to implement. How about requirement 2?
    We cannot know the precise commit timestamp of a read-write transaction while it is
    still being prepared. However, we can guarantee that a currently-being-prepared
    read-write transaction will have a commit_ts *larger* than some T; then a read-only
    tx with tx.start_ts smaller than T cannot be affected by that read-write transaction.
    To make the guarantee, we give 2PC prepares timestamps and choose commit_ts to be
    larger than all prepare_ts's seen.
  To cover requirements 1 and 2, each Paxos replica keeps track of a safe timestamp
  T_safe: the maximum timestamp at which the replica is up to date, i.e. no outstanding
  Paxos write nor 2PC prepare can commit at or below T_safe.
    T_safe = min(TPaxos_safe, TPrepare_safe)

  rwtx_commit(tx) {
    // 2PC prepare
    foreach w in tx.writes {
      On_PaxosGroup(w.key) {
        WLock(w.key)
        log(tx.id, w.key, w.value)
        w.prepare_ts = TT.now().latest   // give the 2PC prepare a timestamp
      }
    }
    // if 2PC successfully prepared all tx.writes, do the following 2PC commit
    tx.commit_ts = max(TT.now().latest, w.prepare_ts for all w in tx.writes)
    wait till TT.after(tx.commit_ts)
    ...
  }

  rotx_read(tx, key) {
    On_AnyPaxosReplica(key) {
      wait till T_safe > tx.start_ts
      val = latest version of db[key] smaller than tx.start_ts
    }
    return val
  }

Spanner's other optimization: avoid blocking (waiting for T_safe) for single-read
read-only transactions
  Idea: while a conflicting rw transaction is in the middle of prepare, it is better to
  read an earlier value than to block (both preserve linearizability)

  single_read_rotx_begin(tx) {
    // go to the Paxos leader
    tx.start_ts = timestamp of the last committed write
  }

Linearizability is costly
  writes must go through the wide area
  reads might block on writes (which go through the wide area)

Lynx: distributed transactions
  serializability (not linearizability), low latency

  example application: auction  [ based on Figures 1, 5, 7 ]
    Bids table: bidder, seller, itemid, price -- sharded by bidder
    Items table: seller, itemid, price -- sharded by seller

  what does a distributed transaction look like in lynx?
    transaction chain: a sequence of local operations (hops) on individual servers
    each local operation executes atomically (e.g., locks the database)
    the first hop acts as the coordinator for the rest of the transaction,
      executing it hop-by-hop on the appropriate servers
    e.g., placing a bid  [ based on Figure 7 ] (see the sketch just below)
      hop 1: insert a bid into the Bids table
      hop 2: update the price in the Items table (if the new bid is higher)
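  A minimal sketch of the place_bid chain in Go (the Hop/shard/placeBid names are
  hypothetical, not Lynx's actual API): each hop is a local operation that runs
  atomically on one shard, and the client can return as soon as hop 1 commits, with
  the remaining hops executed later, hop by hop.

    package main

    import "fmt"

    type Bid struct {
        Bidder, Seller, ItemID string
        Price                  int
    }

    // A Hop is a local operation executed atomically on one shard.
    type Hop func()

    type shard struct {
        bids  []Bid          // Bids table fragment (sharded by bidder)
        items map[string]int // Items table fragment: itemid -> current price (sharded by seller)
    }

    // placeBid builds the two-hop chain from Figure 7:
    //   hop 1 (bidder's shard): insert the bid into Bids
    //   hop 2 (seller's shard): raise the item's price if the bid is higher
    func placeBid(bidderShard, sellerShard *shard, b Bid) []Hop {
        return []Hop{
            func() { bidderShard.bids = append(bidderShard.bids, b) },
            func() {
                if b.Price > sellerShard.items[b.ItemID] {
                    sellerShard.items[b.ItemID] = b.Price
                }
            },
        }
    }

    func main() {
        bidder := &shard{items: map[string]int{}}
        seller := &shard{items: map[string]int{"item42": 10}}
        chain := placeBid(bidder, seller, Bid{"alice", "bob", "item42", 15})

        chain[0]() // hop 1 commits; the client can already return here
        // ... later, possibly after a delay, hop 2 runs on the seller's shard:
        chain[1]()
        fmt.Println(seller.items["item42"]) // 15
    }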
  how does lynx achieve low latency?
    don't wait for all servers involved in the transaction: so, not 2PC/2PL
    only the first participant can choose to abort (the others cannot)
    the before-or-after atomicity guarantee is slightly weaker

  what if another transaction accesses the same data, concurrent with place_bid?
    scenario: place_bid || (look up the current list of bids by a given user (bidder))
      no problem: place_bid returns after the first hop commits,
      so reading the Bids table will return the new bid
      if the lookup runs before place_bid, it does not observe the new bid
    scenario: place_bid || (look up the current price of an item)
      maybe the second hop of place_bid hasn't executed yet
      may observe the old price in the Items table, even though Bids was already updated
      why is this legal?
        serializability: the execution is equivalent to some serial order,
        but this serial order need not be the order in which transactions were issued
        re-ordering: a later transaction "appears" to run first
        not linearizability / strict serializability (as achieved by 2PL)

  what to do about this problem?
    lynx: "read-my-writes" guarantee
      a chain is guaranteed to observe prior writes from the same session
    can still observe inconsistency: two users talk to each other out-of-band
      suppose Alice places a bid, then calls Bob on the phone to tell him about the item
      does this matter?
    how big can this inconsistency window get?
      potentially arbitrarily large: if an intermediate hop's server/network is slow,
      the chain will get stuck there

  Transaction: set_two_identical_numbers(c) { hop1: x = c;  hop2: y = c; }
    scenario: T1: set_two_identical_numbers(1) || T2: set_two_identical_numbers(2)
      ?? interleaving T1_1  T2_1  T2_2  T1_2 leaves x = 2, y = 1, which no serial
      order can produce (a concrete sketch appears at the end of these notes)

  how does lynx deal with this atomicity violation?
    analyze transactions for conflicts
      assumes all transactions are known ahead of time (why is this reasonable?)
      but does not require knowing the arguments to transactions
    consider all pairs of transactions, and the hops in those transactions
      S (sibling) edges between hops: the hops are part of the same txn
      C (conflict) edges between hops: the hops conflict
    is a "conflict" the same thing in lynx and epaxos?
      yes, same notion: the order of the operations matters
    SC analysis helps determine whether it's safe to run hops individually (no 2PL):
      there's a problem if there exists an SC cycle (a cycle containing both S & C edges)
    what to do if there's an SC cycle?
      in principle, the paper says one could use 2PC/2PL
      in practice, lynx did not implement 2PC/2PL;
      the application developer should try hard to avoid these situations
      - commutativity
        e.g., two place_bid invocations form an SC cycle: might require 2PC/2PL
        the authors suggest the Bids inserts do not conflict: they commute
      - origin ordering
        since all set_two_identical_numbers chains start at (the server holding) x,
        ordering is preserved

  when might transaction chains not work well?
    don't know transactions ahead of time (cannot analyze for SC cycles)
    need linearizability
    lots of chains converge on a single non-first-hop server
      first-hop servers queue up lots of pending chains,
      all of them blocked waiting to execute a hop on the bottleneck server

Spanner vs. Lynx
  is Spanner strictly better?
    simpler to program, but higher latency for cross-shard transactions
    how important is this? depends entirely on the application
  how would you implement the auction in spanner?
    option 1: one transaction modifying Bids and Items
      must wait to contact both the bidder's and the seller's data centers
      might be far away (Australia, etc.), so high latency
    option 2: separately modify Bids and Items
      no all-or-nothing atomicity: might insert the bid, then crash before updating the price
      no before-or-after atomicity: might have unexpected interleavings
      so, difficult to get consistency
  with lynx, it's somewhat easier to achieve both low latency and consistency,
  but it does require the programmer to think a bit more about consistency
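As referenced above, a minimal runnable sketch (hypothetical code, just to make the
set_two_identical_numbers violation concrete): the interleaving T1_1, T2_1, T2_2, T1_2
leaves x and y unequal, an outcome neither serial order produces.

  package main

  import "fmt"

  var x, y int

  // Each hop of set_two_identical_numbers(c) runs atomically on its own server;
  // here each hop is just a function, since a single hop needs no coordination.
  func hop1(c int) { x = c }
  func hop2(c int) { y = c }

  func main() {
      // T1 = set_two_identical_numbers(1), T2 = set_two_identical_numbers(2)
      hop1(1) // T1_1
      hop1(2) // T2_1
      hop2(2) // T2_2
      hop2(1) // T1_2
      fmt.Printf("x=%d y=%d\n", x, y) // x=2 y=1: not equal, so not serializable
  }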