
Effective Kafka — Under the Hood: Producer Internals, Reliability, and Performance Mechanics

Source: Effective Kafka — Emil Koutanov (Leanpub, 2021). Focus: not API usage but the mechanical internals of how Kafka batches records through the accumulator, how compression interacts with batching end-to-end, how consumer offset semantics prevent data loss, and how reliability guarantees propagate through the entire pipeline.


1. Producer Internals: RecordAccumulator and I/O Thread

The Kafka producer is asynchronous by design: send() never writes directly to the network. Instead, it stages records in an internal accumulator.

flowchart TD
    App["Application: producer.send(record)"] -->|serialize + partition assign| Accumulator["RecordAccumulator\n(in-memory per-partition ProducerBatch queue)"]
    Accumulator -->|batch full OR linger.ms elapsed| Sender["Sender Thread (background I/O)"]
    Sender -->|drain batches by node| NetworkClient["NetworkClient\n(TCP connection per broker)"]
    NetworkClient -->|ProduceRequest (batches)| Broker["Kafka Broker Leader"]
    Broker -->|ACK (acks=all: wait ISR)| NetworkClient
    NetworkClient -->|callback invoked| App

Accumulator Batch Lifecycle

stateDiagram-v2
    [*] --> Empty: partition drain queue
    Empty --> Accumulating: first record arrives → new ProducerBatch
    Accumulating --> Accumulating: more records added (same partition)
    Accumulating --> Drained: batch.size reached OR linger.ms expired
    Drained --> InFlight: Sender picks up batch
    InFlight --> Completed: ACK received → callback fired
    InFlight --> RetryQueue: error (retryable) → back to accumulator
    Completed --> [*]

Key interaction between batch.size and linger.ms:

- linger.ms=0 (default): a batch is dispatched as soon as the Sender thread runs — minimal latency
- linger.ms=5: wait up to 5ms, accumulating more records → larger batches → better compression
- batch.size override: even if linger.ms hasn't elapsed, dispatch once the batch fills up
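
A minimal sketch of these settings on a real producer, assuming a local broker and a hypothetical orders topic; send() returns immediately, and the callback fires only once the Sender thread receives the broker's ACK:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);            // wait up to 5 ms for more records
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);   // dispatch early once 64 KB fills
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress whole batches client-side

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the RecordAccumulator; the Sender thread does the I/O
            producer.send(new ProducerRecord<>("orders", "key-1", "value-1"),
                (metadata, exception) -> {
                    if (exception != null) exception.printStackTrace();
                    else System.out.printf("acked: partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
                });
        } // close() flushes whatever is still sitting in the accumulator
    }
}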


2. End-to-End Batching and Compression

sequenceDiagram
    participant Producer
    participant PBatch as ProducerBatch (accumulator)
    participant BrokerLog as Broker Log Segment
    participant Consumer

    Producer->>PBatch: append records (uncompressed in memory)
    Note over PBatch: batch.size reached
    PBatch->>PBatch: compress entire batch (snappy/lz4/zstd/gzip)
    PBatch->>BrokerLog: ProduceRequest with compressed RecordBatch
    BrokerLog->>BrokerLog: store compressed bytes as-is (no decompression)
    Consumer->>BrokerLog: FetchRequest
    BrokerLog->>Consumer: compressed RecordBatch bytes
    Consumer->>Consumer: decompress batch → individual records

The broker stores and serves compressed batches without recompressing them (provided the topic's compression.type is left at its default, producer) — this is the "zero-copy end-to-end" property. Compression costs client-side CPU, but saves network bandwidth and broker disk space simultaneously.

Compression Ratio vs. Batch Size

flowchart LR
    SmallBatch["Small batch (2 records)\nJSON: ~300 bytes\ncompressed: ~280 bytes\nratio: 1.07x"] -->|increase batch size| LargeBatch["Large batch (1000 records)\nJSON: ~150KB\ncompressed: ~21KB\nratio: 7x (typical JSON)"]
    LargeBatch -->|diminishing returns| VeryLarge["Very large batch\nratio plateaus near entropy limit"]

Compression codec tradeoffs:

- gzip: highest ratio, highest CPU — good for archival
- lz4: balanced ratio and speed — recommended for production
- zstd: best ratio/speed tradeoff (Kafka 2.1+) — preferred for high-throughput workloads
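
A JDK-only sketch (illustrative, not Kafka's code path) of why the ratio climbs with batch size: repeated JSON field names give the codec shared context, which two records barely provide and a thousand provide in abundance:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class BatchCompressionDemo {
    static byte[] gzip(byte[] input) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(input);
        }
        return bos.toByteArray();
    }

    // Build a batch of n similar JSON records, like an accumulator would hold
    static byte[] batchOf(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("{\"orderId\":").append(i)
              .append(",\"status\":\"CREATED\",\"amount\":19.99}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        for (int n : new int[] {2, 1000}) {
            byte[] raw = batchOf(n);
            byte[] packed = gzip(raw);
            System.out.printf("records=%4d raw=%7d compressed=%6d ratio=%.2fx%n",
                    n, raw.length, packed.length, (double) raw.length / packed.length);
        }
    }
}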


3. Consumer Offset Mechanics and Delivery Guarantees

flowchart TD
    subgraph "Consumer Poll Loop"
        Poll["consumer.poll(timeout)"] -->|fetch records| Process["Application processes records"]
        Process -->|commitSync() or commitAsync()| OffsetCommit["__consumer_offsets topic\n{group, topic, partition} → offset+1"]
    end
    subgraph "At-Least-Once vs At-Most-Once"
        ALO["At-Least-Once:\nprocess THEN commit\nduplicates on crash"] --> CommitAfter
        AMO["At-Most-Once:\ncommit THEN process\nloss on crash"] --> CommitBefore
    end
    subgraph "Exactly-Once"
        EOE["Process + produce to output topic + commit offset\nin ONE atomic Kafka transaction"] --> TransactionalAPI
    end

Offset Commit Internals

sequenceDiagram
    participant Consumer
    participant GroupCoordinator as Group Coordinator (broker)
    participant OffsetTopic as __consumer_offsets

    Consumer->>GroupCoordinator: OffsetCommitRequest\n{groupId, topic, partition, offset=101, metadata}
    GroupCoordinator->>OffsetTopic: append record (key=group+topic+partition, value=offset)
    OffsetTopic-->>GroupCoordinator: ACK
    GroupCoordinator-->>Consumer: OffsetCommitResponse (OK)

    Note over Consumer: On restart
    Consumer->>GroupCoordinator: OffsetFetchRequest {groupId, topic, partition}
    GroupCoordinator-->>Consumer: offset=101 → resume from 101

The __consumer_offsets topic is itself a compacted Kafka topic — for each (group, topic, partition) key, only the latest committed offset is retained. Offset storage is thus just a compacted key-value store built on Kafka's own log.
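
A minimal at-least-once loop, assuming a local broker, a hypothetical orders topic, and group order-processor; the ordering of process() before commitSync() is what makes it at-least-once:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we commit manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // process FIRST ...
                }
                consumer.commitSync(); // ... THEN commit: a crash in between means reprocessing, never loss
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("p%d@%d: %s%n", record.partition(), record.offset(), record.value());
    }
}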


4. Consumer Group Rebalance Protocol

sequenceDiagram
    participant C1 as Consumer 1
    participant C2 as Consumer 2 (new)
    participant GC as Group Coordinator

    C1->>GC: Heartbeat (memberId=m1)
    C2->>GC: JoinGroupRequest (groupId, protocols=[range,roundrobin])
    GC->>C1: REBALANCE_IN_PROGRESS (via next heartbeat)
    C1->>GC: JoinGroupRequest (memberId=m1, rejoin)
    C2->>GC: JoinGroupRequest (memberId=m2)
    GC->>C1: JoinGroupResponse (leader=m1, members=[m1,m2])
    GC->>C2: JoinGroupResponse (follower)
    C1->>C1: run partition assignment algorithm
    C1->>GC: SyncGroupRequest (assignment: {m1:[p0,p1], m2:[p2,p3]})
    C2->>GC: SyncGroupRequest (no assignment - follower)
    GC->>C1: SyncGroupResponse (partitions=[p0,p1])
    GC->>C2: SyncGroupResponse (partitions=[p2,p3])

Stop-the-world rebalance: all consumers in the group pause consumption for the duration of the rebalance. Cooperative/incremental rebalancing (Kafka 2.4+) revokes and reassigns only the partitions that actually move, minimizing the pause.
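
A sketch of opting into cooperative rebalancing; the topic and group names are placeholders, and the listener bodies just log what a real implementation would act on:

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeRebalanceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only partitions that actually move are revoked; the rest keep flowing
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
            @Override public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                System.out.println("revoked (commit offsets here): " + parts);
            }
            @Override public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                System.out.println("assigned: " + parts);
            }
        });
        // ... poll loop as usual, then consumer.close()
    }
}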


5. Serialization and Schema Management

flowchart TD
    Record["Java POJO\nCustomerPayload"] -->|Serializer.serialize()| Bytes["byte[] payload"]
    subgraph "With Schema Registry (Avro)"
        POJO2["Java POJO"] -->|AvroSerializer| Magic["Magic byte: 0x00\n+ Schema ID (4 bytes)\n+ Avro-encoded payload"]
        Magic -->|wire format| Kafka["Kafka Record value"]
        Kafka -->|AvroDeserializer| LookupSR["Fetch schema from\nSchema Registry by ID\n(cached after first fetch)"]
        LookupSR -->|deserialize| POJO3["Java POJO"]
    end

Schema evolution with Avro:

- BACKWARD compatible: new schema can read old data (add fields with defaults)
- FORWARD compatible: old schema can read new data (remove fields that have defaults)
- FULL compatible: both directions — safest for production

Schema Registry stores (subject, version) → schema and enforces compatibility rules on registration.
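
A sketch of decoding that wire format by hand; the Schema Registry fetch itself is left abstract:

import java.nio.ByteBuffer;

public class WireFormatDecoder {
    /** Layout: [0x00 magic][4-byte big-endian schema ID][Avro-encoded payload] */
    public static void decode(byte[] recordValue) {
        ByteBuffer buf = ByteBuffer.wrap(recordValue); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != 0x00) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt(); // key for the Schema Registry lookup (cached after first fetch)
        byte[] avroPayload = new byte[buf.remaining()];
        buf.get(avroPayload);
        System.out.printf("schemaId=%d, payload=%d bytes%n", schemaId, avroPayload.length);
        // A real deserializer would now fetch the writer schema by ID and decode avroPayload
    }
}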


6. Partitioning Strategy: Key Hashing Mechanics

flowchart TD
    Record["ProducerRecord(topic, key, value)"]
    Record -->|key != null| Hash["murmur2(key.serialize()) % numPartitions"]
    Record -->|key == null, default partitioner| Sticky["StickyPartitioner\nbatch to same partition until batch full\nthen rotate"]
    Record -->|custom partitioner| Custom["Custom Partitioner.partition()"]
    Hash --> P["Target Partition"]
    Sticky --> P
    Custom --> P

Sticky partitioner (default since Kafka 2.4): instead of round-robin per record (which creates many small batches spread across all partitions), it assigns all null-key records to one partition until the batch fills, then switches. Result: larger batches, better compression, fewer Produce requests.
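
A sketch of the keyed branch, reusing murmur2 and toPositive from the client library's internal Utils class (treat this as illustrative, not a public contract):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitionDemo {
    /** Mirrors the keyed branch of the default partitioner: hash the serialized key, mod partition count. */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // toPositive masks the sign bit so the modulo never goes negative
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"customer-42", "customer-43"}) {
            System.out.printf("%s -> partition %d of 12%n", key, partitionFor(key, 12));
        }
    }
}

This also shows why adding partitions breaks key-to-partition affinity: the modulus changes, so existing keys start hashing to different partitions.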

Partition Count and Parallelism

flowchart LR
    Partitions["Topic: 12 partitions"] -->|consumer group 4 consumers| Assignment["3 partitions per consumer\n(max parallelism = numPartitions)"]
    subgraph "Scaling rule"
        Rule1["numConsumers <= numPartitions\n(extra consumers idle)"]
        Rule2["numPartitions determines max throughput\n(each partition = one ordered stream)"]
    end

7. Broker Log Architecture: Segments and Indexes

block-beta
    columns 1
    block:partition_dir:1
        columns 2
        seg0_log["00000000000000000000.log\n(segment data: records)"]
        seg0_idx["00000000000000000000.index\n(sparse offset→physical pos)"]
        seg0_time["00000000000000000000.timeindex\n(timestamp→offset)"]
        seg1_log["00000000000000001024.log\n(next segment)"]
        seg1_idx["00000000000000001024.index"]
        active_log["00000000000000002048.log (active)"]
    end

Segment rollover: when active segment reaches log.segment.bytes (default 1GB) or log.roll.ms (default 7 days), a new segment is created. Old segments become eligible for retention/compaction.

Sparse index: the .index file does NOT index every offset — it adds an entry roughly every log.index.interval.bytes of appended data (default 4KB). Finding offset O:

1. Binary search .index for the largest indexed offset ≤ O → physical position P
2. Sequentially scan .log from P until offset O is found
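
A simplified in-memory model of the two-step lookup, with arrays standing in for the .index entries:

import java.util.Arrays;

public class SparseIndexLookup {
    // Parallel arrays modelling sparse .index entries: one entry per few KB of log data
    static final long[] INDEXED_OFFSETS = {0, 37, 81, 129};
    static final long[] PHYSICAL_POS    = {0, 4102, 8231, 12407};

    /** Step 1: binary search for the largest indexed offset <= target. */
    static long startPositionFor(long targetOffset) {
        int i = Arrays.binarySearch(INDEXED_OFFSETS, targetOffset);
        if (i < 0) i = -i - 2; // insertion point minus one = floor entry
        return PHYSICAL_POS[Math.max(i, 0)];
    }

    public static void main(String[] args) {
        long target = 100;
        long pos = startPositionFor(target);
        // Step 2 (not shown): scan the .log file forward from pos until offset 100 is reached
        System.out.printf("offset %d: start scanning .log at byte position %d%n", target, pos);
    }
}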


8. Replication: ISR Mechanics and acks Configuration

sequenceDiagram
    participant Producer
    participant Leader as Partition Leader (broker 1)
    participant F1 as Follower 1 (broker 2)
    participant F2 as Follower 2 (broker 3)

    Producer->>Leader: ProduceRequest (acks=all, batch)
    Leader->>Leader: append to local log (HW not yet advanced)
    F1->>Leader: FetchRequest (fetchOffset=N)
    Leader-->>F1: records [N..M]
    F1->>F1: append to local log
    F2->>Leader: FetchRequest (fetchOffset=N)
    Leader-->>F2: records [N..M]
    F2->>F2: append to local log
    Note over Leader: all ISR members confirmed up to M
    Leader->>Leader: advance High Watermark to M
    Leader-->>Producer: ProduceResponse (offset=M)

High Watermark (HW): the highest offset that ALL ISR replicas have confirmed. Consumers can only read up to HW — no "dirty reads" of uncommitted data.

ISR shrinkage: if a follower lags beyond replica.lag.time.max.ms (default 30s), it is removed from the ISR. If the ISR shrinks to just the leader while min.insync.replicas=2, the leader rejects acks=all produce requests (NotEnoughReplicasException) — choosing durability over availability.
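
A sketch wiring the two halves of the guarantee together, assuming a local broker: the topic enforces min.insync.replicas=2, and the producer requests acks=all:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DurableTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(adminProps)) {
            NewTopic topic = new NewTopic("orders", 12, (short) 3) // 3 replicas per partition
                    .configs(Map.of("min.insync.replicas", "2")); // acks=all needs >= 2 ISR members
            admin.createTopics(List.of(topic)).all().get();
        }

        // Producer side: treat a send as successful only once the full ISR has it
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // dedup on retry
    }
}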


9. Transactions: Atomic Multi-Partition Produce

sequenceDiagram
    participant App
    participant Producer as Transactional Producer
    participant TC as Transaction Coordinator
    participant TopicA as Topic A (partitions)
    participant TopicB as Topic B (partitions)
    participant OffTopic as __consumer_offsets

    App->>Producer: beginTransaction()
    Producer->>TC: AddPartitionsToTxn (topicA:p0, topicB:p1)
    Producer->>TopicA: ProduceRequest (PID, epoch, seq)
    Producer->>TopicB: ProduceRequest (PID, epoch, seq)
    Producer->>OffTopic: TxnOffsetCommit (group offsets within txn)
    App->>Producer: commitTransaction()
    Producer->>TC: EndTxn (commit=true)
    TC->>TopicA: write COMMIT marker
    TC->>TopicB: write COMMIT marker
    TC->>OffTopic: write COMMIT marker
    Note over App: All or nothing — consumers see all or none

Transaction log (__transaction_state): the Transaction Coordinator persists txn state durably. On broker failure, new coordinator recovers from this log and completes or aborts in-flight transactions.
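
A consume-transform-produce sketch over the transactional API, reusing the SEDA topic names from section 11; the transactional.id is a placeholder and must be stable per producer instance, and downstream consumers need isolation.level=read_committed to honor the markers:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceStage {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-service");
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets go through the txn
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "fraud-stage-1"); // stable ID fences zombies
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("validated-orders"));
            producer.initTransactions(); // registers with the Transaction Coordinator

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        producer.send(new ProducerRecord<>("fraud-scored-orders", r.key(), score(r.value())));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                    }
                    // Output records AND input offsets commit atomically, or not at all
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (Exception e) {
                    producer.abortTransaction(); // a fenced producer should close and exit instead
                }
            }
        }
    }

    static String score(String order) { return order + ",fraudScore=0.1"; }
}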


10. Consumer Configuration: Fetch Behavior Internals

flowchart TD
    Consumer["consumer.poll()"] -->|FetchRequest| Broker
    Broker -->|fetch.min.bytes=1| WaitData["Wait until ≥ 1 byte available (default: return immediately)"]
    Broker -->|fetch.max.wait.ms=500| MaxWait["Or return after 500ms even if empty"]
    Broker -->|max.partition.fetch.bytes=1MB| PerPartition["Max 1MB per partition per fetch"]
    Broker -->|fetch.max.bytes=50MB| TotalMax["Max 50MB total per fetch response"]
    WaitData --> Response["FetchResponse → consumer deserializes"]

Tuning for throughput: increase fetch.min.bytes (e.g., 100KB) — broker waits to accumulate more data before responding. Reduces fetch round-trips. Tradeoff: higher latency per poll cycle.

Tuning for latency: fetch.min.bytes=1, fetch.max.wait.ms=0 — return immediately with whatever is available.
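
The two profiles as consumer property sets (values taken from the text):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchTuning {
    /** Throughput profile: let the broker accumulate data before responding. */
    static Properties forThroughput() {
        Properties props = new Properties();
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024); // wait for >= 100 KB ...
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);      // ... or 500 ms, whichever first
        return props;
    }

    /** Latency profile: return immediately with whatever is available. */
    static Properties forLatency() {
        Properties props = new Properties();
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 0);
        return props;
    }
}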


11. SEDA Pipeline Pattern: Staged Event-Driven Architecture

flowchart LR
    Input["Input Topic\nraw-orders"] -->|Stage 1: validate + enrich| Validated["validated-orders"]
    Validated -->|Stage 2: fraud check| FraudResult["fraud-scored-orders"]
    FraudResult -->|fan-out| Fulfillment["fulfillment-orders"]
    FraudResult -->|fan-out| Analytics["analytics-events"]
    subgraph "Stage 2 internals"
        FraudConsumer["Consumer Group: fraud-service\n(scales independently 1-N instances)"]
        FraudConsumer -->|produce| FraudResult
    end

Each stage scales independently — if fraud scoring is the bottleneck, add more instances to that consumer group without touching other stages. Backpressure propagates naturally through consumer lag.
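
A sketch of observing that backpressure by computing per-partition lag for the fraud-service group; a throwaway consumer supplies the log end offsets:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagMonitor {
    public static void main(String[] args) throws Exception {
        Properties ap = new Properties();
        ap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (Admin admin = Admin.create(ap);
             KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(cp)) {
            // Committed offsets per partition, straight from the group coordinator
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("fraud-service")
                         .partitionsToOffsetAndMetadata().get();
            // Log end offsets for the same partitions; lag = end - committed
            Map<TopicPartition, Long> ends = probe.endOffsets(committed.keySet());
            committed.forEach((tp, om) -> {
                if (om == null) return; // no commit yet for this partition
                System.out.printf("%s lag=%d%n", tp, ends.get(tp) - om.offset());
            });
        }
    }
}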


12. Delivery Reliability: Failure Mode Analysis

flowchart TD
    subgraph "Producer Failures"
        PF1["Network timeout after send\n→ producer retries (idempotent: safe)"]
        PF2["Broker leader election\n→ producer reconnects to new leader\n→ retries with idempotent dedup"]
        PF3["RecordAccumulator full (buffer.memory)\n→ block or throw if max.block.ms exceeded"]
    end
    subgraph "Consumer Failures"
        CF1["Consumer crash after processing, before commit\n→ reprocessing on restart (at-least-once)"]
        CF2["Consumer crash before processing, after commit\n→ loss (at-most-once)"]
        CF3["Rebalance during processing\n→ partition reassigned, uncommitted offsets reprocessed"]
    end
    subgraph "Broker Failures"
        BF1["Leader fails mid-replication\n→ ISR follower elected leader\n→ HW ensures no uncommitted data served"]
        BF2["All ISR replicas fail\n→ unclean.leader.election.enable=true\n→ potential data loss for availability"]
    end
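
A producer configuration addressing the three producer-side failure modes above; the values are illustrative, not prescriptive:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ResilientProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // PF1/PF2: retries are safe because the broker dedups on (PID, epoch, sequence)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // overall retry budget
        // PF3: bound how long send() may block when the accumulator is full
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);
        return props;
    }
}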

Summary: Effective Kafka Internals Map

mindmap
  root((Effective Kafka Internals))
    Producer
      RecordAccumulator per partition
      Sender background I/O thread
      batch.size + linger.ms interaction
      Sticky partitioner rotation
    Compression
      End-to-end compressed batch
      Broker stores without recompressing
      lz4/zstd ratio vs CPU tradeoff
    Consumer
      Offset committed to __consumer_offsets
      Poll loop at-least/at-most/exactly-once
      Rebalance stop-the-world vs cooperative
    Reliability
      ISR + HW commit tracking
      min.insync.replicas safety gate
      Transactional multi-partition atomicity
    Serialization
      Schema Registry Avro wire format
      Magic byte + schema ID header
      BACKWARD/FORWARD compatibility
    Architecture Patterns
      SEDA pipeline independent scaling
      CQRS event-sourced ledger
      Log shipping for replica sync