Distributed Systems Internals: Consensus, Fault Tolerance & Data Consistency¶
Under the Hood: How distributed systems achieve agreement, tolerate failures, and maintain consistency across unreliable networks — the exact data flows, message protocols, state machines, and mathematical guarantees.
1. The Fundamental Problem: Partial Failures¶
Unlike a single machine where components fail atomically (crash = stop), distributed systems experience partial failures: some nodes work, some don't, network partitions split components, messages arrive out of order or not at all.
flowchart TD
subgraph "Single Machine Failure Model"
SM[Process] -->|crash| SD[Everything stops atomically]
end
subgraph "Distributed Failure Model"
N1[Node A: alive]
N2[Node B: crashed]
N3[Node C: alive but partitioned]
N4[Node D: alive but slow]
N1 <-->|partition| N3
N1 -->|timeout?| N2
N4 -->|message delay 30s| N1
end
The challenge: from Node A's perspective, Node B crashed is indistinguishable from Node B responding very slowly. Timeouts are the only detection mechanism — but choosing the right timeout is impossible without knowing the maximum message delay.
The Two Generals Problem (Impossibility)¶
No protocol can guarantee two parties reach agreement over an unreliable channel. This is a proven impossibility — any confirmation message itself needs confirmation, ad infinitum.
sequenceDiagram
participant GA as General A
participant Net as Unreliable Network
participant GB as General B
GA->>Net: "Attack at dawn" (msg 1)
Note over Net: May be lost
Net-->>GB: msg 1 (maybe)
GB->>Net: "Acknowledged" (msg 2)
Note over Net: May be lost
Net-->>GA: msg 2 (maybe)
Note over GA: GA needs to confirm receipt of ACK...
Note over GA: Infinite regress — no finite protocol works
2. CAP Theorem: What You Must Sacrifice¶
Brewer's CAP theorem (proven formally by Gilbert & Lynch): a distributed system cannot simultaneously guarantee all three of:
- C — Consistency: every read sees the most recent write
- A — Availability: every request receives a (non-error) response
- P — Partition tolerance: the system keeps working despite network splits
graph TD
subgraph "CAP Triangle"
C[Consistency\nEvery read = latest write]
A[Availability\nEvery request gets response]
P[Partition Tolerance\nWorks despite network split]
C <-->|CA systems\nMySQL single-node| A
C <-->|CP systems\nHBase, ZooKeeper, etcd| P
A <-->|AP systems\nDynamo, Cassandra, CouchDB| P
end
Why P is non-negotiable in practice: Network partitions happen. You must tolerate them. The real choice is CP vs AP when a partition occurs:
- CP: Refuse writes (return error) until partition heals → consistent but unavailable
- AP: Accept writes on both sides of partition → available but divergent state
3. Consensus: Paxos Internal Mechanics¶
Paxos achieves consensus (agreement on a single value) despite node failures. Three roles: Proposer, Acceptor, Learner.
sequenceDiagram
participant P as Proposer
participant A1 as Acceptor 1
participant A2 as Acceptor 2
participant A3 as Acceptor 3
Note over P: Phase 1a: PREPARE
P->>A1: Prepare(n=5)
P->>A2: Prepare(n=5)
P->>A3: Prepare(n=5)
Note over A1,A3: Phase 1b: PROMISE
A1-->>P: Promise(n=5, accepted=(3,v1))
A2-->>P: Promise(n=5, accepted=nil)
A3-->>P: Promise(n=5, accepted=nil)
Note over P: Quorum received (2/3)
Note over P: Picks highest accepted value: v1
Note over P: Phase 2a: ACCEPT
P->>A1: Accept(n=5, v=v1)
P->>A2: Accept(n=5, v=v1)
P->>A3: Accept(n=5, v=v1)
Note over A1,A3: Phase 2b: ACCEPTED
A1-->>P: Accepted(n=5, v=v1)
A2-->>P: Accepted(n=5, v=v1)
A3-->>P: Accepted(n=5, v=v1)
Note over P: Quorum accepted → value v1 chosen
Paxos Ballot Number Invariant¶
Each Acceptor stores (maxPromised, acceptedBallot, acceptedValue). The invariant:
- An acceptor never promises a ballot ≤ its maxPromised
- An acceptor never accepts a ballot < its maxPromised (accepting the ballot it just promised is allowed)
- If any promise reports a previously accepted value, the proposer must propose the value with the highest accepted ballot among the quorum's responses
This ensures: once a value is chosen by a quorum, no future Paxos round can choose a different value.
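The acceptor side of these invariants fits in a few lines. This is a minimal sketch (ballots as integers, class and method names illustrative), not a complete Paxos implementation:

```python
# Sketch of a Paxos acceptor's durable state and its two handlers.
# Real acceptors must persist this state to disk before replying.

class Acceptor:
    def __init__(self):
        self.max_promised = -1      # highest ballot promised so far
        self.accepted_ballot = -1   # ballot of the last accepted value
        self.accepted_value = None

    def on_prepare(self, ballot):
        # Invariant 1: never promise a ballot <= max_promised.
        if ballot <= self.max_promised:
            return None  # reject / NACK
        self.max_promised = ballot
        # Report any previously accepted (ballot, value) so the
        # proposer can adopt the highest-ballot accepted value.
        return (self.accepted_ballot, self.accepted_value)

    def on_accept(self, ballot, value):
        # Invariant 2: never accept a ballot < max_promised
        # (equality is allowed — it is the ballot we just promised).
        if ballot < self.max_promised:
            return False
        self.max_promised = ballot
        self.accepted_ballot = ballot
        self.accepted_value = value
        return True
```

Replaying the sequence diagram above: after promising ballot 5, the acceptor rejects a late Prepare(3), accepts (5, v1), and a later Prepare(6) returns (5, v1) so the new proposer must re-propose v1.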
4. Raft: Understandable Consensus¶
Raft decomposes consensus into three sub-problems: leader election, log replication, safety.
stateDiagram-v2
[*] --> Follower: Start
Follower --> Candidate: Election timeout (150-300ms)\nno heartbeat from leader
Candidate --> Leader: Wins majority vote\nRequestVote RPCs
Candidate --> Follower: Discovers valid leader\nor higher term
Leader --> Follower: Discovers server with\nhigher term
Candidate --> Candidate: Election timeout\n(split vote — retry)
Raft Log Replication Internal Flow¶
sequenceDiagram
participant C as Client
participant L as Leader
participant F1 as Follower 1
participant F2 as Follower 2
C->>L: Write(x=5)
Note over L: Append to local log\nEntry: (term=3, index=7, x=5)
L->>F1: AppendEntries(term=3, prevIndex=6, prevTerm=3, entries=[{7,x=5}], leaderCommit=6)
L->>F2: AppendEntries(...)
F1-->>L: Success (matchIndex=7)
F2-->>L: Success (matchIndex=7)
Note over L: Majority confirmed index=7\ncommitIndex advances to 7
Note over L: Apply to state machine: x=5
L-->>C: Success
Note over F1,F2: Next AppendEntries carries\nleaderCommit=7 → followers apply
Raft Safety: Log Matching Property¶
Two invariants make Raft safe:
1. Election Safety: at most one leader per term
2. Log Matching: if two logs contain an entry with the same (index, term), all preceding entries are identical
The prevIndex/prevTerm check in AppendEntries enforces Log Matching — a follower rejects if it doesn't have a matching entry at prevIndex with prevTerm.
flowchart LR
subgraph "Leader Log"
L1["idx=1 t=1 SET x=1"]
L2["idx=2 t=1 SET y=2"]
L3["idx=3 t=2 SET x=3"]
L4["idx=4 t=3 SET z=4"]
L1-->L2-->L3-->L4
end
subgraph "Follower Log (diverged)"
F1["idx=1 t=1 SET x=1"]
F2["idx=2 t=1 SET y=2"]
F3_wrong["idx=3 t=2 SET x=99\n(from old leader)"]
F1-->F2-->F3_wrong
end
Note["AppendEntries at idx=3 fails\nprevTerm check → leader\ndecrements nextIndex and\nretries from idx=2 until\nmatch found, then overwrites"]
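The follower-side consistency check driving that retry loop can be sketched as follows (log entries as (term, command) tuples, 1-indexed; names illustrative, not Raft's RPC wire format):

```python
# Follower's AppendEntries handler, reduced to the Log Matching check:
# reject unless our log holds a matching entry at prev_index, otherwise
# truncate any conflicting suffix and append the leader's entries.

def append_entries(log, prev_index, prev_term, entries):
    if prev_index > 0:
        if len(log) < prev_index or log[prev_index - 1][0] != prev_term:
            return False  # leader will decrement nextIndex and retry
    # A matching prefix implies identical prefixes (Log Matching),
    # so it is safe to overwrite everything after prev_index.
    del log[prev_index:]
    log.extend(entries)
    return True
```

Running the divergence example from the diagram: the follower holding `(term=2, "x=99")` at index 3 rejects an AppendEntries with prevIndex=3, then accepts the retry at prevIndex=2 and overwrites the stale entry.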
5. Distributed Transactions: Two-Phase Commit (2PC)¶
2PC coordinates atomic commits across multiple nodes (shards/databases).
sequenceDiagram
participant TM as Transaction Manager\n(Coordinator)
participant P1 as Participant 1\n(Shard A)
participant P2 as Participant 2\n(Shard B)
Note over TM: Phase 1: PREPARE
TM->>P1: Prepare(txn_id=42)
TM->>P2: Prepare(txn_id=42)
Note over P1: Acquire locks\nWrite to WAL: PREPARED
Note over P2: Acquire locks\nWrite to WAL: PREPARED
P1-->>TM: Vote YES
P2-->>TM: Vote YES
Note over TM: Write COMMIT to WAL\n(point of no return)
Note over TM: Phase 2: COMMIT
TM->>P1: Commit(txn_id=42)
TM->>P2: Commit(txn_id=42)
P1-->>TM: ACK
P2-->>TM: ACK
Note over TM: Write END to WAL\nRelease transaction
2PC Failure Scenarios and WAL Recovery¶
flowchart TD
subgraph "Coordinator WAL States"
S1[INIT] --> S2[PREPARED]
S2 --> S3[COMMITTED]
S2 --> S4[ABORTED]
S3 --> S5[ENDED]
S4 --> S5
end
subgraph "Recovery Logic on Restart"
R1{WAL contains\nCOMMITTED?} -->|yes| R2[Re-send COMMIT to all participants]
R1 -->|no, contains PREPARED| R3[Re-send ABORT to all participants]
R1 -->|no WAL record| R4[Transaction never started — ignore]
end
2PC's Achilles Heel — Blocking: If the coordinator crashes after participants have voted YES but before they learn the outcome, participants are blocked — they hold locks but cannot unilaterally commit or abort until the coordinator recovers and replays its WAL. 3PC (Three-Phase Commit) adds a pre-commit phase to reduce blocking but doesn't eliminate it under network partitions.
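The coordinator's happy path and abort path, including the WAL records that drive recovery, can be sketched like this (participant interface and all names illustrative, assuming participants expose prepare/commit/abort):

```python
# Toy 2PC coordinator. The WAL is modeled as an append-only list;
# a real coordinator must fsync each record before proceeding.

class FakeParticipant:
    def __init__(self, vote=True):
        self.vote = vote
        self.outcome = None
    def prepare(self, txn_id): return self.vote   # acquire locks, WAL PREPARED
    def commit(self, txn_id):  self.outcome = "commit"
    def abort(self, txn_id):   self.outcome = "abort"

def two_phase_commit(wal, participants, txn_id):
    wal.append(("PREPARED", txn_id))
    votes = [p.prepare(txn_id) for p in participants]
    if all(votes):
        wal.append(("COMMITTED", txn_id))   # point of no return
        for p in participants:
            p.commit(txn_id)
        wal.append(("ENDED", txn_id))
        return True
    wal.append(("ABORTED", txn_id))         # any NO (or timeout) => abort all
    for p in participants:
        p.abort(txn_id)
    wal.append(("ENDED", txn_id))
    return False
```

On restart, the recovery logic in the flowchart above is a scan of this WAL: a COMMITTED record re-drives commit, a bare PREPARED record drives abort.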
6. MVCC: Multi-Version Concurrency Control Internals¶
MVCC (used by PostgreSQL, MySQL InnoDB, CockroachDB) maintains multiple versions of each row, allowing readers and writers to not block each other.
flowchart TD
subgraph "Row Versions in PostgreSQL"
V1["xmin=100, xmax=200\nname='Alice', salary=50000\n(visible to txns 100..199)"]
V2["xmin=200, xmax=350\nname='Alice', salary=60000\n(visible to txns 200..349)"]
V3["xmin=350, xmax=INF\nname='Alice', salary=70000\n(visible to txns 350+)"]
V1 --> V2 --> V3
end
subgraph "Transaction Snapshot"
T["Txn 280: V2 visible\n(xmin=200 ≤ 280 < xmax=350)\n→ reads salary=60000\nNever sees V3 (xmin=350 > 280)"]
end
MVCC Read Path¶
Each transaction receives a snapshot at start: (xmin, xmax, active_xids[]), where snapshot.xmax is the first transaction ID not yet assigned when the snapshot was taken. A row version is visible if:
- row.xmin < snapshot.xmax (created before the snapshot)
- row.xmin not in active_xids[] (creator had committed)
- row.xmax is unset, row.xmax >= snapshot.xmax, or row.xmax in active_xids[] (no deleter had committed before the snapshot)
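As a predicate, the visibility check looks roughly like this. This is a deliberate simplification of PostgreSQL's real logic (which also handles aborted transactions, subtransactions, and hint bits); xids are plain integers here:

```python
# Snapshot visibility, simplified: a row version is visible iff its
# creator committed before the snapshot and no deleter did.
# snap_xmin is kept for completeness: any xid < snap_xmin is known
# to be finished (PostgreSQL uses it as a fast path).

def visible(row_xmin, row_xmax, snap_xmin, snap_xmax, active):
    # Creator must have committed before the snapshot was taken.
    if row_xmin >= snap_xmax or row_xmin in active:
        return False
    # Deleter must NOT have committed before the snapshot:
    # no deleter, deleter after snapshot, or deleter still in flight.
    if row_xmax is None or row_xmax >= snap_xmax or row_xmax in active:
        return True
    return False
```

Against the row versions in the diagram, a snapshot with xmax=280 sees V2 (xmin=200, xmax=350) but neither V1 (deleted at 200) nor V3 (created at 350).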
sequenceDiagram
participant App as Application
participant DB as PostgreSQL
participant WAL as Write-Ahead Log
App->>DB: BEGIN TRANSACTION
DB-->>App: Snapshot: xmin=500, active=[502,503]
App->>DB: SELECT salary FROM emp WHERE id=1
Note over DB: Heap scan: find all versions of id=1\nFilter by snapshot visibility
DB-->>App: Returns version with xmin=498 (committed, not active)
App->>DB: UPDATE emp SET salary=80000 WHERE id=1
Note over DB: Mark old version xmax=current_txn_id\nInsert new version with xmin=current_txn_id
Note over WAL: WAL records: old row xmax + new row insert
DB-->>App: 1 row updated
App->>DB: COMMIT
Note over WAL: WAL flush to disk (fsync)\nCommit record written
DB-->>App: COMMIT OK
7. Vector Clocks and Causality Tracking¶
Vector clocks track happened-before relationships in distributed systems without requiring synchronized clocks.
flowchart LR
subgraph "Node A"
A1["A:[1,0,0]\nWrite x=1"]
A2["A:[2,0,0]\nSend msg to B"]
A3["A:[3,3,0]\nReceive from B\nmerge: max([2,0,0],[2,3,0])\nthen increment own → [3,3,0]"]
end
subgraph "Node B"
B1["B:[2,1,0]\nReceive from A\nmerge: max([0,0,0],[2,0,0])\nthen increment own → [2,1,0]"]
B2["B:[2,2,0]\nWrite y=5"]
B3["B:[2,3,0]\nSend msg to A"]
end
A2 -->|send [2,0,0]| B1
B3 -->|send [2,3,0]| A3
Conflict detection: Two events are concurrent (neither happened-before the other) if neither vector clock dominates the other:
- A=[2,1,0] vs B=[1,2,0] → concurrent → conflict → need merge
Amazon's Dynamo (the design that inspired DynamoDB) uses vector clocks (called "version vectors") to detect write conflicts and can return multiple conflicting versions to the application for resolution.
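The tick/merge/compare rules above fit in a few lines of Python (clocks as dicts mapping node id → counter; function names illustrative):

```python
# Vector clock operations: local event, receive-merge, and the
# dominance check used for conflict detection.

def tick(clock, node):
    clock = dict(clock)
    clock[node] = clock.get(node, 0) + 1
    return clock

def merge(local, remote, node):
    # On receive: element-wise max of both clocks, then increment
    # the receiver's own entry (the receive is itself an event).
    merged = {k: max(local.get(k, 0), remote.get(k, 0))
              for k in set(local) | set(remote)}
    return tick(merged, node)

def concurrent(a, b):
    # Concurrent iff neither clock dominates the other.
    keys = set(a) | set(b)
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in keys)
    return not a_le_b and not b_le_a
```

Missing entries default to 0, so clocks only grow as nodes actually interact — this is why vector clock size is O(N) in the number of writers, as the summary table notes.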
8. Consistent Hashing and Distributed Hash Tables¶
Consistent hashing minimizes key remapping when nodes join/leave. The ring maps both keys and nodes to [0, 2^32) using the same hash function.
flowchart TD
subgraph "Hash Ring [0, 2^32)"
direction LR
K1["Key 'user:1'\nhash=15%"] -->|clockwise lookup| N1["Node A\nposition=20%"]
K2["Key 'user:5'\nhash=45%"] -->|clockwise lookup| N2["Node B\nposition=50%"]
K3["Key 'user:9'\nhash=85%"] -->|clockwise lookup| N3["Node C\nposition=90%"]
N3 --> N1
end
subgraph "Node Join: Node D at 35%"
D["Node D added\nposition=35%"]
Remapped["Keys in (20%, 35%]\nmove from B to D\n(only that arc remaps)"]
Unchanged["All other keys\nunchanged"]
end
Virtual Nodes for Load Balance¶
A single node gets multiple positions on the ring (virtual nodes / vnodes):
flowchart LR
RealA["Physical Node A"]
RealB["Physical Node B"]
RealC["Physical Node C"]
VA1["A-vnode-1 @5%"] --> RealA
VA2["A-vnode-2 @40%"] --> RealA
VA3["A-vnode-3 @75%"] --> RealA
VB1["B-vnode-1 @15%"] --> RealB
VB2["B-vnode-2 @55%"] --> RealB
VB3["B-vnode-3 @85%"] --> RealB
VC1["C-vnode-1 @25%"] --> RealC
VC2["C-vnode-2 @65%"] --> RealC
VC3["C-vnode-3 @95%"] --> RealC
With ~150 vnodes per physical node, load imbalance across nodes typically stays under 10%.
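A ring with vnodes is small enough to sketch end to end (hash choice, vnode count, and class names are illustrative — real systems like Cassandra differ in the details):

```python
import bisect
import hashlib

def _h(key):
    # Stable hash onto [0, 2^32); md5 chosen only for determinism here.
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2 ** 32)

class Ring:
    def __init__(self, nodes, vnodes=150):
        # Each physical node owns `vnodes` positions on the ring.
        self._ring = sorted(
            (_h(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
        self._points = [p for p, _ in self._ring]

    def lookup(self, key):
        # First vnode clockwise from the key's position (wrap to 0).
        i = bisect.bisect_right(self._points, _h(key)) % len(self._ring)
        return self._ring[i][1]
```

Adding a fourth node to a three-node ring only remaps the keys that now land on the new node's arcs; every other key keeps its owner, which is the whole point of consistent hashing.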
9. Eventual Consistency and CRDTs¶
CRDTs (Conflict-free Replicated Data Types) allow replicas to diverge and merge without coordination, guaranteeing eventual convergence by mathematical construction.
flowchart TD
subgraph "G-Counter CRDT (Grow-Only)"
N1S["Node 1: {N1:3, N2:2, N3:5}\nlocal total=10"]
N2S["Node 2: {N1:3, N2:4, N3:5}\nlocal total=12"]
Merge["Merge: element-wise max\n{N1:3, N2:4, N3:5}\ntotal=12"]
N1S --> Merge
N2S --> Merge
end
subgraph "PN-Counter (Increment + Decrement)"
P["P (positive): G-Counter"]
N["N (negative): G-Counter"]
Val["value = sum(P) - sum(N)"]
P --> Val
N --> Val
end
subgraph "LWW-Register (Last-Write-Wins)"
W1["Write(x=5, ts=T1)"]
W2["Write(x=7, ts=T2, T2>T1)"]
Res["Result: x=7 (T2 wins)\nRequires synchronized clocks\nor Lamport timestamps"]
W1 --> Res
W2 --> Res
end
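The G-Counter and PN-Counter in the diagram reduce to element-wise max and a pair of counters (state as dict replica_id → count; function names illustrative):

```python
# G-Counter / PN-Counter CRDT sketches. Convergence follows because
# element-wise max is commutative, associative, and idempotent.

def g_increment(state, replica, n=1):
    state = dict(state)
    state[replica] = state.get(replica, 0) + n
    return state

def g_merge(a, b):
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in set(a) | set(b)}

def g_value(state):
    return sum(state.values())

def pn_value(p_state, n_state):
    # PN-Counter: two G-Counters, value = increments - decrements.
    return g_value(p_state) - g_value(n_state)
```

Merging the two diagram states {N1:3, N2:2, N3:5} and {N1:3, N2:4, N3:5} yields {N1:3, N2:4, N3:5} with total 12, regardless of which replica merges first.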
OR-Set (Observed-Remove Set) Internals¶
Simple add/remove sets have the "remove wins" vs "add wins" ambiguity. OR-Set tags each add with a unique ID:
add("a") → {("a", uid1)}
add("a") → {("a", uid1), ("a", uid2)}
remove("a") → removes all observed uid pairs for "a"
concurrent add("a") after remove → uid3 survives (not in remove set)
10. Distributed Tracing: Span Propagation Internals¶
sequenceDiagram
participant Client as Browser Client
participant API as API Gateway
participant Auth as Auth Service
participant DB as Database
Note over Client: traceparent: 00-trace_id-span_id-01
Client->>API: HTTP GET /orders\ntraceparent: 00-abc123-0001-01
Note over API: Extract trace_id=abc123\nCreate child span_id=0002\nparent_span_id=0001
API->>Auth: gRPC CheckToken\ngrpc-trace-bin: (abc123, 0002)
Note over Auth: Create child span_id=0003
Auth-->>API: Token valid (span 0003 ends)
API->>DB: SELECT * FROM orders\nComment: /* traceid=abc123 spanid=0004 */
Note over DB: Query executed (span 0004 ends)
DB-->>API: Results
Note over API: Span 0002 ends
API-->>Client: 200 OK
Note over Client,DB: Async: spans exported to\nJaeger/Zipkin via OTLP\n(batched, out-of-band)
Sampling Decision Propagation: The traceparent flag byte encodes the sampling decision. If the root span decides to sample (probabilistic, 1%), all downstream services inherit that decision — this ensures complete traces, not partial traces.
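Extracting that flag from a W3C `traceparent` header is a one-liner-per-field affair (a minimal sketch that skips the spec's validation rules for version and all-zero IDs):

```python
# Parse a W3C trace-context traceparent header:
#   version-traceid-parentid-flags, all lowercase hex.
# The sampled decision is the lowest bit of the trace-flags byte.

def parse_traceparent(header):
    version, trace_id, parent_id, flags = header.split("-")
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_id": parent_id,
        "sampled": int(flags, 16) & 0x01 == 0x01,
    }
```

A downstream service that honors this flag creates (and exports) spans only when `sampled` is true, which is what keeps traces complete rather than partial.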
11. Gossip Protocol: Epidemic Information Dissemination¶
Gossip (used by Cassandra, Consul, Redis Cluster) spreads information in O(log N) rounds.
flowchart TD
subgraph "Round 1 (1 infected)"
I["Node A\n(knows new info)"]
I -->|random peer| R1["Node D\n(now infected)"]
end
subgraph "Round 2 (2 infected)"
I2["Node A"] -->|random| R2["Node B"]
ID2["Node D"] -->|random| R3["Node G"]
end
subgraph "Round 3 (4 infected)"
N1["A→F"]
N2["B→C"]
N3["D→H"]
N4["G→E"]
end
subgraph "Convergence"
Conv["After k=log₂(N) rounds\n~50% nodes informed\nAfter 3k rounds\n~99.9% nodes informed\nP(not_infected) = (1-1/N)^(kN) ≈ e^(-k)"]
end
Phi Accrual Failure Detector (Cassandra)¶
Instead of a binary alive/dead verdict, the phi accrual failure detector outputs a suspicion level φ based on the inter-arrival times of heartbeats:

φ(t) = −log₁₀(P_later(t − t_last))

where P_later is the probability that the next heartbeat arrives after time t, given a (e.g. Gaussian) model of past inter-arrival times. φ=1 → 90% confidence of failure. φ=8 → 99.999999%.
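A sketch of that computation, assuming a plain Gaussian over a fixed window of recent inter-arrival times (Cassandra's actual detector uses an exponentially decaying window and a slightly different tail estimate):

```python
import math
import statistics

def phi(time_since_last, intervals):
    """Suspicion level for a peer, given seconds since its last
    heartbeat and a history of past heartbeat inter-arrival times."""
    mean = statistics.mean(intervals)
    std = statistics.stdev(intervals) or 1e-6  # avoid division by zero
    # P_later: Gaussian tail probability that the next heartbeat
    # arrives later than `time_since_last`.
    z = (time_since_last - mean) / std
    p_later = 0.5 * math.erfc(z / math.sqrt(2))
    return -math.log10(max(p_later, 1e-300))
```

With heartbeats arriving every second give or take, a heartbeat right on schedule yields φ ≈ 0.3, while several seconds of silence drives φ well past typical suspicion thresholds (Cassandra's default `phi_convict_threshold` is 8).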
12. Linearizability vs Serializability¶
flowchart TD
subgraph "Consistency Models Hierarchy"
SR["Strict Serializability\n(strongest)\nLinearizable + serializable\nSpanner, FoundationDB"]
LIN["Linearizability\n(single-object real-time)\nEtcd, ZooKeeper\noperates on a single-'register' abstraction"]
SEQ["Sequential Consistency\n(global order, not real-time)\nOld CPUs, some GPU memory models"]
SER["Serializability\n(multi-object transactions)\nPostgreSQL SERIALIZABLE\nno real-time constraint"]
SI["Snapshot Isolation\nMVCC read consistency\nbut write skew possible"]
RC["Read Committed\nno dirty reads\nphantom reads possible"]
RU["Read Uncommitted\ndirty reads allowed"]
SR --> LIN
SR --> SER
LIN --> SEQ
SER --> SI
SI --> RC
RC --> RU
end
Write Skew Example (SI allows, Serializable prevents):
- Txn A reads: "2 doctors on call" → decides one can go off call
- Txn B reads: "2 doctors on call" → decides one can go off call
- Both commit → 0 doctors on call (violates invariant)
- Under SI: both pass (each read a consistent snapshot before the other's write)
- Under Serializable: one aborts (serialization conflict detected)
13. Google Spanner: TrueTime and External Consistency¶
Spanner achieves external consistency (strict serializability globally) using GPS + atomic clock hardware providing bounded time uncertainty.
sequenceDiagram
participant App as Application
participant S as Spanner Server
participant TT as TrueTime API
App->>S: COMMIT transaction T
S->>TT: TT.now()
TT-->>S: [earliest=T.early, latest=T.late]\nuncertainty ε typically 1-7ms
Note over S: commit_timestamp = TT.now().latest\n(T.late — the latest possible "now")
Note over S: WAIT until TT.now().earliest > commit_timestamp\n("commit wait" ≈ 2ε, often overlapped\nwith Paxos replication)
S-->>App: COMMIT OK with timestamp=T_commit
Note over App,S: Any future transaction that starts\nafter receiving this ACK will have\nstart_timestamp > T_commit\n→ guaranteed to see this write\n(external consistency)
The commit wait is the price of external consistency: Spanner deliberately delays COMMIT acknowledgment by the TrueTime uncertainty interval to ensure no future transaction starts with a timestamp before the current commit.
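The commit-wait rule can be demonstrated with a fake TrueTime whose uncertainty is a constant ε (real Spanner derives ε from GPS and atomic-clock time masters; all names here are illustrative):

```python
import time

EPS = 0.004  # 4 ms uncertainty, illustrative

def tt_now():
    """Fake TrueTime: a (earliest, latest) interval around local time."""
    t = time.monotonic()
    return (t - EPS, t + EPS)

def commit_wait():
    """Pick a commit timestamp, then wait until it is definitely past."""
    _, latest = tt_now()
    commit_ts = latest                    # s = TT.now().latest
    while tt_now()[0] <= commit_ts:       # wait until TT.after(s)
        time.sleep(0.0005)
    return commit_ts
```

By construction the wait lasts at least 2ε, after which every clock in the fleet agrees the commit timestamp is in the past — so any transaction that starts after the ACK gets a strictly larger timestamp.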
14. Partition Repair: Anti-Entropy with Merkle Trees¶
After a network partition heals, replicas must reconcile diverged data. Merkle trees make this efficient.
flowchart TD
subgraph "Replica A Merkle Tree"
RA_root["Root: hash(AB+CD)=H1a"]
RA_ab["hash(A+B)=H2a"]
RA_cd["hash(C+D)=H3a"]
RA_a["A: h(v1)"]
RA_b["B: h(v2)"]
RA_c["C: h(v3)"]
RA_d["D: h(v4a)\n(diverged)"]
RA_root --> RA_ab --> RA_a
RA_ab --> RA_b
RA_root --> RA_cd --> RA_c
RA_cd --> RA_d
end
subgraph "Replica B Merkle Tree"
RB_root["Root: hash(AB+CD)=H1b\n≠H1a → diverged"]
RB_ab["hash(A+B)=H2b=H2a\n(same — skip subtree)"]
RB_cd["hash(C+D)=H3b\n≠H3a → recurse"]
RB_d["D: h(v4b)\n(different value)"]
RB_root --> RB_ab
RB_root --> RB_cd --> RB_d
end
subgraph "Sync Result"
SR["Only key D needs sync\nO(log N) tree traversal\nvs O(N) full comparison"]
end
Cassandra uses Merkle trees for repair operations. Each node builds a Merkle tree of its token range. Tree comparison identifies diverged leaf nodes (individual keys or key ranges) that need reconciliation.
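The subtree-pruning traversal is short enough to show whole. A sketch over a fixed, sorted key range (hash layout and function names illustrative; real implementations hash key ranges, not single keys):

```python
import hashlib

def h(data):
    return hashlib.sha256(data.encode()).hexdigest()

def tree_hash(store, keys):
    """Merkle hash of a replica's values over a sorted key range."""
    if len(keys) == 1:
        return h(keys[0] + "=" + store.get(keys[0], ""))
    mid = len(keys) // 2
    return h(tree_hash(store, keys[:mid]) + tree_hash(store, keys[mid:]))

def diff(a, b, keys):
    """Keys where replicas a and b diverge, pruning equal subtrees."""
    if tree_hash(a, keys) == tree_hash(b, keys):
        return []            # identical subtree — skip in O(1)
    if len(keys) == 1:
        return keys          # diverged leaf — needs reconciliation
    mid = len(keys) // 2
    return diff(a, b, keys[:mid]) + diff(a, b, keys[mid:])
```

On the example above, only key D differs, so the traversal touches one root, two internal nodes, and one leaf instead of comparing all four values.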
15. Distributed Lock Service: Chubby/etcd Internals¶
sequenceDiagram
participant C1 as Client 1
participant C2 as Client 2
participant E as etcd Cluster
participant L as Lease Manager
Note over C1: Acquire distributed lock
C1->>E: PUT /locks/resource-x\nvalue=client1-id\nLease TTL=10s (CreateLease first)
Note over E: Raft consensus: replicate to majority
E-->>C1: OK, lease_id=42, revision=100
C2->>E: PUT /locks/resource-x (try)
Note over E: Key already exists
E-->>C2: Key exists — watch for DELETE
Note over C1: C1 must keepalive lease
loop every 5s
C1->>E: LeaseKeepAlive(lease_id=42)
E-->>C1: TTL renewed
end
Note over C1: C1 crashes (no more keepalive)
Note over E: Lease 42 expires after 10s\nKey /locks/resource-x deleted
E-->>C2: Watch event: DELETE revision=101
C2->>E: PUT /locks/resource-x\nvalue=client2-id, new lease
E-->>C2: OK — lock acquired
Fencing Token: The revision number (100, 101...) is a fencing token — monotonically increasing. C1 uses revision 100 in all downstream operations. When the lock expires and C2 gets revision 101, any downstream service that sees a C1 request with revision 100 after seeing revision 101 rejects it (stale request detection).
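The downstream side of fencing is a single monotonicity check (a sketch; class and field names illustrative):

```python
# A downstream resource that rejects writes carrying a fencing token
# older than the highest token it has already seen. This protects
# against a paused/crashed lock holder resuming after its lease expired.

class FencedResource:
    def __init__(self):
        self.highest_token = -1

    def write(self, token, payload):
        if token < self.highest_token:
            return False      # stale lock holder — reject
        self.highest_token = token
        # ... apply payload ...
        return True
```

Replaying the etcd scenario: C1 writes with revision 100, C2 takes over with revision 101, and a delayed C1 request with token 100 arriving afterwards is rejected.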
Summary: Key Distributed Systems Properties¶
| Property | Mechanism | Cost |
|---|---|---|
| Consensus | Paxos/Raft (quorum writes) | ≥1 quorum round trip per write; linearizable reads need a leader lease or quorum check |
| External Consistency | TrueTime commit wait | 10-14ms latency floor |
| Eventual Consistency | CRDT merge / gossip | No coordination, conflict possible |
| Distributed Txn | 2PC coordinator | Blocking on coordinator failure |
| Causality Tracking | Vector clocks | O(N) clock size |
| Fault Detection | Phi accrual / heartbeat | False positive risk on slow network |
| Partition Repair | Merkle tree anti-entropy | Background CPU/IO for tree computation |
| Distributed Lock | etcd lease + fencing | Lease expiry latency on crash |