Effective Kafka — Under the Hood: Producer Internals, Reliability, and Performance Mechanics¶
Source: Effective Kafka — Emil Koutanov (Leanpub, 2021). Focus: not API usage but the mechanical internals of how Kafka batches records through the accumulator, how compression interacts with batching end-to-end, how consumer offset semantics prevent data loss, and how reliability guarantees propagate through the entire pipeline.
1. Producer Internals: RecordAccumulator and I/O Thread¶
The Kafka producer is asynchronous by design — send() never directly writes to the network. Instead, it stages records through an internal accumulator.
flowchart TD
App["Application: producer.send(record)"] -->|serialize + partition assign| Accumulator["RecordAccumulator\n(in-memory per-partition ProducerBatch queue)"]
Accumulator -->|batch full OR linger.ms elapsed| Sender["Sender Thread (background I/O)"]
Sender -->|drain batches by node| NetworkClient["NetworkClient\n(TCP connection per broker)"]
NetworkClient -->|ProduceRequest (batches)| Broker["Kafka Broker Leader"]
Broker -->|ACK (acks=all: wait ISR)| NetworkClient
NetworkClient -->|callback invoked| App
Accumulator Batch Lifecycle¶
stateDiagram-v2
[*] --> Empty: partition drain queue
Empty --> Accumulating: first record arrives → new ProducerBatch
Accumulating --> Accumulating: more records added (same partition)
Accumulating --> Drained: batch.size reached OR linger.ms expired
Drained --> InFlight: Sender picks up batch
InFlight --> Completed: ACK received → callback fired
InFlight --> RetryQueue: error (retryable) → back to accumulator
Completed --> [*]
Key interaction between batch.size and linger.ms:
- linger.ms=0 (default): batch dispatched as soon as Sender thread runs — minimal latency
- linger.ms=5: wait up to 5ms, accumulating more records → larger batches → better compression
- batch.size override: even if linger.ms hasn't elapsed, dispatch once batch fills up
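The dispatch rule above can be sketched in a few lines of Python. This is a toy model, not the real client's `ProducerBatch`/`Sender` classes; the sizes and times are illustrative:

```python
# Toy model of the dispatch rule: a batch is sent when it is full
# OR linger.ms has elapsed since its first record arrived.

BATCH_SIZE = 16_384   # bytes, mirroring the batch.size default
LINGER_MS = 5

def batch_ready(bytes_used, first_record_time_s, now_s,
                batch_size=BATCH_SIZE, linger_ms=LINGER_MS):
    full = bytes_used >= batch_size
    lingered = (now_s - first_record_time_s) * 1000 >= linger_ms
    return full or lingered

assert batch_ready(16_384, 0.0, 0.0)        # full batch: ready immediately
assert not batch_ready(100, 0.0, 0.001)     # 1ms elapsed: keep accumulating
assert batch_ready(100, 0.0, 0.006)         # 6ms elapsed: linger expired
```

The trade is explicit in the predicate: raising `linger_ms` delays partial batches in exchange for fuller (and better-compressing) ones.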
2. End-to-End Batching and Compression¶
sequenceDiagram
participant Producer
participant PBatch as ProducerBatch (accumulator)
participant BrokerLog as Broker Log Segment
participant Consumer
Producer->>PBatch: append records (uncompressed in memory)
Note over PBatch: batch.size reached
PBatch->>PBatch: compress entire batch (snappy/lz4/zstd/gzip)
PBatch->>BrokerLog: ProduceRequest with compressed RecordBatch
BrokerLog->>BrokerLog: store compressed bytes as-is (no decompression)
Consumer->>BrokerLog: FetchRequest
BrokerLog->>Consumer: compressed RecordBatch bytes
Consumer->>Consumer: decompress batch → individual records
The broker stores and serves compressed batches as-is, without recompressing (so long as no message-format down-conversion is needed). This end-to-end compression property pairs with the broker's zero-copy (sendfile) transfer from page cache to socket. Compression is client-side CPU, but it saves network bandwidth AND broker disk space simultaneously.
Compression Ratio vs. Batch Size¶
flowchart LR
SmallBatch["Small batch (2 records)\nJSON: ~300 bytes\ncompressed: ~280 bytes\nratio: 1.07x"] -->|increase batch size| LargeBatch["Large batch (1000 records)\nJSON: ~150KB\ncompressed: ~21KB\nratio: 7x (typical JSON)"]
LargeBatch -->|diminishing returns| VeryLarge["Very large batch\nratio plateaus near entropy limit"]
Compression codec tradeoffs:
- gzip: highest ratio, highest CPU cost — good for archival workloads
- lz4: very fast with a moderate ratio — a safe default when producer CPU is the constraint
- zstd: near-gzip ratios at close to lz4 speed (Kafka 2.1+) — preferred for high-throughput production
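The ratio-vs-batch-size effect is easy to reproduce with stdlib `zlib` standing in for the producer's codec. The record shape and counts here are made up for illustration:

```python
import json
import zlib

record = {"orderId": 0, "status": "CONFIRMED", "amountCents": 4999}

def ratio(n):
    # One batch = n records serialized back-to-back and compressed
    # together, mimicking how a whole RecordBatch is compressed at once.
    raw = b"".join(
        json.dumps({**record, "orderId": i}).encode() for i in range(n)
    )
    return len(raw) / len(zlib.compress(raw))

# Larger batches expose more cross-record redundancy to the codec:
assert ratio(1000) > ratio(2)
```

Running this shows the small batch barely compressing while the large one shrinks many-fold, matching the diagram's plateau toward the data's entropy limit.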
3. Consumer Offset Mechanics and Delivery Guarantees¶
flowchart TD
subgraph "Consumer Poll Loop"
Poll["consumer.poll(timeout)"] -->|fetch records| Process["Application processes records"]
Process -->|commitSync() or commitAsync()| OffsetCommit["__consumer_offsets topic\n{group, topic, partition} → offset+1"]
end
subgraph "At-Least-Once vs At-Most-Once"
ALO["At-Least-Once:\nprocess THEN commit\nduplicates on crash"] --> CommitAfter
AMO["At-Most-Once:\ncommit THEN process\nloss on crash"] --> CommitBefore
end
subgraph "Exactly-Once"
EOE["Process + produce to output topic + commit offset\nin ONE atomic Kafka transaction"] --> TransactionalAPI
end
Offset Commit Internals¶
sequenceDiagram
participant Consumer
participant GroupCoordinator as Group Coordinator (broker)
participant OffsetTopic as __consumer_offsets
Consumer->>GroupCoordinator: OffsetCommitRequest\n{groupId, topic, partition, offset=101, metadata}
GroupCoordinator->>OffsetTopic: append record (key=group+topic+partition, value=offset)
OffsetTopic-->>GroupCoordinator: ACK
GroupCoordinator-->>Consumer: OffsetCommitResponse (OK)
Note over Consumer: On restart
Consumer->>GroupCoordinator: OffsetFetchRequest {groupId, topic, partition}
GroupCoordinator-->>Consumer: offset=101 → resume from 101
The __consumer_offsets topic is itself a compacted Kafka topic — for each (group, topic, partition) key, only the latest committed offset is retained. Offset storage is thus just a compacted key-value store built on Kafka's own log.
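A minimal sketch of that compacted key-value view, with hypothetical group and topic names; compaction here is modeled as "later appends for the same key win":

```python
# __consumer_offsets as a compacted log: each commit appends a record;
# compaction retains only the latest value per (group, topic, partition).

commit_log = [
    (("payments", "orders", 0), 50),
    (("payments", "orders", 0), 75),
    (("payments", "orders", 1), 10),
    (("payments", "orders", 0), 101),   # latest commit for partition 0
]

def compact(records):
    latest = {}
    for key, offset in records:   # later appends overwrite earlier ones
        latest[key] = offset
    return latest

assert compact(commit_log)[("payments", "orders", 0)] == 101
```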
4. Consumer Group Rebalance Protocol¶
sequenceDiagram
participant C1 as Consumer 1
participant C2 as Consumer 2 (new)
participant GC as Group Coordinator
C1->>GC: Heartbeat (memberId=m1)
C2->>GC: JoinGroupRequest (groupId, protocols=[range,roundrobin])
GC->>C1: REBALANCE_IN_PROGRESS (via next heartbeat)
C1->>GC: JoinGroupRequest (memberId=m1, rejoin)
C2->>GC: JoinGroupRequest (memberId=m2)
GC->>C1: JoinGroupResponse (leader=m1, members=[m1,m2])
GC->>C2: JoinGroupResponse (follower)
C1->>C1: run partition assignment algorithm
C1->>GC: SyncGroupRequest (assignment: {m1:[p0,p1], m2:[p2,p3]})
C2->>GC: SyncGroupRequest (no assignment - follower)
GC->>C1: SyncGroupResponse (partitions=[p0,p1])
GC->>C2: SyncGroupResponse (partitions=[p2,p3])
Stop-the-world rebalance: all consumers in the group pause consumption during rebalance. Cooperative/Incremental rebalance (Kafka 2.4+) moves only the reassigned partitions, minimizing pause.
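As a sketch of what the elected leader (m1 above) runs before SyncGroup, here is the gist of range assignment. This is a simplification of the real RangeAssignor (which works per topic), with made-up member IDs:

```python
# Range assignment sketch: partitions split into contiguous ranges,
# one range per sorted member, with remainders going to the first members.

def range_assign(members, num_partitions):
    members = sorted(members)
    per = num_partitions // len(members)
    extra = num_partitions % len(members)
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = per + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment

# Matches the SyncGroup assignment in the diagram above:
assert range_assign(["m1", "m2"], 4) == {"m1": [0, 1], "m2": [2, 3]}
```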
5. Serialization and Schema Management¶
flowchart TD
Record["Java POJO\nCustomerPayload"] -->|Serializer.serialize()| Bytes["byte[] payload"]
subgraph "With Schema Registry (Avro)"
POJO2["Java POJO"] -->|AvroSerializer| Magic["Magic byte: 0x00\n+ Schema ID (4 bytes)\n+ Avro-encoded payload"]
Magic -->|wire format| Kafka["Kafka Record value"]
Kafka -->|AvroDeserializer| LookupSR["Fetch schema from\nSchema Registry by ID\n(cached after first fetch)"]
LookupSR -->|deserialize| POJO3["Java POJO"]
end
Schema evolution with Avro:
- BACKWARD compatible: new schema can read old data (add fields with defaults)
- FORWARD compatible: old schema can read new data (remove fields that have defaults)
- FULL compatible: both directions — safest for production
Schema Registry stores (subject, version) → schema and enforces compatibility rules on registration.
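The wire format above is small enough to build and parse directly. A sketch using stdlib `struct`, with a made-up schema ID and payload bytes:

```python
import struct

def encode_confluent(schema_id, avro_payload):
    # Magic byte 0x00, then the 4-byte big-endian schema ID,
    # then the Avro-encoded body.
    return b"\x00" + struct.pack(">I", schema_id) + avro_payload

def decode_confluent(value):
    magic = value[0]
    assert magic == 0, "unknown wire format"
    schema_id = struct.unpack(">I", value[1:5])[0]
    return schema_id, value[5:]

wire = encode_confluent(42, b"\x02\x06foo")
assert decode_confluent(wire) == (42, b"\x02\x06foo")
```

The deserializer uses the extracted ID to fetch (and cache) the writer's schema from the registry before decoding the body.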
6. Partitioning Strategy: Key Hashing Mechanics¶
flowchart TD
Record["ProducerRecord(topic, key, value)"]
Record -->|key != null| Hash["murmur2(key.serialize()) % numPartitions"]
Record -->|key == null, default partitioner| Sticky["StickyPartitioner\nbatch to same partition until batch full\nthen rotate"]
Record -->|custom partitioner| Custom["Custom Partitioner.partition()"]
Hash --> P["Target Partition"]
Sticky --> P
Custom --> P
Sticky partitioner (default since Kafka 2.4): instead of round-robin per-record (creating many small batches across all partitions), assigns all null-key records to one partition until the batch fills, then switches. Result: larger batches, better compression, fewer Produce requests.
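A toy model of that sticking-then-rotating behavior; the class and method names here are invented, loosely mirroring the real client's on-new-batch hook:

```python
import random

class StickyChooser:
    """Null-key records all go to one 'sticky' partition; on batch
    completion the producer picks a different one."""

    def __init__(self, num_partitions, seed=0):
        self.num_partitions = num_partitions
        self.rng = random.Random(seed)
        self.current = self.rng.randrange(num_partitions)

    def partition(self):
        return self.current            # stick until told to rotate

    def on_new_batch(self):
        prev = self.current
        while self.num_partitions > 1 and self.current == prev:
            self.current = self.rng.randrange(self.num_partitions)

chooser = StickyChooser(num_partitions=6)
first = {chooser.partition() for _ in range(100)}
assert len(first) == 1                 # 100 records, one batch, one partition
chooser.on_new_batch()
assert chooser.partition() not in first
```

Contrast with per-record round-robin, which would have spread those 100 records across six half-empty batches.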
Partition Count and Parallelism¶
flowchart LR
Partitions["Topic: 12 partitions"] -->|consumer group 4 consumers| Assignment["3 partitions per consumer\n(max parallelism = numPartitions)"]
subgraph "Scaling rule"
Rule1["numConsumers <= numPartitions\n(extra consumers idle)"]
Rule2["numPartitions determines max throughput\n(each partition = one ordered stream)"]
end
7. Broker Log Architecture: Segments and Indexes¶
block-beta
columns 1
block:partition_dir:1
columns 2
seg0_log["00000000000000000000.log\n(segment data: records)"]
seg0_idx["00000000000000000000.index\n(sparse offset→physical pos)"]
seg0_time["00000000000000000000.timeindex\n(timestamp→offset)"]
seg1_log["00000000000000001024.log\n(next segment)"]
seg1_idx["00000000000000001024.index"]
active_log["00000000000000002048.log (active)"]
end
Segment rollover: when active segment reaches log.segment.bytes (default 1GB) or log.roll.ms (default 7 days), a new segment is created. Old segments become eligible for retention/compaction.
Sparse index: .index file does NOT index every offset — it indexes every log.index.interval.bytes of data. Finding offset O:
1. Binary search .index for largest indexed offset ≤ O → physical position P
2. Sequential scan .log from P until offset O found
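Those two steps can be sketched with stdlib `bisect` over a hypothetical in-memory index and log (real segments are files; the interval and offsets here are invented):

```python
import bisect

# Sparse index: (offset, file_position) pairs, one entry per index
# interval, NOT one per record.
index = [(0, 0), (100, 4096), (200, 8192)]
log = {o: f"record-{o}" for o in range(300)}   # stand-in for the .log file

def find(target_offset):
    # Step 1: binary search for the largest indexed offset <= target.
    offsets = [o for o, _ in index]
    i = bisect.bisect_right(offsets, target_offset) - 1
    base_offset, _pos = index[i]
    # Step 2: sequential scan forward from that position to the target.
    for o in range(base_offset, target_offset + 1):
        record = log[o]
    return record

assert find(150) == "record-150"   # lands at entry (100, 4096), scans 50 on
```

Sparseness keeps the index small enough to memory-map while bounding the scan to at most one index interval of data.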
8. Replication: ISR Mechanics and acks Configuration¶
sequenceDiagram
participant Producer
participant Leader as Partition Leader (broker 1)
participant F1 as Follower 1 (broker 2)
participant F2 as Follower 2 (broker 3)
Producer->>Leader: ProduceRequest (acks=all, batch)
Leader->>Leader: append to local log (HW not yet advanced)
F1->>Leader: FetchRequest (fetchOffset=N)
Leader-->>F1: records [N..M]
F1->>F1: append to local log
F2->>Leader: FetchRequest (fetchOffset=N)
Leader-->>F2: records [N..M]
F2->>F2: append to local log
Note over Leader: all ISR members confirmed up to M
Leader->>Leader: advance High Watermark to M
Leader-->>Producer: ProduceResponse (offset=M)
High Watermark (HW): the highest offset that ALL ISR replicas have confirmed. Consumers can only read up to HW — no "dirty reads" of uncommitted data.
ISR shrinkage: if a follower lags beyond replica.lag.time.max.ms (default 30s), it is removed from the ISR. If the ISR shrinks to just the leader and min.insync.replicas=2, the leader rejects acks=all produces with NotEnoughReplicasException, trading availability for durability.
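The HW rule reduces to a one-liner: the minimum confirmed log end offset across ISR members. A sketch with made-up replica names and offsets:

```python
# High watermark = lowest log end offset confirmed by any ISR member,
# so every ISR replica is guaranteed to hold everything below it.

def high_watermark(log_end_offsets_by_isr_member):
    return min(log_end_offsets_by_isr_member.values())

# Leader has written to 120; followers have replicated to 115 and 118.
# Consumers may read only up to 115:
assert high_watermark({"leader": 120, "f1": 115, "f2": 118}) == 115
```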
9. Transactions: Atomic Multi-Partition Produce¶
sequenceDiagram
participant App
participant Producer as Transactional Producer
participant TC as Transaction Coordinator
participant TopicA as Topic A (partitions)
participant TopicB as Topic B (partitions)
participant OffTopic as __consumer_offsets
App->>Producer: beginTransaction()
Producer->>TC: AddPartitionsToTxn (topicA:p0, topicB:p1)
Producer->>TopicA: ProduceRequest (PID, epoch, seq)
Producer->>TopicB: ProduceRequest (PID, epoch, seq)
Producer->>OffTopic: TxnOffsetCommit (group offsets within txn)
App->>Producer: commitTransaction()
Producer->>TC: EndTxn (commit=true)
TC->>TopicA: write COMMIT marker
TC->>TopicB: write COMMIT marker
TC->>OffTopic: write COMMIT marker
Note over App: All or nothing — consumers see all or none
Transaction log (__transaction_state): the Transaction Coordinator persists txn state durably. On broker failure, new coordinator recovers from this log and completes or aborts in-flight transactions.
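A toy model of how read_committed isolation plays out. In reality the consumer filters using aborted-transaction metadata returned in the fetch response; this sketch simply buffers each producer's (PID's) records until its control marker arrives:

```python
# read_committed sketch: data records are held back per PID until a
# COMMIT marker releases them; an ABORT marker discards them.

def read_committed(log):
    pending, visible = {}, []
    for entry in log:
        if entry[0] == "data":
            _, pid, record = entry
            pending.setdefault(pid, []).append(record)
        else:                          # ("marker", pid, "COMMIT"|"ABORT")
            _, pid, decision = entry
            if decision == "COMMIT":
                visible.extend(pending.pop(pid, []))
            else:
                pending.pop(pid, None)
    return visible

log = [("data", 7, "a"), ("data", 7, "b"), ("marker", 7, "COMMIT"),
       ("data", 9, "x"), ("marker", 9, "ABORT")]
assert read_committed(log) == ["a", "b"]   # aborted "x" never surfaces
```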
10. Consumer Configuration: Fetch Behavior Internals¶
flowchart TD
Consumer["consumer.poll()"] -->|FetchRequest| Broker
Broker -->|fetch.min.bytes=1| WaitData["Wait until ≥ 1 byte available (default: return immediately)"]
Broker -->|fetch.max.wait.ms=500| MaxWait["Or return after 500ms even if empty"]
Broker -->|max.partition.fetch.bytes=1MB| PerPartition["Max 1MB per partition per fetch"]
Broker -->|fetch.max.bytes=50MB| TotalMax["Max 50MB total per fetch response"]
WaitData --> Response["FetchResponse → consumer deserializes"]
Tuning for throughput: increase fetch.min.bytes (e.g., 100KB) — broker waits to accumulate more data before responding. Reduces fetch round-trips. Tradeoff: higher latency per poll cycle.
Tuning for latency: fetch.min.bytes=1, fetch.max.wait.ms=0 — return immediately with whatever is available.
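The broker's respond-or-wait decision boils down to a simple predicate; a sketch using the defaults from the diagram:

```python
# Broker fetch decision: respond once fetch.min.bytes is available,
# or once fetch.max.wait.ms has elapsed, whichever comes first.

def should_respond(bytes_available, waited_ms,
                   fetch_min_bytes=1, fetch_max_wait_ms=500):
    return bytes_available >= fetch_min_bytes or waited_ms >= fetch_max_wait_ms

# Throughput tuning: a raised minimum keeps the broker waiting...
assert not should_respond(10_000, 100, fetch_min_bytes=100_000)
# ...until the wait cap forces a (possibly small) response:
assert should_respond(10_000, 500, fetch_min_bytes=100_000)
# Latency tuning: defaults respond as soon as any byte exists:
assert should_respond(1, 0)
```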
11. SEDA Pipeline Pattern: Staged Event-Driven Architecture¶
flowchart LR
Input["Input Topic\nraw-orders"] -->|Stage 1: validate + enrich| Validated["validated-orders"]
Validated -->|Stage 2: fraud check| FraudResult["fraud-scored-orders"]
FraudResult -->|fan-out| Fulfillment["fulfillment-orders"]
FraudResult -->|fan-out| Analytics["analytics-events"]
subgraph "Stage 2 internals"
FraudConsumer["Consumer Group: fraud-service\n(scales independently 1-N instances)"]
FraudConsumer -->|produce| FraudResult
end
Each stage scales independently — if fraud scoring is the bottleneck, add more instances to that consumer group without touching other stages. Backpressure propagates naturally through consumer lag.
12. Delivery Reliability: Failure Mode Analysis¶
flowchart TD
subgraph "Producer Failures"
PF1["Network timeout after send\n→ producer retries (idempotent: safe)"]
PF2["Broker leader election\n→ producer reconnects to new leader\n→ retries with idempotent dedup"]
PF3["RecordAccumulator full (buffer.memory)\n→ block or throw if max.block.ms exceeded"]
end
subgraph "Consumer Failures"
CF1["Consumer crash after processing, before commit\n→ reprocessing on restart (at-least-once)"]
CF2["Consumer crash before processing, after commit\n→ loss (at-most-once)"]
CF3["Rebalance during processing\n→ partition reassigned, uncommitted offsets reprocessed"]
end
subgraph "Broker Failures"
BF1["Leader fails mid-replication\n→ ISR follower elected leader\n→ HW ensures no uncommitted data served"]
BF2["All ISR replicas fail\n→ unclean.leader.election.enable=true\n→ potential data loss for availability"]
end
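The idempotent dedup behind PF1/PF2 can be modeled as a per-producer sequence check on the leader. This is a simplification: the real broker tracks the last few batch sequence ranges per (PID, partition), not a single number:

```python
# Idempotent-producer sketch: the leader remembers the last sequence
# number seen per PID and drops retried duplicates of acked batches.

class PartitionLog:
    def __init__(self):
        self.records, self.last_seq = [], {}

    def append(self, pid, seq, record):
        if self.last_seq.get(pid, -1) >= seq:
            return "duplicate"          # retry of an already-acked batch
        self.last_seq[pid] = seq
        self.records.append(record)
        return "appended"

plog = PartitionLog()
assert plog.append(pid=1, seq=0, record="a") == "appended"
assert plog.append(pid=1, seq=0, record="a") == "duplicate"  # network retry
assert plog.append(pid=1, seq=1, record="b") == "appended"
assert plog.records == ["a", "b"]       # exactly one copy despite the retry
```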
Summary: Effective Kafka Internals Map¶
mindmap
root((Effective Kafka Internals))
Producer
RecordAccumulator per partition
Sender background I/O thread
batch.size + linger.ms interaction
Sticky partitioner rotation
Compression
End-to-end compressed batch
Broker stores without recompressing
lz4/zstd ratio vs CPU tradeoff
Consumer
Offset committed to __consumer_offsets
Poll loop at-least/at-most/exactly-once
Rebalance stop-the-world vs cooperative
Reliability
ISR + HW commit tracking
min.insync.replicas safety gate
Transactional multi-partition atomicity
Serialization
Schema Registry Avro wire format
Magic byte + schema ID header
BACKWARD/FORWARD compatibility
Architecture Patterns
SEDA pipeline independent scaling
CQRS event-sourced ledger
Log shipping for replica sync