Learning Apache Kafka (2nd Edition) — Under the Hood¶
Source: Nishant Garg, Packt Publishing, 2015 (Kafka 0.8.x era)
Focus: Internal broker architecture, ZooKeeper coordination, partition log mechanics, ISR replication pipeline, producer/consumer offset state machines
1. Kafka's Architectural DNA: Why It Was Built This Way¶
Kafka emerged from LinkedIn's need to handle activity stream data and operational metrics at scale — a problem that JMS-style queuing systems couldn't solve efficiently due to broker-side metadata overhead, synchronous delivery contracts, and single-consumer semantics.
The core design principles that shape every internal decision:
flowchart TD
subgraph Design["Kafka Design Axioms"]
A["Messages are log segments<br/>on OS page cache"] -->|immutable append| B["Sequential I/O<br/>100x faster than random"]
B --> C["Consumers pull, never push<br/>State held by consumer, not broker"]
C --> D["Broker is stateless<br/>Time-based SLA for retention"]
D --> E["Partitions are the unit<br/>of parallelism and replication"]
end
These axioms cascade into every subsystem: the log format, ZooKeeper schema, ISR mechanism, and consumer group rebalancing protocol.
2. Broker Internal Architecture: Log Segments and Page Cache¶
Each Kafka broker maps every topic-partition to a sequence of segment files on disk. The broker never loads message data into the JVM heap — writes and reads flow through the OS kernel's page cache, and reads are served to consumers with zero-copy (sendfile) transfer straight from cache to socket, while the index files are memory-mapped.
block-beta
columns 3
block:disk["Disk — /tmp/kafka-logs/"]:3
seg0["segment-0.log\n(0 → 999)"]
seg1["segment-1.log\n(1000 → 1999)"]
seg2["segment-2.log\n(2000 → active)"]
end
block:idx["Index Files"]:3
i0[".index (offset → byte pos)"]
i1[".index"]
i2[".index (active)"]
end
block:cache["OS Page Cache"]:3
p0["Hot pages\n(recent writes)"]
p1["Warm pages\n(recent reads)"]
p2["Cold pages\n(eviction candidates)"]
end
seg2 --> p0
i2 --> p0
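The segment layout above makes offset lookup a binary search: each segment file is named for the offset of its first message, so the broker finds the right file by locating the largest base offset not exceeding the target. A minimal sketch of that lookup (function name is hypothetical; real segment files use zero-padded offset names):

```python
import bisect

def find_segment(base_offsets, target_offset):
    """Locate the segment whose base offset is the largest one <= target.
    base_offsets must be sorted ascending, one entry per segment file."""
    i = bisect.bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise ValueError("offset below earliest retained segment")
    return base_offsets[i]

# Segments as in the diagram: 0-999, 1000-1999, 2000-active
bases = [0, 1000, 2000]
assert find_segment(bases, 1500) == 1000   # falls in segment-1
assert find_segment(bases, 2047) == 2000   # falls in the active segment
```

Within the chosen segment, the companion `.index` file then maps the relative offset to a byte position, so the broker never scans the log linearly.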
Write path internals:
1. Producer sends ProduceRequest → broker's SocketServer thread accepts via Java NIO
2. RequestHandlerPool dispatches to ReplicaManager.appendRecords()
3. Log.append() serializes MessageSet → writes to active segment's FileChannel
4. OS page cache absorbs write → async flush to disk after log.flush.interval.messages or log.flush.interval.ms
5. Visibility is gated by replication, not by the flush: a message becomes readable by consumers only after all ISR replicas have it and the leader advances the High Watermark (HW)
sequenceDiagram
participant P as Producer
participant SN as SocketServer<br/>(NIO thread)
participant RH as RequestHandler<br/>(thread pool)
participant RM as ReplicaManager
participant Log as Log.append()
participant FS as FileSystem<br/>(page cache)
P->>SN: ProduceRequest(topic, partition, messages)
SN->>RH: enqueue request
RH->>RM: appendRecords(leaderEpoch, messages)
RM->>Log: append to active segment
Log->>FS: FileChannel.write(ByteBuffer)
FS-->>Log: buffered in page cache
Log-->>RM: LogAppendInfo(firstOffset, lastOffset)
RM-->>P: ProduceResponse(acks) after ISR acknowledgement
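Step 4's flush policy can be modeled as a simple counter: the page cache absorbs appends, and the broker forces a flush once the unflushed count reaches the configured interval. A toy sketch (class name hypothetical; the time-based trigger `log.flush.interval.ms` is omitted for brevity):

```python
class SegmentWriter:
    """Toy model of log.flush.interval.messages: appends land in an
    in-memory buffer (standing in for the page cache) and a flush is
    forced after every `flush_interval_messages` appends."""
    def __init__(self, flush_interval_messages):
        self.flush_interval_messages = flush_interval_messages
        self.unflushed = 0   # messages sitting only in the page cache
        self.flushes = 0     # fsync-style flushes issued so far

    def append(self, message):
        self.unflushed += 1
        if self.unflushed >= self.flush_interval_messages:
            self.flush()

    def flush(self):
        self.flushes += 1
        self.unflushed = 0

w = SegmentWriter(flush_interval_messages=3)
for m in range(7):
    w.append(m)
assert w.flushes == 2 and w.unflushed == 1  # flushed after msg 3 and 6
```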
3. ZooKeeper Coordination Fabric¶
In Kafka 0.8.x, ZooKeeper serves as the central coordination layer — a hierarchical key-value store that all brokers and consumers watch for state changes.
ZooKeeper Znode Schema¶
flowchart TD
root["/"] --> brokers["/brokers"]
root --> consumers["/consumers"]
root --> config["/config"]
brokers --> ids["/brokers/ids/\n├── 0 (broker 0 alive)\n├── 1 (broker 1 alive)\n└── 2 (broker 2 alive)"]
brokers --> topics["/brokers/topics/\n└── my-topic/\n └── partitions/\n ├── 0/state → leader=1, ISR=[1,2]\n ├── 1/state → leader=2, ISR=[2,0]\n └── 2/state → leader=0, ISR=[0,1]"]
consumers --> groups["/consumers/\n└── my-group/\n ├── ids/ (live consumers)\n ├── owners/ (partition → consumer)\n └── offsets/ (partition → offset)"]
Key coordination flows:
| Event | ZooKeeper Operation |
|---|---|
| Broker starts | Creates ephemeral znode /brokers/ids/{id} |
| Broker fails | Ephemeral znode auto-deleted → triggers watches |
| Leader election | Controller watches broker deletion → writes new ISR to /brokers/topics/…/state |
| Consumer joins group | Creates ephemeral /consumers/{group}/ids/{consumer-id} |
| Consumer offset commit | Writes to /consumers/{group}/offsets/{topic}/{partition} |
sequenceDiagram
participant B1 as Broker 1 (fails)
participant ZK as ZooKeeper
participant Ctrl as Controller (Broker 0)
participant B2 as Broker 2
B1->>ZK: ephemeral session expires
ZK-->>Ctrl: watch fired: /brokers/ids/1 deleted
Ctrl->>ZK: read /brokers/topics/X/partitions/0/state
ZK-->>Ctrl: {leader: 1, ISR: [1, 2]}
Ctrl->>Ctrl: elect new leader from ISR → B2
Ctrl->>ZK: write /brokers/topics/X/partitions/0/state<br/>{leader: 2, ISR: [2]}
ZK-->>B2: watch fired: you are new leader
B2->>B2: truncate log to HW<br/>open partition for reads/writes
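The controller's election step in the sequence above reduces to a simple rule: walk the ISR list in order and pick the first replica that is still alive. A sketch of that rule under the book's simplified model (function name hypothetical; unclean election from outside the ISR is not modeled):

```python
def elect_leader(isr, live_brokers):
    """Pick the first ISR member that is still registered in ZooKeeper.
    Returns None if no ISR replica survives (partition goes offline)."""
    for broker in isr:
        if broker in live_brokers:
            return broker
    return None

# Broker 1 (old leader) died; ISR was [1, 2] and broker 2 survives.
assert elect_leader([1, 2], live_brokers={0, 2}) == 2
# If the entire ISR is gone, the partition is offline.
assert elect_leader([1], live_brokers={0, 2}) is None
```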
4. Producer Internals: Metadata, Partitioning, and Async Batching¶
Metadata Bootstrap and Leader Discovery¶
Before a producer can write, it must discover the lead replica for each target partition. This bootstraps via metadata.broker.list — only used for the initial metadata request.
sequenceDiagram
participant Prod as Producer
participant B0 as Any Broker (seed)
participant BL as Lead Broker
Prod->>B0: TopicMetadataRequest(topic)
B0-->>Prod: {partition: 0, leader: broker_id=2, replicas: [2,1,0], ISR: [2,1]}
Prod->>Prod: cache metadata locally
Prod->>BL: ProduceRequest(partition=0, messages)
BL-->>Prod: ProduceResponse(ack)
Partitioner Hash Mechanics¶
The default partitioner (DefaultPartitioner) computes a non-negative hash of the message key modulo the partition count.
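That hashing rule can be sketched in a few lines (Python's `hash()` stands in for Java's `hashCode`, so the exact partition numbers will differ from a real 0.8 producer):

```python
def default_partition(key, num_partitions):
    """Mimic DefaultPartitioner: non-negative key hash mod partition count."""
    return abs(hash(key)) % num_partitions

p = default_partition("user-42", 5)
assert 0 <= p < 5
# Determinism is the point: the same key always maps to the same partition.
assert p == default_partition("user-42", 5)
```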
Custom partitioner example from the book (IP-based routing):
flowchart LR
msg["Message: clientIP=192.168.14.37"] --> extract["Extract last octet: 37"]
extract --> mod["37 % 5 partitions = 2"]
mod --> p2["Partition 2 on Lead Broker"]
style p2 fill:#2d6a4f,color:#fff
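The IP-based routing in the diagram can be sketched as a custom partitioner (a simplified stand-in for the book's Java `Partitioner` implementation; the function name is hypothetical):

```python
def ip_partition(client_ip, num_partitions):
    """Route on the last octet of the client address so that all
    traffic from one host always lands on the same partition."""
    last_octet = int(client_ip.rsplit(".", 1)[-1])
    return last_octet % num_partitions

# Matches the diagram: last octet 37, 5 partitions -> partition 2.
assert ip_partition("192.168.14.37", 5) == 2
```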
Why key-based partitioning matters internally: All messages with the same key land on the same partition → same ordered log → consumers see messages in causal order per key. This is the basis for event sourcing and stream joins.
Async Mode Internal Buffer¶
stateDiagram-v2
[*] --> Accumulating: producer.type=async
Accumulating --> Flushing: queue.time elapsed OR batch.size reached
Flushing --> Serializing: ProducerSendThread dequeues
Serializing --> Dispatching: EventHandler.serialize()
Dispatching --> Sent: ProduceRequest → lead broker
Sent --> Accumulating: continue buffering
Accumulating --> DATALOSS: Producer crash
note right of DATALOSS: In-memory data lost\nNo WAL for async mode
| Configuration | Effect |
|---|---|
| queue.time (ms) | Max buffer time before flush |
| batch.size | Max message count before flush |
| request.required.acks=0 | Fire-and-forget (no confirmation) |
| request.required.acks=1 | Ack from lead replica only |
| request.required.acks=-1 | Ack from all ISR replicas |
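The async-mode flush trigger from the state machine above combines the two thresholds with OR: the ProducerSendThread dispatches the buffered batch as soon as either one is hit. A minimal sketch of that condition (function name hypothetical):

```python
def should_flush(buffered, batch_size, elapsed_ms, queue_time_ms):
    """Async producer flush rule: dispatch when the batch is full OR
    the oldest buffered message has waited queue.time milliseconds."""
    return buffered >= batch_size or elapsed_ms >= queue_time_ms

# Size threshold hit, time threshold not:
assert should_flush(200, batch_size=200, elapsed_ms=10, queue_time_ms=5000)
# Time threshold hit with a tiny batch:
assert should_flush(3, batch_size=200, elapsed_ms=5000, queue_time_ms=5000)
# Neither threshold hit -> keep accumulating:
assert not should_flush(3, batch_size=200, elapsed_ms=10, queue_time_ms=5000)
```

Everything still sitting in the buffer when neither threshold has fired is exactly the data lost in the DATALOSS state: async mode has no write-ahead log.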
5. Replication Pipeline: ISR Mechanics Under Load¶
Kafka 0.8's replication protocol is synchronous with respect to commits — a message counts as committed only once every ISR replica has it — while followers actually copy the data asynchronously via fetch requests.
ISR Replication State Machine¶
stateDiagram-v2
[*] --> InSync: Follower catches up to leader LEO
InSync --> Lagging: Follower falls behind by replica.lag.time.max.ms
Lagging --> OutOfISR: Leader removes follower from ISR
OutOfISR --> CatchingUp: Follower truncates log to HW, re-fetches
CatchingUp --> InSync: Follower fully caught up → leader adds back to ISR
InSync --> Dead: Broker crash
Dead --> CatchingUp: Broker restarts
Commit Protocol (High Watermark)¶
The High Watermark (HW) is the offset of the last message replicated to all ISR members. Consumers only see messages at or below HW — this is what guarantees consistency.
flowchart TD
subgraph Leader["Lead Broker — Partition 0"]
LEO_L["LEO = 1005\n(latest written offset)"]
HW_L["HW = 1000\n(committed to all ISR)"]
end
subgraph F1["Follower Broker 1"]
LEO_F1["LEO = 1004\n(slightly behind)"]
HW_F1["HW = 1000\n(same as leader)"]
end
subgraph F2["Follower Broker 2"]
LEO_F2["LEO = 1000\n(most behind)"]
HW_F2["HW = 1000"]
end
LEO_L -->|"ISR = [Lead, F1, F2]\nMin(LEO) = 1000\n→ advance HW to 1000"| HW_L
Consumer["Consumer\n(reads up to HW=1000 only)"]
HW_L --> Consumer
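The HW computation in the diagram is just a minimum over the ISR: the leader advances the watermark to the smallest log-end offset among in-sync replicas. A sketch of that rule (function name hypothetical):

```python
def high_watermark(leo_by_replica, isr):
    """HW = min log-end offset (LEO) across current ISR members."""
    return min(leo_by_replica[r] for r in isr)

# Matches the diagram: min(1005, 1004, 1000) = 1000.
leo = {"leader": 1005, "f1": 1004, "f2": 1000}
assert high_watermark(leo, isr=["leader", "f1", "f2"]) == 1000
# If the slow follower is dropped from the ISR, the HW can advance:
assert high_watermark(leo, isr=["leader", "f1"]) == 1004
```

This is why shrinking the ISR unblocks producers waiting on acks=-1: fewer replicas in the minimum means the watermark (and thus the commit point) moves forward.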
Leader failover with log truncation:
sequenceDiagram
participant L as Leader (fails at offset 1005)
participant F1 as Follower 1 (LEO=1004)
participant F2 as Follower 2 (LEO=1001, first in ISR)
participant ZK as ZooKeeper
L->>L: crash (LEO=1005, HW=1000)
F2->>ZK: register as candidate for new leader
ZK-->>F2: you are new leader (first registered)
F2->>F2: new HW = my LEO = 1001
F1->>F1: truncate log to HW=1001
F1->>F2: FetchRequest from offset 1001
F2-->>F1: messages 1001..1004
F1->>F1: now LEO=1004, catch up to leader
F2->>ZK: write new ISR = [F2, F1]
6. Consumer Architecture: Pull Model and Offset State Machine¶
High-Level API Internal Architecture¶
The ZookeeperConsumerConnector abstracts the entire consumer state machine:
flowchart TD
App["Application Thread"] -->|createMessageStreams| ZCC["ZookeeperConsumerConnector"]
ZCC --> KS1["KafkaStream[K,V] — Partition 0\n(BlockingQueue[FetchedDataChunk])"]
ZCC --> KS2["KafkaStream[K,V] — Partition 1"]
ZCC --> KS3["KafkaStream[K,V] — Partition 2"]
subgraph FetchLoop["Fetch Thread Pool"]
FT1["FetcherThread — Broker 0"]
FT2["FetcherThread — Broker 1"]
end
FT1 -->|"FetchRequest(offset=last_committed)"| B0["Broker 0"]
B0 -->|"FetchResponse(messages)"| KS1
FT2 -->|"FetchRequest"| B1["Broker 1"]
B1 -->|"FetchResponse"| KS2
subgraph ZK["ZooKeeper"]
Offsets["/consumers/group/offsets/topic/partition"]
end
ZCC -->|"auto.commit.interval.ms"| ZK
Offset Lifecycle State Machine¶
stateDiagram-v2
[*] --> NoOffset: Consumer group never consumed topic
NoOffset --> AutoReset: auto.offset.reset=largest/smallest
AutoReset --> Consuming: fetch from largest (tail) or smallest (head)
Consuming --> CommitPending: messages processed
CommitPending --> Committed: auto.commit.interval.ms triggers\n→ write to ZooKeeper
Committed --> Consuming: continue fetching from committed+1
Consuming --> Rebalancing: consumer joins/leaves group
Rebalancing --> Consuming: new partition assignment
Rebalancing --> DuplicateRead: crash before commit
note right of DuplicateRead: at-least-once delivery\nby design
Consumer Group Rebalancing Mechanics¶
When any consumer joins or leaves a group, ZooKeeper fires a watch → all consumers in the group rebalance:
sequenceDiagram
participant C1 as Consumer 1 (existing)
participant C2 as Consumer 2 (new)
participant ZK as ZooKeeper
participant B as Broker
C2->>ZK: create ephemeral /consumers/group/ids/C2
ZK-->>C1: watch fired: new member C2
ZK-->>C2: you are registered
C1->>C1: release all partition ownership
C2->>C2: calculate new partition assignment
C1->>C1: calculate new partition assignment
Note over C1,C2: Assignment algorithm: sort consumers + partitions,<br/>distribute round-robin
C1->>ZK: claim /consumers/group/owners/topic/partition-0 → C1
C1->>ZK: claim /consumers/group/owners/topic/partition-1 → C1
C2->>ZK: claim /consumers/group/owners/topic/partition-2 → C2
C1->>B: FetchRequest(partition=0, offset=last_committed)
C2->>B: FetchRequest(partition=2, offset=last_committed)
Rebalancing hazard: If a new consumer starts with an existing group.id, in-flight consumers release partitions mid-stream → some messages may be re-delivered to the new consumer. This is the "ambiguous behavior" the book warns about.
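Because there is no central coordinator in 0.8, every consumer computes the same assignment independently after sorting both lists; earlier consumers absorb the remainder when partitions do not divide evenly. A sketch of that deterministic range-style assignment (it reproduces the outcome in the sequence above, where C1 claims partitions 0 and 1 and C2 claims partition 2):

```python
def range_assign(consumers, partitions):
    """Deterministic assignment each group member computes locally:
    sort both lists, give each consumer a contiguous slice, and let
    the earliest consumers absorb any remainder."""
    consumers, partitions = sorted(consumers), sorted(partitions)
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        n = per + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + n]
        start += n
    return assignment

assert range_assign(["C1", "C2"], [0, 1, 2]) == {"C1": [0, 1], "C2": [2]}
```

Determinism is what lets every member claim its owner znodes without negotiating; the hazard is only in the window between releasing old claims and writing new ones.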
7. Log Compaction Internals¶
Log compaction is a background process that eliminates superseded key-value records, preserving only the most recent value per key.
flowchart LR
subgraph Before["Before Compaction — Partition Log"]
direction TB
m0["offset=0: key=A val=v1"]
m1["offset=1: key=B val=v1"]
m2["offset=2: key=A val=v2"]
m3["offset=3: key=C val=v1"]
m4["offset=4: key=B val=v2"]
m5["offset=5: key=A val=v3"]
end
subgraph After["After Compaction"]
direction TB
r1["offset=1: key=B val=v1 ❌ (superseded)"]
r3["offset=3: key=C val=v1 ✅"]
r4["offset=4: key=B val=v2 ✅"]
r5["offset=5: key=A val=v3 ✅"]
end
Before -->|"LogCleaner thread\n(background)"| After
style r1 fill:#c0392b,color:#fff
style r3 fill:#27ae60,color:#fff
style r4 fill:#27ae60,color:#fff
style r5 fill:#27ae60,color:#fff
Compaction properties:
- Ordering of retained messages is always preserved (offsets are monotonically increasing)
- Once a key's record is compacted away, its offset no longer exists — consumers observing a gap in offsets must skip ahead
- The "head" of the log (recent messages) is never compacted; only the "tail" (old segments) is eligible
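The LogCleaner's net effect on the diagrammed log can be sketched as one pass that keeps the latest record per key while preserving original offsets (a simplification: the real cleaner works segment by segment and never touches the active head):

```python
def compact(log):
    """Keep only the most recent (offset, key, value) per key,
    emitting survivors in their original offset order."""
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)   # later offset overwrites
    return sorted(latest.values())           # offsets stay monotonic

log = [(0, "A", "v1"), (1, "B", "v1"), (2, "A", "v2"),
       (3, "C", "v1"), (4, "B", "v2"), (5, "A", "v3")]
# Matches the "After Compaction" panel above:
assert compact(log) == [(3, "C", "v1"), (4, "B", "v2"), (5, "A", "v3")]
```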
8. Message Compression Pipeline¶
Kafka 0.8's compression operates at the MessageSet level, not per-message — batching before compression yields better ratios because redundancy shared across the messages in a batch is compressed together.
flowchart TD
subgraph Producer["Producer Side"]
msgs["Messages: M1, M2, M3, M4"]
batch["Batch as MessageSet"]
compress["GZIP/Snappy compress(MessageSet)"]
envelope["Wrap in outer Message\nattributes byte = codec_id"]
end
subgraph Broker["Broker 0.8 — Lead Replica"]
recv["Receive compressed envelope"]
decomp["Decompress inner MessageSet"]
assign["Assign logical offsets to each inner message"]
recommp["Re-compress with offsets embedded"]
append["Append to segment log"]
end
subgraph Consumer["Consumer Side"]
fetch["Fetch compressed envelope"]
cdecomp["Decompress → extract messages with offsets"]
process["Process individual messages"]
end
msgs --> batch --> compress --> envelope
envelope --> recv --> decomp --> assign --> recommp --> append
append --> fetch --> cdecomp --> process
The 0.8 compression penalty: Unlike 0.7 (where compressed batches passed through the broker opaquely), 0.8 brokers must decompress → assign per-message logical offsets → re-compress. This causes measurable CPU load on the lead broker for high-throughput compressed topics.
Compression attribute byte layout (lowest 2 bits):
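In the 0.8 wire format the codec lives in the lowest 2 bits of the message's attributes byte: 0 for no compression, 1 for GZIP, 2 for Snappy. A sketch of decoding it (constant and function names are illustrative, not the actual Kafka source names):

```python
# Codec IDs carried in the attributes byte (Kafka 0.8 message format):
NONE, GZIP, SNAPPY = 0, 1, 2

def codec_of(attributes):
    """Mask off the lowest 2 bits of the attributes byte to recover
    the compression codec of a MessageSet envelope."""
    return attributes & 0b11

assert codec_of(0b00000000) == NONE
assert codec_of(0b00000001) == GZIP
assert codec_of(0b00000010) == SNAPPY
```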
9. Broker Cluster Topology: Peer-to-Peer Without a Master¶
Unlike traditional databases with primary/secondary, Kafka uses a controller broker (elected via ZooKeeper) for administrative operations only — not for data paths.
flowchart TD
subgraph Cluster["Kafka Cluster (3 Brokers)"]
B0["Broker 0\n(Controller)\nLeader: partition-2"]
B1["Broker 1\nLeader: partition-0"]
B2["Broker 2\nLeader: partition-1"]
end
subgraph ZooKeeper["ZooKeeper Ensemble"]
ZKL["ZK Leader"]
ZKF1["ZK Follower 1"]
ZKF2["ZK Follower 2"]
end
subgraph Clients["Clients"]
P["Producer"]
C["Consumer"]
end
B0 <-->|"ISR sync\n(partition-0 follower)"| B1
B1 <-->|"ISR sync\n(partition-1 follower)"| B2
B2 <-->|"ISR sync\n(partition-2 follower)"| B0
B0 --> ZKL
B1 --> ZKL
B2 --> ZKL
P -->|"metadata req → any broker"| B0
P -->|"produce → lead broker"| B1
C -->|"fetch → lead broker"| B1
B0 -->|"controller: writes ISR\nleader election decisions"| ZKL
Why no master for data paths? Each producer connects directly to the lead broker for each partition — removing a central hotspot. The controller's role is purely metadata: writing ISR updates, triggering leader elections when brokers fail.
10. Multi-Broker Single-Node vs Multi-Node: What Changes Internally¶
block-beta
columns 2
block:single["Single-Node Multi-Broker"]:1
b0s["Broker 0\nport:9092\nlog:/tmp/kafka-0"]
b1s["Broker 1\nport:9093\nlog:/tmp/kafka-1"]
b2s["Broker 2\nport:9094\nlog:/tmp/kafka-2"]
zks["ZooKeeper\nlocalhost:2181"]
end
block:multi["Multi-Node Multi-Broker"]:1
b0m["Broker 0 — Node A\nport:9092"]
b1m["Broker 1 — Node B\nport:9092"]
b2m["Broker 2 — Node C\nport:9092"]
zkm["ZooKeeper Ensemble\n3 nodes"]
end
Internal difference: In single-node multi-broker, all brokers share the same NIC → replication traffic competes with producer/consumer traffic. In multi-node, replication traffic crosses the network but benefits from node-level failure isolation.
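For the single-node layout on the left, the only per-broker differences are the id, port, and log directory — a minimal sketch of the second broker's properties file, with illustrative values taken from the diagram:

```properties
# server-1.properties — second broker on the same host
broker.id=1
port=9093
log.dirs=/tmp/kafka-1
zookeeper.connect=localhost:2181
```

In the multi-node layout these files can be identical apart from broker.id, since each node has its own port space and disks.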
11. Kafka 0.8 vs Modern Kafka: What This Era Reveals¶
| Aspect | Kafka 0.8 (this book) | Modern Kafka (2.x+) |
|---|---|---|
| Offset storage | ZooKeeper znodes | Internal __consumer_offsets topic |
| Consumer group coordination | ZooKeeper watches | Group Coordinator broker |
| Leader election | ZooKeeper + Controller | KRaft (Raft-based, no ZK) |
| Producer API | kafka.javaapi.producer.Producer | KafkaProducer (new API) |
| Compression re-assignment | Broker decompresses+recompresses | Broker passes through (magic byte v2) |
| Exactly-once | Not supported | Transactional API + idempotent producer |
Understanding the 0.8 era illuminates why the modern APIs were redesigned — every pain point (ZooKeeper offset contention, rebalancing storms, compression CPU overhead) has a specific architectural fix in the later versions.
Summary: Internal Data Flow Map¶
flowchart LR
Producer -->|"1. TopicMetadataRequest\n(seed broker)"| AnyBroker
AnyBroker -->|"2. leader=B1, ISR=[B1,B2]"| Producer
Producer -->|"3. ProduceRequest(partition=0)\ncompressed MessageSet"| LeadBroker["Lead Broker B1"]
LeadBroker -->|"4. decompress → assign offsets\n→ recompress → append to log"| SegLog["Segment Log\n(page cache)"]
LeadBroker -->|"5. FetchRequest pulled by followers"| FollowerB2["Follower B2"]
FollowerB2 -->|"6. FetchResponse → append\n→ ACK to leader"| LeadBroker
LeadBroker -->|"7. advance HW after ISR ack\n→ ProduceResponse to producer"| Producer
Consumer -->|"8. read /consumers/group/offsets in ZK"| ZK["ZooKeeper"]
ZK -->|"9. last committed offset"| Consumer
Consumer -->|"10. FetchRequest(offset=N)"| LeadBroker
LeadBroker -->|"11. messages [N..HW]"| Consumer
Consumer -->|"12. process → commit offset+N to ZK"| ZK
The entire system — from log append to consumer pull — is designed around sequential I/O + OS page cache + pull-based consumers + stateless brokers. Every design decision traces back to these four principles.