
Apache Kafka — Under the Hood: Internal Architecture & Data Flow

Sources: Kafka: The Definitive Guide (2nd Edition) — Shapira, Palino, Sivaram, Narkhede (O'Reilly 2022); Kafka in Action — Dylan Scott (Manning)


1. The Commit Log as Foundational Data Structure

Kafka's central abstraction is not a queue — it is a durable, ordered, append-only log. Every partition is one log. Understanding the log's physical layout is the prerequisite to understanding everything else.

block-beta
  columns 3
  A["Partition 0\n.log segment files"] B["Partition 1\n.log segment files"] C["Partition 2\n.log segment files"]
  D["00000000000000000000.log\n00000000000000000000.index\n00000000000000000000.timeindex"] E["00000000000000000412.log\n00000000000000000412.index\n..."] F["00000000000000000891.log\n..."]

Each segment triple:

- .log — raw message bytes, appended sequentially
- .index — sparse offset → byte-position mapping (binary search entry point)
- .timeindex — timestamp → offset mapping (time-based seek)

The active segment is the only file receiving writes. Once a segment reaches log.segment.bytes (default 1 GB) or log.roll.hours (default 168 h), Kafka rolls a new active segment. Old segments are candidates for deletion or compaction based on retention policy.
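A minimal sketch (not broker code) of the naming and roll rule above: a segment's files are named after the base offset of the first record they contain, zero-padded to 20 digits, and a size-based roll happens when an append would push the active segment past `log.segment.bytes`.

```python
# Sketch: segment file naming and size-based roll check.
SEGMENT_BYTES = 1 << 30  # log.segment.bytes default (1 GB)

def segment_basename(base_offset: int) -> str:
    # Segment files are named by base offset, zero-padded to 20 digits,
    # matching the 00000000000000000412.log style in the listing above.
    return f"{base_offset:020d}"

def should_roll(active_segment_size: int, incoming_batch_size: int) -> bool:
    # Roll a new active segment when the append would exceed the limit.
    return active_segment_size + incoming_batch_size > SEGMENT_BYTES

print(segment_basename(412))  # basename of the second segment in the listing
```

The time-based roll (`log.roll.hours`) is an additional, independent trigger not modeled here.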

flowchart LR
  Producer -->|"ProduceRequest\n(batch of records)"| NetworkThread
  NetworkThread -->|"enqueue to\nrequest queue"| RequestHandlerPool
  RequestHandlerPool -->|"append to\nReplicaManager"| LogManager
  LogManager -->|"FileChannel.write()\n(OS page cache)"| DiskSegment[".log file\n(mmap / page cache)"]
  DiskSegment -.->|"fsync (configurable)"| Disk[(Physical Disk)]

Key insight: Kafka does not call fsync per-message by default (log.flush.interval.messages=Long.MAX_VALUE). Durability comes from replication, not from forced disk flushes. The OS page cache absorbs writes; the broker later flushes via background OS writeback.


2. Partition Anatomy and Offset Mechanics

stateDiagram-v2
  direction LR
  [*] --> Offset_0 : first produce
  Offset_0 --> Offset_1
  Offset_1 --> Offset_N : append-only
  Offset_N --> ActiveSegment : current write head
  ActiveSegment --> Rolled : segment full / roll time
  Rolled --> Retention : age / size check
  Retention --> Deleted : time/size policy
  Retention --> Compacted : cleanup.policy=compact

The .index file stores sparse entries: (relative_offset, file_position) pairs at configurable granularity (log.index.interval.bytes). A consumer seek to offset N proceeds in two steps:

  1. Binary search .index for largest entry ≤ N → byte position P
  2. Seek .log to P, scan forward until offset N found

This gives O(log n) seek on sparse index + bounded linear scan.


3. Producer Internals: RecordAccumulator and Batching

A Kafka producer does not send records one-at-a-time. The entire pipeline is designed around batching and async delivery.

sequenceDiagram
  participant App
  participant KafkaProducer
  participant Serializer
  participant Partitioner
  participant RecordAccumulator
  participant Sender
  participant NetworkClient
  participant Broker

  App->>KafkaProducer: send(ProducerRecord)
  KafkaProducer->>Serializer: serialize(key, value) → byte[]
  KafkaProducer->>Partitioner: partition(record, cluster_metadata) → partitionId
  KafkaProducer->>RecordAccumulator: append(tp, serializedRecord)
  Note over RecordAccumulator: Batches keyed by TopicPartition<br/>Each batch: ProducerBatch<br/>Drains when: batch.size full<br/>OR linger.ms elapsed
  RecordAccumulator-->>Sender: drain() → Map<Node, List<ProducerBatch>>
  Sender->>NetworkClient: send(ProduceRequest) per broker node
  NetworkClient->>Broker: TCP → ProduceRequest (batches)
  Broker-->>NetworkClient: ProduceResponse (offsets assigned)
  NetworkClient-->>KafkaProducer: fire callbacks
  KafkaProducer-->>App: RecordMetadata (topic, partition, offset)

RecordAccumulator is a CopyOnWriteMap<TopicPartition, Deque<ProducerBatch>>. The Sender thread drains it continuously, grouping all batches destined for the same broker into a single ProduceRequest. This minimizes TCP round-trips: one request per broker per drain cycle, regardless of how many partitions live on that broker.
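The drain step can be sketched as a regrouping: batches are stored per TopicPartition, but the Sender sends per broker, so ready batches are bucketed by each partition's leader node. Topic names and node ids here are made up for illustration.

```python
# Sketch of Sender drain: regroup per-partition batches by destination broker.
from collections import defaultdict

# hypothetical cluster metadata: which broker leads each partition
leader_for = {("orders", 0): "node-1", ("orders", 1): "node-2", ("orders", 2): "node-1"}
ready_batches = {("orders", 0): ["batchA"], ("orders", 1): ["batchB"], ("orders", 2): ["batchC"]}

def drain(batches: dict) -> dict:
    per_node = defaultdict(list)
    for tp, tp_batches in batches.items():
        per_node[leader_for[tp]].extend(tp_batches)  # group by leader broker
    return dict(per_node)

grouped = drain(ready_batches)
# node-1 receives the batches for partitions 0 and 2 in one ProduceRequest
```

One request per broker per drain cycle falls out of this grouping, exactly as the text describes.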

flowchart TD
  subgraph RecordAccumulator
    TP0[TopicPartition 0] --> B0["ProducerBatch (filling)"]
    TP1[TopicPartition 1] --> B1["ProducerBatch (full→drained)"]
    TP2[TopicPartition 2] --> B2["ProducerBatch (linger.ms expired)"]
  end
  B1 --> GroupByNode["Group batches\nby destination Node"]
  B2 --> GroupByNode
  GroupByNode -->|"Node 1: [TP0-batch, TP2-batch]"| ProduceRequest1
  GroupByNode -->|"Node 2: [TP1-batch]"| ProduceRequest2

Partitioning Algorithm

Default (no key): sticky partitioner (Kafka 2.4+). The producer selects a random partition and sticks to it until the batch fills or linger.ms fires, then rotates. This maximizes batch fill efficiency.

With a key: murmur2(key) % numPartitions. Same key → same partition, always. This deterministic mapping is baked into DefaultPartitioner and survives broker restarts; it only changes if the partition count changes, which is why adding partitions to a keyed topic remaps existing keys.
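The property that matters — same key, same partition, for a fixed partition count — can be shown with a small sketch. The real DefaultPartitioner uses murmur2; here `zlib.crc32` stands in as the hash function, which changes the specific partition chosen but not the determinism being illustrated.

```python
# Sketch of key-based partitioning (crc32 stands in for Kafka's murmur2).
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Deterministic: hashing the serialized key, modulo partition count.
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b"customer-42", 12)
p2 = partition_for(b"customer-42", 12)
assert p1 == p2  # the same key always lands on the same partition
```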


4. Idempotent Producer and Exactly-Once Semantics

flowchart LR
  subgraph Producer
    PID["Producer ID (PID)\nassigned by broker"]
    SEQ["Per-partition sequence\ncounter: 0,1,2,3..."]
  end
  subgraph Broker
    Log["Partition log"]
    SeqTracker["Last seen sequence\nper (PID, partition)"]
  end

  Producer -->|"ProduceRequest:\nPID=42, seq=7, batch"| Broker
  Broker -->|"seq == lastSeen+1 ✓"| Log
  Broker -->|"seq == lastSeen (dup) → discard"| Log
  Broker -->|"seq > lastSeen+1 → OutOfOrderException"| Producer

The broker maintains a sequence number window per (PID, partition). If a producer retries due to network failure, the broker deduplicates by checking if the incoming sequence is already committed. This eliminates duplicates within a single producer session with zero application-level code.
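The dedup decision in the diagram reduces to a three-way comparison against the last accepted sequence. A minimal sketch of that broker-side check, with an in-memory map standing in for the broker's per-(PID, partition) state:

```python
# Sketch of idempotent-producer dedup on the broker side.
last_seq = {}  # (pid, partition) -> last accepted sequence number

def on_produce(pid: int, partition: int, seq: int) -> str:
    prev = last_seq.get((pid, partition), -1)
    if seq == prev + 1:
        last_seq[(pid, partition)] = seq
        return "APPEND"            # next expected sequence: append to log
    if seq <= prev:
        return "DUPLICATE"         # retry of a committed batch: discard silently
    return "OUT_OF_ORDER"          # gap detected -> OutOfOrderSequenceException

assert on_produce(42, 0, 0) == "APPEND"
assert on_produce(42, 0, 1) == "APPEND"
assert on_produce(42, 0, 1) == "DUPLICATE"     # network retry, safely dropped
assert on_produce(42, 0, 3) == "OUT_OF_ORDER"  # sequence 2 was lost
```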

For cross-partition / cross-session exactly-once (transactional API):

sequenceDiagram
  participant Producer
  participant TransactionCoordinator
  participant Broker_P0
  participant Broker_P1

  Producer->>TransactionCoordinator: initTransactions() → epoch assigned
  Producer->>TransactionCoordinator: beginTransaction()
  Producer->>Broker_P0: send(records, transactional_id)
  Producer->>Broker_P1: send(records, transactional_id)
  Producer->>TransactionCoordinator: commitTransaction()
  TransactionCoordinator->>Broker_P0: WriteTxnMarker (COMMIT)
  TransactionCoordinator->>Broker_P1: WriteTxnMarker (COMMIT)
  Note over Broker_P0,Broker_P1: Consumers with isolation.level=read_committed<br/>only see records after COMMIT marker

The transaction coordinator uses a special internal topic __transaction_state (50 partitions by default) as its durable log. The two-phase commit protocol (prepare → commit markers) ensures atomicity across partitions.


5. Broker Request Pipeline: The I/O Thread Model

flowchart TD
  subgraph Network Layer
    AcceptThread["Acceptor Thread\n(1 per listener)"]
    Proc0["Processor Thread 0"]
    Proc1["Processor Thread 1"]
    ProcN["Processor Thread N\n(num.network.threads)"]
  end
  subgraph Request Handling
    RQ["Request Queue\n(blocking queue)"]
    RH0["Request Handler 0"]
    RH1["Request Handler 1"]
    RHM["Request Handler M\n(num.io.threads)"]
    RespQ["Response Queue"]
  end
  subgraph Storage
    RM["ReplicaManager"]
    LM["LogManager"]
    Disk[(Segment Files)]
  end

  AcceptThread --> Proc0
  AcceptThread --> Proc1
  AcceptThread --> ProcN
  Proc0 -->|"parsed Request"| RQ
  Proc1 --> RQ
  ProcN --> RQ
  RQ --> RH0
  RQ --> RH1
  RQ --> RHM
  RH0 -->|"ProduceRequest"| RM
  RM --> LM
  LM --> Disk
  RH0 -->|"FetchRequest"| RM
  RM -->|"bytes"| RespQ
  RespQ --> Proc0

• Acceptor: one thread per listener (PLAINTEXT, SSL); accepts TCP connections and hands them to processor threads round-robin
• Processor threads (num.network.threads, default 3): read from sockets, deserialize, and enqueue to the request queue; also write responses back to sockets
• Request handler pool (num.io.threads, default 8): dequeues requests, calls ReplicaManager, and returns the response object to the processor's response queue

This design separates network I/O (non-blocking) from disk I/O (potentially blocking on page cache misses), preventing slow disk operations from stalling the network layer.
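A toy sketch of the handoff that achieves this separation: producer-side "network" code enqueues parsed requests onto a blocking queue, and a pool of handler threads drains it, so slow handling never blocks the enqueue side. Thread counts echo the defaults above; everything else is illustrative.

```python
# Sketch of the network-thread / I/O-handler handoff via a blocking queue.
import queue
import threading

request_queue = queue.Queue()   # the broker's bounded request queue
responses = []
lock = threading.Lock()

def io_handler():
    # Handler pool (num.io.threads-style): dequeue and process requests.
    while True:
        req = request_queue.get()
        if req is None:          # shutdown sentinel
            return
        with lock:
            responses.append(f"handled:{req}")

handlers = [threading.Thread(target=io_handler) for _ in range(8)]
for h in handlers:
    h.start()
for i in range(100):             # "network threads" enqueue parsed requests
    request_queue.put(f"req-{i}")
for _ in handlers:
    request_queue.put(None)      # one sentinel per handler
for h in handlers:
    h.join()

assert len(responses) == 100
```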


6. Replication: ISR, Leader Election, HWM

flowchart LR
  Producer -->|"ProduceRequest\nacks=-1"| LeaderBroker

  subgraph LeaderBroker["Leader (Broker 1)"]
    LeaderLog["Partition Log\nLEO=100"]
    HWM["HWM=95\n(High Watermark)"]
  end

  subgraph Replica2["Follower (Broker 2)"]
    FollowerLog2["Partition Log\nLEO=98"]
  end

  subgraph Replica3["Follower (Broker 3)"]
    FollowerLog3["Partition Log\nLEO=100"]
  end

  LeaderLog -->|"FetchRequest response\n(batches)"| FollowerLog2
  LeaderLog -->|"FetchRequest response"| FollowerLog3
  Replica2 -->|"FetchRequest\noffset=98"| LeaderBroker
  Replica3 -->|"FetchRequest\noffset=100"| LeaderBroker
  HWM -.->|"Consumer can read\nonly up to HWM"| Consumer

ISR (In-Sync Replicas): the set of replicas that have caught up to the leader's LEO (Log End Offset) within the last replica.lag.time.max.ms (default 30 s). The HWM advances to min(LEO) across all ISR members.

When acks=-1 (all), the producer waits until every ISR replica has confirmed the write; min.insync.replicas bounds how small that set may shrink before writes are rejected. This replication acknowledgment, not fsync, is what makes Kafka durable.

ISR shrink: if a follower falls behind (network partition, GC pause), leader removes it from ISR. HWM can now advance faster (smaller quorum). The controller writes ISR changes to __cluster_metadata (KRaft) or ZooKeeper.
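The HWM rule and the effect of an ISR shrink fit in a one-line function: the high watermark is the minimum LEO across the current ISR. Broker names and LEO values below mirror the diagram.

```python
# Sketch of high-watermark computation: min(LEO) over the in-sync replica set.
def high_watermark(leo_by_replica: dict, isr: set) -> int:
    return min(leo_by_replica[r] for r in isr)

leos = {"broker1": 100, "broker2": 98, "broker3": 100}

# With all three replicas in sync, the lagging follower caps the HWM.
assert high_watermark(leos, {"broker1", "broker2", "broker3"}) == 98

# After an ISR shrink removes the laggard, the HWM can advance.
assert high_watermark(leos, {"broker1", "broker3"}) == 100
```

This is why a shrink lets the HWM "advance faster": the minimum is taken over a smaller set.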

Unclean leader election (unclean.leader.election.enable): if all ISR replicas die, Kafka can elect an out-of-sync follower as leader. This recovers availability at the cost of data loss.


7. Consumer Group Internals: Rebalance Protocol

sequenceDiagram
  participant C1 as Consumer 1
  participant C2 as Consumer 2
  participant GC as GroupCoordinator (Broker)

  C1->>GC: FindCoordinator(group.id)
  GC-->>C1: CoordinatorId=Broker3
  C1->>GC: JoinGroup(group.id, memberId="", protocols=[RangeAssignor])
  C2->>GC: JoinGroup(group.id, memberId="", protocols=[RangeAssignor])
  Note over GC: Waits for rebalance timeout<br/>Selects leader = first joiner (C1)
  GC-->>C1: JoinGroupResponse(leader=true, memberList=[C1,C2], generationId=1)
  GC-->>C2: JoinGroupResponse(leader=false, generationId=1)
  C1->>C1: Compute partition assignment (RangeAssignor)
  C1->>GC: SyncGroup(groupId, generationId=1, assignments={C1→[P0,P1], C2→[P2,P3]})
  C2->>GC: SyncGroup(groupId, generationId=1, assignments={})
  GC-->>C1: SyncGroupResponse(assignment=[P0,P1])
  GC-->>C2: SyncGroupResponse(assignment=[P2,P3])
  loop Heartbeat (heartbeat.interval.ms)
    C1->>GC: Heartbeat(generationId=1)
    GC-->>C1: OK
  end

The group coordinator is the broker that leads partition hash(group.id) % numPartitions of the __consumer_offsets topic. The coordinator stores the group's committed offsets in that same partition.
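Coordinator lookup is just a stable hash of the group id modulo the offsets-topic partition count. A sketch, with a toy byte-sum hash standing in for the broker's actual hashing of the group id:

```python
# Sketch of coordinator selection: group id -> __consumer_offsets partition.
OFFSETS_TOPIC_PARTITIONS = 50   # offsets.topic.num.partitions default

def coordinator_partition(group_id: str) -> int:
    # Toy stand-in hash; the real broker hashes the group id string.
    return sum(group_id.encode()) % OFFSETS_TOPIC_PARTITIONS

p = coordinator_partition("payments-service")
assert 0 <= p < OFFSETS_TOPIC_PARTITIONS
assert coordinator_partition("payments-service") == p  # deterministic
```

Determinism is the point: every member of a group, on any client, resolves to the same coordinator broker.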

Cooperative rebalancing (Kafka 2.4+ CooperativeStickyAssignor): instead of revoking all partitions and reassigning, only the partitions that need to move are revoked. This eliminates the "stop-the-world" pause of eager rebalancing.

stateDiagram-v2
  direction LR
  [*] --> Empty : no members
  Empty --> PreparingRebalance : member joins/leaves
  PreparingRebalance --> CompletingRebalance : all JoinGroup received
  CompletingRebalance --> Stable : all SyncGroup received
  Stable --> PreparingRebalance : member join/leave/timeout

8. Offset Commit Internals

flowchart TD
  Consumer -->|"commitSync()\nor commitAsync()"| GroupCoordinator
  GroupCoordinator -->|"append to\n__consumer_offsets partition"| OffsetLog["__consumer_offsets\n(compacted topic)"]
  OffsetLog -->|"key=(group,topic,partition)\nvalue=(offset,metadata,timestamp)"| CompactionCleaner
  CompactionCleaner -->|"retain latest offset\nper key"| RetainedOffsets

  subgraph OffsetFetch
    Consumer2["New consumer\n(restart or rebalance)"] -->|"OffsetFetch request"| GroupCoordinator
    GroupCoordinator -->|"reads from\nin-memory cache\n(loaded from __consumer_offsets)"| Consumer2
  end

On broker startup, the coordinator replays the entire __consumer_offsets partition to rebuild its in-memory offset map. Compaction ensures this replay stays bounded — only the latest committed offset per (group, topic, partition) key is retained.
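Replaying a compacted changelog to rebuild state is just "last write wins per key". A sketch of the startup replay, with a small hand-written log standing in for the __consumer_offsets partition:

```python
# Sketch of coordinator startup: replay the compacted offsets log in order,
# keeping only the latest committed offset per (group, topic, partition) key.
offsets_log = [
    (("grp1", "orders", 0), 10),
    (("grp1", "orders", 1), 7),
    (("grp1", "orders", 0), 25),   # a later commit supersedes the earlier one
]

committed = {}
for key, offset in offsets_log:
    committed[key] = offset        # last value per key wins

assert committed[("grp1", "orders", 0)] == 25
assert committed[("grp1", "orders", 1)] == 7
```

Compaction keeps this replay bounded: in the steady state the log holds roughly one record per live key.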


9. Log Compaction Mechanics

Compaction preserves the latest value for each key indefinitely, enabling Kafka to act as a key-value changelog store.

flowchart LR
  subgraph BeforeCompaction["Before Compaction"]
    direction TB
    R0["offset=0 key=A val=1"]
    R1["offset=1 key=B val=10"]
    R2["offset=2 key=A val=2"]
    R3["offset=3 key=C val=5"]
    R4["offset=4 key=B val=20"]
    R5["offset=5 key=A val=3"]
  end
  subgraph AfterCompaction["After Compaction"]
    direction TB
    R3b["offset=3 key=C val=5"]
    R4b["offset=4 key=B val=20"]
    R5b["offset=5 key=A val=3"]
  end
  BeforeCompaction -->|"cleaner thread\n(dirty ratio threshold)"| AfterCompaction

The log cleaner (background thread pool) operates on the "dirty" portion of the log — segments that have not been cleaned since the last pass. It:

  1. Builds an in-memory OffsetMap (key hash → max offset seen so far) by scanning dirty segments
  2. Re-copies records to new segments, skipping any record whose key has a higher-offset entry in the map
  3. Replaces old segments with cleaned segments atomically

Tombstones: a record with key=K and null value is a deletion marker. The cleaner retains tombstones for delete.retention.ms (default 24 h) to allow consumers to observe the deletion, then purges them.
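One cleaner pass over the example records above can be sketched directly: build the offset map, then re-copy only each key's newest record. Tombstone retention is modeled with a simple flag rather than a delete.retention.ms timer.

```python
# Sketch of a compaction pass: offset map, then filtered re-copy.
records = [  # (offset, key, value); a None value would be a tombstone
    (0, "A", 1), (1, "B", 10), (2, "A", 2),
    (3, "C", 5), (4, "B", 20), (5, "A", 3),
]

offset_map = {}                       # key -> highest offset seen
for offset, key, _value in records:
    offset_map[key] = offset

def clean(recs, retain_tombstones=True):
    kept = []
    for offset, key, value in recs:
        if offset != offset_map[key]:
            continue                  # superseded by a newer record for this key
        if value is None and not retain_tombstones:
            continue                  # tombstone past delete.retention.ms: purge
        kept.append((offset, key, value))
    return kept

# Matches the "After Compaction" side of the diagram above.
assert clean(records) == [(3, "C", 5), (4, "B", 20), (5, "A", 3)]
```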


10. ZooKeeper → KRaft Migration

Before KRaft, Kafka used ZooKeeper to store:

- Broker registration (/brokers/ids/)
- Topic configurations (/config/topics/)
- ISR state (/brokers/topics/<t>/partitions/<p>/state)
- Controller election (/controller)

This created a metadata bottleneck: every partition state change required a ZooKeeper write. At 200K partitions, this became a multi-minute recovery bottleneck.

flowchart TD
  subgraph KRaft["KRaft Mode (Kafka 3.x)"]
    direction LR
    ActiveCtrl["Active Controller\n(elected via Raft)"]
    StandbyCtrl1["Standby Controller 1"]
    StandbyCtrl2["Standby Controller 2"]
    MetaLog["__cluster_metadata topic\n(replicated Raft log)"]
    Broker1["Broker 1"]
    Broker2["Broker 2"]
  end

  ActiveCtrl -->|"append metadata events"| MetaLog
  MetaLog -->|"replicate"| StandbyCtrl1
  MetaLog -->|"replicate"| StandbyCtrl2
  ActiveCtrl -->|"MetadataUpdate RPC"| Broker1
  ActiveCtrl -->|"MetadataUpdate RPC"| Broker2

KRaft stores all cluster metadata in an internal Kafka topic (__cluster_metadata) replicated across 3–5 controller nodes using the Raft consensus protocol. Brokers maintain a local metadata cache and receive incremental updates from the active controller. Controller failover is now a pure Raft leader election: sub-second, with no ZooKeeper coordination needed.


11. AdminClient Internal Flow

sequenceDiagram
  participant App
  participant AdminClient
  participant NetworkClient
  participant Controller as Active Controller Broker

  App->>AdminClient: createTopics([NewTopic])
  AdminClient->>AdminClient: wrap in KafkaFuture<CreateTopicsResult>
  AdminClient->>NetworkClient: enqueue CreateTopicsRequest
  NetworkClient->>Controller: TCP → CreateTopicsRequest
  Controller->>Controller: write to __cluster_metadata log (KRaft)<br/>or ZooKeeper (legacy)
  Controller-->>NetworkClient: CreateTopicsResponse(errors=[])
  NetworkClient-->>AdminClient: complete KafkaFuture
  AdminClient-->>App: CreateTopicsResult.all().get() → success

All mutating operations (create, delete, alter) route to the active controller. Read operations (listTopics, describeCluster) route to the least-loaded broker: the client's leastLoadedNode selection picks the node with the fewest in-flight requests.

The KafkaFuture wrapper is a custom future (not CompletableFuture) that ensures thread-safe completion callbacks. whenComplete() fires on the client's I/O thread — never block inside it.


12. Producer → Broker → Consumer: Full Data Path

flowchart TD
  subgraph ProducerSide
    App["Application\nrecord = ProducerRecord(topic, key, value)"]
    Ser["Serializer\nbyte[] keyBytes, valueBytes"]
    Part["Partitioner\nmurmur2(key) % N → partitionId"]
    Accum["RecordAccumulator\nbatch[tp] += record"]
    Sender["Sender Thread\ndrain → ProduceRequest"]
  end

  subgraph BrokerSide
    NetThread["Network Thread\nparse ProduceRequest"]
    IOHandler["I/O Handler Thread\nReplicaManager.appendRecords()"]
    LeaderLog["Leader Log\nappend → assign offset"]
    PurgatoryDelay["Purgatory (DelayedProduce)\nwait for follower ACKs"]
    FollowerFetch["Follower FetchRequest\nreplicate → update LEO"]
    HWMAdvance["HWM advance\nmin(ISR LEOs)"]
  end

  subgraph ConsumerSide
    FetchReq["FetchRequest\npartition + fetchOffset"]
    FetchResp["FetchResponse\nrecords up to HWM"]
    ConsumerApp["Consumer App\npoll() → ConsumerRecords"]
  end

  App --> Ser --> Part --> Accum --> Sender
  Sender -->|"TCP"| NetThread --> IOHandler
  IOHandler --> LeaderLog --> PurgatoryDelay
  FollowerFetch -->|"Follower catches up"| PurgatoryDelay
  PurgatoryDelay --> HWMAdvance
  HWMAdvance -->|"ProduceResponse"| Sender
  ConsumerSide -->|"TCP"| BrokerSide
  HWMAdvance --> FetchResp --> ConsumerApp

The purgatory (DelayedProducePurgatory) is an in-memory timer wheel containing pending produce requests awaiting replication. When a follower's FetchRequest arrives and advances the ISR's minimum LEO past the threshold, the purgatory watcher fires the produce callback.
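The completion condition can be sketched without the timer wheel: a pending acks=all request carries a required offset, and a follower fetch that raises min(ISR LEOs) to or past that offset completes it. Request ids and offsets below are illustrative.

```python
# Sketch of DelayedProduce completion on follower fetch progress.
pending = {"req-1": 100}              # request waiting for offset 100 replicated
isr_leo = {"broker1": 100, "broker2": 98, "broker3": 100}

def on_follower_fetch(replica: str, new_leo: int) -> list:
    isr_leo[replica] = new_leo
    hwm = min(isr_leo.values())       # HWM = min LEO across the ISR
    done = [r for r, required in pending.items() if hwm >= required]
    for r in done:
        del pending[r]                # fire the produce callback
    return done

assert on_follower_fetch("broker2", 99) == []        # still short of offset 100
assert on_follower_fetch("broker2", 100) == ["req-1"]
```

In the real broker a timeout (request.timeout.ms on the produce path) also completes the operation, with an error, if replication never catches up.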


13. MirrorMaker 2 / Multi-Cluster Replication

flowchart LR
  subgraph ClusterA["Cluster A (Source)"]
    TopicA["orders topic"]
  end
  subgraph MM2["MirrorMaker 2\n(Kafka Connect worker)"]
    SourceConn["MirrorSourceConnector\n(consumer from A)"]
    ReplicationTask["ReplicationTask\n(producer to B)"]
    CheckpointConn["MirrorCheckpointConnector\n(offset translation)"]
  end
  subgraph ClusterB["Cluster B (Target)"]
    TopicB["A.orders topic\n(remote topic with prefix)"]
    OffsetTopic["A.checkpoints.internal"]
  end

  TopicA --> SourceConn --> ReplicationTask --> TopicB
  CheckpointConn -->|"translate\nA-offsets → B-offsets"| OffsetTopic

MirrorMaker 2 prefixes remote topics with the source cluster alias (A.orders) to avoid naming collisions. The MirrorCheckpointConnector maintains offset translations so that failover consumers can resume at the correct position on the target cluster.
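Offset translation can be sketched as a lookup over recorded sync points: pairs of (source offset, target offset) observed during replication. A failover consumer resumes from the latest sync point at or below its committed source offset, which is conservative — a few records may be reprocessed. The sync values here are made up.

```python
# Sketch of checkpoint offset translation across clusters.
import bisect

# hypothetical sync points recorded during replication: (source, target) offsets
offset_syncs = [(0, 0), (1000, 940), (2000, 1935)]

def translate(source_offset: int) -> int:
    sources = [s for s, _t in offset_syncs]
    i = bisect.bisect_right(sources, source_offset) - 1
    # Conservative: resume at the sync point's target offset; records between
    # the sync point and source_offset may be consumed again after failover.
    return offset_syncs[i][1]

assert translate(1500) == 940
assert translate(2000) == 1935
```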


14. Storage Internals: Segment File I/O and Zero-Copy

Kafka's throughput advantage comes from two OS-level mechanisms:

Sequential disk writes: The OS can batch sequential writes into large disk I/O operations. Kafka's append-only design means every write to a partition is sequential — no random seeks.

Zero-copy via sendfile(): Consumer fetch requests are served via FileChannel.transferTo() (Java) → sendfile() syscall. Data moves from:

flowchart LR
  DiskFile["Segment .log file\n(kernel page cache)"] -->|"traditional: 4 copies\ncopy1: disk→kernel\ncopy2: kernel→user\ncopy3: user→socket buf\ncopy4: socket buf→NIC"| Socket
  DiskFile -->|"zero-copy sendfile:\n2 copies only\ndisk→kernel→NIC\n(no user-space copy)"| SocketZC["Socket (zero-copy)"]

This eliminates two memory copies per fetch, reducing CPU usage by 60–70% for high-throughput consumers and allowing Kafka to serve consumers at near-disk-bandwidth speeds.


15. Schema Registry Integration

sequenceDiagram
  participant Producer
  participant SchemaRegistry as Schema Registry
  participant KafkaTopic

  Producer->>SchemaRegistry: POST /subjects/orders-value (Avro schema)
  SchemaRegistry-->>Producer: schemaId=42
  Producer->>Producer: serialize: [magic=0x0][schemaId=42 (4 bytes)][avro bytes]
  Producer->>KafkaTopic: ProduceRequest (serialized bytes)

  participant Consumer
  KafkaTopic->>Consumer: FetchResponse (serialized bytes)
  Consumer->>Consumer: read magic byte → schemaId=42
  Consumer->>SchemaRegistry: GET /schemas/ids/42
  SchemaRegistry-->>Consumer: Avro schema JSON
  Consumer->>Consumer: deserialize avro bytes → object

Every message contains a 5-byte header: [0x00 magic][4-byte schema ID]. The Schema Registry maps IDs to Avro/JSON/Protobuf schemas. Schema evolution is governed by compatibility rules (BACKWARD, FORWARD, FULL) enforced at registration time — consumers reading old schemas can decode new messages safely.
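The 5-byte prefix is simple enough to frame and unframe directly: one magic byte (0x00) plus a big-endian 4-byte schema ID, then the serialized payload. The payload bytes below are placeholders, not real Avro output.

```python
# Sketch of the Schema Registry wire format: [0x00 magic][4-byte schema id][payload].
import struct

def frame(schema_id: int, payload: bytes) -> bytes:
    # ">bI" = big-endian signed byte + unsigned 32-bit int = 5 bytes of header.
    return struct.pack(">bI", 0, schema_id) + payload

def unframe(message: bytes):
    magic, schema_id = struct.unpack(">bI", message[:5])
    assert magic == 0            # unknown magic byte -> deserialization error
    return schema_id, message[5:]

msg = frame(42, b"\x02hi")       # placeholder payload bytes
assert unframe(msg) == (42, b"\x02hi")
assert len(msg) - len(b"\x02hi") == 5   # exactly 5 bytes of framing
```

The consumer side reads the ID, fetches (and caches) the schema from the registry, and only then deserializes the payload.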