
Kafka Streams in Action — Under the Hood: Topology Execution, State, and Stream Processing Internals

Source: Kafka Streams in Action — William P. Bejeck Jr., Manning 2018
Focus: Internal topology model, StreamTask execution, state store fault-tolerance, KTable cache mechanics, windowing state machines, Processor API scheduling, interactive query RPC routing


1. The Topology as a Directed Acyclic Graph (DAG)

Kafka Streams' mental model is fundamentally a DAG of processing nodes. Every transformation — mapValues, filter, branch, groupBy, join — becomes a node in this graph. The graph is assembled at build time (StreamsBuilder.build()) before any data flows.

graph TD
    A[Source Node<br/>transactions topic] --> B[Masking Processor<br/>maskCreditCard]
    B --> C[Filtering Processor<br/>price > 5.00]
    C --> D[SelectKey Processor<br/>purchaseDate as key]
    D --> E[Branch Processor<br/>predicates array]
    E --> F[Cafe Processor<br/>department==coffee]
    E --> G[Electronics Processor<br/>department==electronics]
    F --> H[Cafe Sink<br/>cafe-purchases topic]
    G --> I[Electronics Sink<br/>electronics-purchases topic]
    B --> J[Pattern Processor<br/>PurchasePattern.build]
    B --> K[Rewards Processor<br/>RewardAccumulator.build]
    J --> L[Patterns Sink<br/>patterns topic]
    K --> M[Rewards Sink<br/>rewards topic]
    B --> N[Purchases Sink<br/>purchases topic]

    style A fill:#2d6a4f,color:#fff
    style H fill:#1d3557,color:#fff
    style I fill:#1d3557,color:#fff
    style L fill:#1d3557,color:#fff
    style M fill:#1d3557,color:#fff
    style N fill:#1d3557,color:#fff

Node traversal: depth-first, single-threaded per StreamTask

Each StreamTask processes one partition assignment. When a record enters via the source node, execution propagates depth-first through the graph:

record arrives at Source Node
  → process at Masking Processor
    → process at Filtering Processor
      → process at Branch Processor
        → process at Cafe Processor (if predicate matches)
          → write to Cafe Sink
        → process at Electronics Processor (if predicate matches)
          → write to Electronics Sink
    → process at Pattern Processor
      → write to Patterns Sink
    → process at Rewards Processor
      → write to Rewards Sink
    → write to Purchases Sink

Key implication: within a single StreamTask, state stores are never accessed concurrently. Processor.process() and Punctuator.punctuate() are never called concurrently for the same task.
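The depth-first call chain above can be sketched as ordinary synchronous method calls; the toy model below (not the real Kafka Streams classes) shows why a child subtree completes before the parent moves on to its next child:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of topology traversal: forward() is a plain synchronous,
// depth-first method call, so there is never concurrent node execution
// within one task.
class ToyNode {
    final String name;
    final List<ToyNode> children = new ArrayList<>();
    final List<String> log; // shared visit log recording traversal order

    ToyNode(String name, List<String> log) { this.name = name; this.log = log; }

    ToyNode addChild(ToyNode child) { children.add(child); return this; }

    // Mirrors Processor.process() + context.forward(): the record visits
    // this node, then each child subtree completely, one after another.
    void process(String record) {
        log.add(name);
        for (ToyNode child : children) child.process(record);
    }
}
```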


2. StreamThread and StreamTask Architecture

graph TD
    A[KafkaStreams<br/>application entry point] --> B[StreamThread Pool<br/>num.stream.threads]
    B --> C[StreamThread 1]
    B --> D[StreamThread 2]
    B --> E[StreamThread N]
    C --> F[TaskManager]
    F --> G[StreamTask P0<br/>owns partition 0]
    F --> H[StreamTask P1<br/>owns partition 1]
    F --> I[StandbyTask P2<br/>warm replica]
    G --> J[Local RocksDB<br/>state store]
    G --> K[Topology DAG<br/>node chain]
    H --> L[Local RocksDB<br/>state store]
    H --> M[Topology DAG<br/>node chain]
    I --> N[Replicated state<br/>from changelog topic]

    style A fill:#2d6a4f,color:#fff
    style J fill:#8b0000,color:#fff
    style L fill:#8b0000,color:#fff
    style N fill:#555,color:#fff

Partition → Task mapping (1:1 invariant)

Topic partitions:  [P0]  [P1]  [P2]  [P3]
                    │     │     │     │
StreamTasks:       [T0]  [T1]  [T2]  [T3]
                    │     │     │
                   RDB   RDB   RDB   (RocksDB per task)

Each StreamTask exclusively owns its partitions. This shared-nothing architecture means:

- State store get(key) = local RocksDB lookup (no network hop)
- No lock contention between tasks
- Task failure is isolated — other tasks continue unaffected
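The mapping can be sketched as follows; the hash here is a stand-in (Kafka's default partitioner actually uses murmur2 over the serialized key), but the invariant is the same: one key always lands on one partition, hence one task, hence one local store.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of the 1:1 partition-to-task invariant. Stand-in hash; Kafka's
// real default partitioner uses murmur2 over the serialized key bytes.
class PartitionRouting {
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // masking to non-negative mirrors Kafka's handling of negative hashes
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Every record for a given key lands on one partition, hence one
    // StreamTask and one local RocksDB instance -- no cross-task sharing.
    static int taskFor(String key, int numPartitions) {
        return partitionFor(key, numPartitions); // task id == partition id
    }
}
```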

The processing loop inside StreamThread

sequenceDiagram
    participant ST as StreamThread
    participant KCon as KafkaConsumer
    participant TM as TaskManager
    participant Task as StreamTask
    participant RDB as RocksDB

    loop Every poll cycle
        ST->>KCon: poll(pollMs)
        KCon-->>ST: ConsumerRecords batch
        ST->>TM: addRecords(partitionRecords)
        loop For each StreamTask
            TM->>Task: process()
            Task->>RDB: get(key) [state lookup]
            RDB-->>Task: value
            Task->>RDB: put(key, newValue)
            Task->>Task: context.forward(key, value) [depth-first]
        end
        ST->>TM: punctuate() [if scheduled time elapsed]
        ST->>TM: maybeCommit() [if commit.interval.ms elapsed]
    end

3. Serde Pipeline: How Bytes Flow Through the Topology

Every record in Kafka is raw bytes. The Serde (Serializer + Deserializer) wrapper handles conversion at boundaries:

flowchart LR
    A[Kafka Topic<br/>byte arrays] -->|deserialize key| B[String key]
    A -->|deserialize value| C[Purchase object]
    B --> D[Processor<br/>maskCreditCard]
    C --> D
    D -->|serialize key| E["byte[] key"]
    D -->|serialize value| F["byte[] masked Purchase"]
    E --> G[Kafka Sink Topic<br/>byte arrays]
    F --> G

    style A fill:#1d3557,color:#fff
    style G fill:#1d3557,color:#fff

Custom Serde internals (JSON → bytes)

sequenceDiagram
    participant App as KafkaStreams App
    participant Ser as JsonSerializer<T>
    participant Gson as Gson library
    participant Kafka as Kafka broker

    App->>Ser: serialize(topic, Purchase object)
    Ser->>Gson: toJson(object)
    Gson-->>Ser: JSON string
    Ser->>Ser: getBytes("UTF-8")
    Ser-->>Kafka: byte[]

The Serde<T> container holds both Serializer<T> and Deserializer<T>. This reduces parameter count from 4 (separate key/value ser/deser) to 2 Serde instances per topology operation.
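A minimal sketch of that pairing idea, with simplified stand-ins (not the org.apache.kafka.common.serialization interfaces):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the Serde concept: one object bundling a serializer and a
// deserializer, so topology operations take 2 Serdes instead of 4 halves.
interface ToySerializer<T> { byte[] serialize(T data); }
interface ToyDeserializer<T> { T deserialize(byte[] bytes); }

class ToySerde<T> {
    final ToySerializer<T> serializer;
    final ToyDeserializer<T> deserializer;

    ToySerde(ToySerializer<T> s, ToyDeserializer<T> d) {
        serializer = s;
        deserializer = d;
    }

    // Analogue of Serdes.String(): UTF-8 bytes on the wire, String in the app
    static ToySerde<String> stringSerde() {
        return new ToySerde<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));
    }
}
```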


4. State Stores: RocksDB Internals and Fault Tolerance

State store taxonomy

graph TD
    A[StateStore Types] --> B[In-Memory Key/Value<br/>Stores.inMemoryKeyValueStore]
    A --> C[Persistent Key/Value<br/>Stores.persistentKeyValueStore<br/>backed by RocksDB]
    A --> D[LRU Map<br/>Stores.lruMap]
    A --> E[Persistent Window Store<br/>Stores.persistentWindowStore]
    A --> F[Persistent Session Store<br/>Stores.persistentSessionStore]

    C --> G[RocksDB on local disk<br/>LSM-tree storage engine]
    E --> H["RocksDB + window keys<br/>(key, window_start_ms, window_end_ms)"]
    F --> I["RocksDB + session keys<br/>(key, session_start, session_end)"]

Changelog topic: how state survives crashes

sequenceDiagram
    participant Proc as StreamTask Processor
    participant Store as RocksDB State Store
    participant KProd as KafkaProducer (batched)
    participant CL as Changelog Topic<br/>(compacted)
    participant Restore as StateRestoreConsumer

    Proc->>Store: put(key, value)
    Store->>KProd: queue (key, value) for changelog
    KProd->>CL: batch send (on cache flush / commit)

    note over CL: Compacted: only latest per key retained

    rect rgb(200, 50, 50)
        note over Proc: CRASH
    end

    Restore->>CL: consume from beginning
    CL-->>Restore: all (key, value) pairs
    Restore->>Store: reconstruct RocksDB state
    note over Store: State fully restored to pre-crash point

Data locality principle

flowchart LR
    subgraph Server1
        T1[StreamTask 1] -->|local get/put<br/>~microseconds| S1[RocksDB<br/>State Store 1]
    end
    subgraph Server2
        T2[StreamTask 2] -->|local get/put<br/>~microseconds| S2[RocksDB<br/>State Store 2]
    end
    subgraph RemoteDB["Remote DB (ANTI-PATTERN)"]
        style RemoteDB fill:#8b0000,color:#fff
        T3[Processor] -->|network call<br/>~milliseconds| R3[Remote Database]
    end

Each processor owns exclusive access to its state store — no sharing across threads, no network traversal. At 1M records/sec, even 1 ms of network latency per lookup adds up to 1,000 seconds of cumulative wait for every second of input, so serial remote lookups cannot keep pace.
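The arithmetic behind that claim, made explicit (integer microseconds to keep it exact):

```java
// Back-of-envelope check for the data-locality claim: cumulative lookup
// time when every record pays a fixed per-lookup latency.
class LocalityMath {
    static long cumulativeMicros(long records, long latencyMicrosPerLookup) {
        return records * latencyMicrosPerLookup;
    }
}
```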


5. KTable Cache Mechanics and Deduplication

KTable represents an update stream (changelog), not an event stream. The cache layer is the key mechanism that prevents downstream flooding.

stateDiagram-v2
    [*] --> CacheBuffer: Record arrives (same key, new value)
    CacheBuffer --> CacheBuffer: Replace previous value for key
    CacheBuffer --> Downstream: Cache flush triggered
    Downstream --> StateStore: Write latest value to RocksDB
    Downstream --> ChangelogTopic: Replicate to changelog

    state CacheBuffer {
        [*] --> Accumulate
        Accumulate --> Flush: cache.max.bytes.buffering exceeded
        Accumulate --> Flush: commit.interval.ms elapsed
    }

Cache deduplication flow

flowchart TD
    A[Record: YERB→105.24] --> B{Cache hit for YERB?}
    B -->|No| C[Add YERB→105.24 to cache]
    D[Record: YERB→105.36] --> B
    B -->|Yes| E[Overwrite: YERB→105.36]
    F[Record: NDLE→33.56] --> G{Cache hit for NDLE?}
    G -->|No| H[Add NDLE→33.56 to cache]
    I{Flush trigger?} --> J[Emit to downstream:<br/>YERB→105.36<br/>NDLE→33.56]
    C --> I
    E --> I
    H --> I

    note1["YERB→105.24 was NEVER emitted downstream<br/>deduped by cache before flush"]
    J -.-> note1

    style note1 fill:#555,color:#fff

Critical insight: Setting cache.max.bytes.buffering=0 disables caching entirely. Every KTable update propagates downstream immediately, effectively turning a changelog stream into an event stream — and causing excessive I/O on persistent stores (every update hits RocksDB).
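The dedup behavior can be modeled with a plain map. This sketch uses an explicit flush() instead of the real size/interval triggers (cache.max.bytes.buffering, commit.interval.ms), but shows why only the latest value per key ever reaches downstream:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of KTable cache deduplication: new values overwrite prior values
// for the same key in the buffer; only the latest survivor per key is
// emitted when a flush triggers.
class ToyKTableCache {
    private final Map<String, Double> buffer = new LinkedHashMap<>();

    void put(String key, double value) {
        buffer.put(key, value); // overwrite = dedup before emission
    }

    // Models the cache flush: emit current contents downstream, then reset
    Map<String, Double> flush() {
        Map<String, Double> out = new LinkedHashMap<>(buffer);
        buffer.clear();
        return out;
    }
}
```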


6. Windowing: Time Semantics and State Store Keys

Three window types compared

gantt
    title Window Types (20s window, 5s advance for hopping)
    dateFormat X
    axisFormat %Ss

    section Tumbling
    Window 1 :0, 20
    Window 2 :20, 40
    Window 3 :40, 60

    section Hopping
    Window 1 :0, 20
    Window 2 :5, 25
    Window 3 :10, 30
    Window 4 :15, 35

    section Session (inactivity gap = 5s)
    Session 1 :0, 15
    Session 2 :25, 50
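The window-assignment arithmetic behind the chart can be written out directly. WindowAssign is an illustrative stand-in for the math inside TimeWindows, assuming millisecond timestamps:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how tumbling and hopping windows map a record timestamp to
// window start times (ms). Mirrors the arithmetic of
// TimeWindows.of(...).advanceBy(...), not the actual Kafka classes.
class WindowAssign {
    // Tumbling: exactly one window; start is ts rounded down to window size.
    static long tumblingStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Hopping: every window whose [start, start + size) interval contains ts.
    static List<Long> hoppingStarts(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - sizeMs + advanceMs);
        first = first - (first % advanceMs); // round down to an advance boundary
        for (long s = first; s <= ts; s += advanceMs) starts.add(s);
        return starts;
    }
}
```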

Session window merging state machine

stateDiagram-v2
    [*] --> NewSession: Record 1 arrives (t=0:00:00)
    NewSession --> ActiveSession: start=00:00:00, end=00:00:00
    ActiveSession --> Merged: Record 2 arrives t=00:00:15\n(within inactivity gap 20s)
    Merged --> ActiveSession: start=00:00:00, end=00:00:15
    ActiveSession --> NewSession2: Record 3 arrives t=00:00:50\n(OUTSIDE inactivity gap)
    NewSession2 --> ActiveSession2: start=00:00:50, end=00:00:50
    ActiveSession --> MergedAll: Record 4 arrives t=00:00:05\n(matches BOTH sessions → merge all)
    MergedAll --> ActiveSession: start=00:00:00, end=00:00:50

RocksDB key layout for window stores

Window state store key format:
[original_key_bytes][window_start_ms (8 bytes)][window_end_ms (8 bytes)]

Example (tumbling 20s):
  key="YERB", window_start=0, window_end=20000
  → RocksDB key: "YERB\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x4E\x20"

Range scan to retrieve all windows for key:
  keyFrom = "YERB\x00\x00\x00\x00..."
  keyTo   = "YERB\xFF\xFF\xFF\xFF..."
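A minimal sketch of the composite key encoding above, using ByteBuffer. The class and method names are illustrative, not Kafka's internal key schema; the point is that big-endian timestamps appended after the key bytes keep one key's windows contiguous for a range scan:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the composite window-store key described above:
// [key bytes][window start, 8 bytes big-endian][window end, 8 bytes].
class WindowKey {
    static byte[] encode(String key, long startMs, long endMs) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer writes longs big-endian by default, which preserves
        // numeric ordering under RocksDB's lexicographic byte comparison
        return ByteBuffer.allocate(k.length + 16)
                .put(k).putLong(startMs).putLong(endMs).array();
    }
}
```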

7. Stream Joins: Co-Partitioning, Repartition, and State Stores

Join topology internals

flowchart TD
    A[coffeeStream<br/>key=customerId] --> B{Needs repartition?<br/>selectKey was called}
    B -->|Yes| C[Auto-repartition<br/>internal topic]
    D[electronicsStream<br/>key=customerId] --> E{Needs repartition?}
    E -->|Yes| F[Auto-repartition<br/>internal topic]
    C --> G[Join Processor]
    F --> G
    G --> H[State Store A<br/>coffee purchases window]
    G --> I[State Store B<br/>electronics purchases window]
    H --> J{Match found<br/>within JoinWindows 20min?}
    I --> J
    J -->|Yes| K[ValueJoiner.apply<br/>→ CorrelatedPurchase]
    K --> L[Downstream / Sink]

    style C fill:#8b4513,color:#fff
    style F fill:#8b4513,color:#fff

Co-partitioning requirement

flowchart LR
    subgraph "VALID: Same partition count"
        A1[coffeeStream<br/>4 partitions] --> J1[Join Processor]
        B1[electronicsStream<br/>4 partitions] --> J1
    end
    subgraph invalid["INVALID: Mismatched partitions"]
        style invalid fill:#8b0000,color:#fff
        A2[coffeeStream<br/>4 partitions] --> J2[TopologyBuilderException at startup]
        B2[electronicsStream<br/>6 partitions] --> J2
    end

When selectKey() is called, an internal repartition-required flag is set on the resulting KStream. Any subsequent join(), reduce(), or aggregate() then automatically triggers:

1. Write records to an internal repartition topic (keyed by the new key)
2. Consume from the repartition topic before proceeding
3. Both join participants' partitions are now aligned
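The startup validation can be sketched as a simple pre-flight check. This is an illustrative stand-in for the runtime's co-partitioning verification, not its actual code:

```java
// Sketch of the co-partitioning validation performed at startup: both join
// inputs must have identical partition counts, otherwise records with the
// same key could land on different partition numbers and never meet.
class CopartitionCheck {
    static void validate(String leftTopic, int leftPartitions,
                         String rightTopic, int rightPartitions) {
        if (leftPartitions != rightPartitions) {
            throw new IllegalStateException(
                leftTopic + " has " + leftPartitions + " partitions but "
                + rightTopic + " has " + rightPartitions
                + "; join inputs must be co-partitioned");
        }
    }
}
```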


8. KStream vs KTable: Record Model Duality

flowchart LR
    subgraph "KStream (Event Stream)"
        A[Record: YERB→105.24 at t=1] --> X1[Downstream]
        B[Record: YERB→105.36 at t=2] --> X1
        C[Record: YERB→105.01 at t=3] --> X1
        note1["ALL records emitted<br/>unbounded growth of events"]
    end

    subgraph "KTable (Update Stream / Changelog)"
        D[Record: YERB→105.24 at t=1] --> Cache
        E[Record: YERB→105.36 at t=2] --> Cache
        F[Record: YERB→105.01 at t=3] --> Cache
        Cache -->|flush| X2[Downstream: YERB→105.01 only]
        note2["Latest value per key<br/>older updates suppressed by cache"]
    end

builder.table() — what happens internally

sequenceDiagram
    participant App as Application
    participant SB as StreamsBuilder
    participant KT as KTable instance
    participant SS as Internal StateStore
    participant CT as Changelog Topic

    App->>SB: builder.table("stock-ticker-topic")
    SB->>KT: create KTable wrapping source topic
    SB->>SS: create StateStore (auto-named, not queryable)
    SB->>CT: register changelog topic for StateStore
    note over SS,CT: StateStore backed by auto-created changelog topic
    note over KT: Materialized overload allows naming store for IQ

9. Processor API: Custom Scheduling with Punctuator

The high-level DSL (KStream/KTable) buffers records and relies on commit/cache flush for downstream emission. The Processor API gives you direct control over when records are forwarded.

Punctuator scheduling: STREAM_TIME vs WALL_CLOCK_TIME

sequenceDiagram
    participant ST as StreamThread
    participant PG as PartitionGroup
    participant Task as StreamTask
    participant Punct as Punctuator

    loop Processing loop
        ST->>PG: extract minimum timestamp across all partitions
        PG-->>ST: minTimestamp (STREAM_TIME advance)
        ST->>Task: process(record)
        Task->>Task: update state store

        alt STREAM_TIME: if elapsed_time >= schedule_interval
            ST->>Punct: punctuate(currentTimestamp)
            Punct->>Task: context.forward(key, stockPerformance)
        end

        alt WALL_CLOCK_TIME: actual clock seconds elapsed
            ST->>Punct: punctuate(System.currentTimeMillis)
            note over Punct: called regardless of data activity
        end
    end

process() + punctuate() separation of concerns

flowchart TD
    A[Incoming StockTransaction record] --> B["process() method"]
    B --> C{State store has<br/>StockPerformance for symbol?}
    C -->|No| D[Create new StockPerformance]
    C -->|Yes| E[Fetch existing StockPerformance]
    D --> F[Update price stats SMA-20]
    E --> F
    F --> G[Update volume stats SMA-20]
    G --> H[Set lastUpdateSent timestamp]
    H --> I["keyValueStore.put(symbol, stockPerf)"]
    note1["process() NEVER calls context.forward()<br/>No record emitted per received record"]

    J{Punctuator fires every 10s} --> K[Iterate all state store keys]
    K --> L{priceDifferential >= 2%<br/>OR volumeDifferential >= 2%?}
    L -->|Yes| M["context.forward(key, stockPerformance)"]
    L -->|No| N[Skip — no emission]

    style note1 fill:#555,color:#fff

Processor API topology wiring

graph TD
    A[addSource<br/>stocks-source<br/>stock-transactions topic] --> B[addProcessor<br/>stocks-processor<br/>StockPerformanceProcessor]
    B --> C[addStateStore<br/>stock-performance-store<br/>attached to stocks-processor]
    B --> D[addSink<br/>stocks-sink<br/>stock-performance topic]

    style A fill:#2d6a4f,color:#fff
    style D fill:#1d3557,color:#fff
    style C fill:#8b0000,color:#fff

10. Co-Grouping Processor: Multi-Parent DAG

A CogroupingProcessor has two parent nodes — records from both source processors funnel into it. This demonstrates that Kafka Streams DAGs are not strictly trees; a node can have multiple parents.

graph TD
    A[Txn-Source<br/>stock-transactions topic] --> B[Txn-Processor<br/>StockTransactionProcessor]
    C[Events-Source<br/>events topic] --> D[Events-Processor<br/>ClickEventProcessor]
    B --> E[CoGrouping-Processor<br/>CogroupingProcessor]
    D --> E
    E --> F[CoGrouping State Store<br/>tuple: List<clicks>, List<purchases>]
    E --> G[Sink / Print]

    style A fill:#2d6a4f,color:#fff
    style C fill:#2d6a4f,color:#fff
    style G fill:#1d3557,color:#fff
    style F fill:#8b0000,color:#fff

The co-grouping processor accumulates clicks and purchases separately in a shared state store, then emits a Tuple<List<ClickEvent>, List<StockTransaction>> per punctuate interval — a pattern the high-level DSL could not express until cogroup() arrived in Kafka 2.4.
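A sketch of that accumulate-then-drain pattern, with plain strings standing in for ClickEvent and StockTransaction:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the co-grouping accumulator: two record types funnel into one
// shared store entry; a punctuate-style drain emits both lists as a pair.
class CoGroupEntry {
    final List<String> clicks = new ArrayList<>();
    final List<String> purchases = new ArrayList<>();

    void addClick(String c) { clicks.add(c); }       // from Events-Processor
    void addPurchase(String p) { purchases.add(p); } // from Txn-Processor

    // Emits the accumulated tuple and resets state, as the Punctuator
    // would on its schedule interval.
    List<List<String>> drain() {
        List<List<String>> tuple = List.of(List.copyOf(clicks), List.copyOf(purchases));
        clicks.clear();
        purchases.clear();
        return tuple;
    }
}
```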


11. Interactive Queries: Distributed State Store Access

When multiple instances of a Kafka Streams application run, each instance owns a subset of partitions (and thus a subset of state). Interactive queries (IQ) allow external services to query any instance for any key.

IQ architecture: local vs remote routing

sequenceDiagram
    participant Client as External Client
    participant App1 as Instance 1 (owns P0, P1)
    participant App2 as Instance 2 (owns P2, P3)
    participant IQ1 as Local RocksDB (P0,P1 data)
    participant IQ2 as Local RocksDB (P2,P3 data)
    participant Meta as StreamsMetadata registry

    Client->>App1: GET /query?key=YERB
    App1->>Meta: Which instance owns key YERB?
    Meta-->>App1: Instance 2 (partition 2 via hash)
    App1->>App2: HTTP RPC: GET /query?key=YERB
    App2->>IQ2: store.get("YERB")
    IQ2-->>App2: StockPerformance value
    App2-->>App1: JSON response
    App1-->>Client: StockPerformance for YERB

Configuration requirements for IQ

# Required for IQ remote routing
application.server=hostname:port  # each instance must know its own address

# In application code:
KafkaStreams.store(
  StoreQueryParameters.fromNameAndType(
    "stock-performance-store",
    QueryableStoreTypes.keyValueStore()
  )
)

flowchart TD
    A[KafkaStreams.store query] --> B{"Is key local?<br/>hash(key) mod numPartitions<br/>in locally owned partitions?"}
    B -->|Yes| C[Direct RocksDB.get<br/>sub-millisecond]
    B -->|No| D[StreamsMetadata.getInstanceWithKey]
    D --> E[HTTP/gRPC to remote instance<br/>user-implemented RPC server]
    E --> F[Remote instance: local RocksDB.get]
    F --> G[Response back through chain]
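The routing decision can be sketched as follows. Hostnames, the ownership table, and the hash are illustrative (Kafka actually hashes the serialized key with murmur2, and StreamsMetadata supplies the ownership information):

```java
import java.util.List;

// Sketch of interactive-query routing: hash the key to a partition, find
// the instance that owns that partition, then query locally or forward
// over the user-implemented RPC layer.
class IqInstance {
    final String host;
    final List<Integer> ownedPartitions;

    IqInstance(String host, List<Integer> ownedPartitions) {
        this.host = host;
        this.ownedPartitions = ownedPartitions;
    }
}

class IqRouter {
    // Stand-in for StreamsMetadata lookup: the same hash-to-partition rule
    // the producer used decides which instance holds the key's state.
    static IqInstance instanceForKey(String key, int numPartitions, List<IqInstance> instances) {
        int partition = (key.hashCode() & 0x7fffffff) % numPartitions;
        for (IqInstance i : instances) {
            if (i.ownedPartitions.contains(partition)) return i;
        }
        throw new IllegalStateException("no owner for partition " + partition);
    }
}
```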

12. Exactly-Once Semantics in Kafka Streams

sequenceDiagram
    participant Task as StreamTask
    participant TC as TransactionCoordinator
    participant Kafka as Kafka Brokers
    participant OS as Output Topics
    participant Off as __consumer_offsets

    Task->>TC: initTransactions()
    loop Per batch of records (EOS v2 per epoch)
        Task->>TC: beginTransaction()
        Task->>Kafka: consume records
        Task->>Task: process (update state stores)
        Task->>OS: produce output records (within transaction)
        Task->>Off: commitOffset (within transaction)
        Task->>TC: commitTransaction()
        note over TC,Off: commit is atomic across output + offset
    end

    rect rgb(200, 50, 50)
        note over Task: CRASH mid-transaction
    end
    TC->>Kafka: abort uncommitted transaction
    note over Kafka: Records invisible to read_committed consumers

processing.guarantee=exactly_once_v2 (Kafka 2.5+):

- One producer per StreamThread (not per task) → far fewer producer instances and transactions
- Epoch-based fencing replaces the old per-task transactional ID scheme
- Overhead: roughly 5-10% throughput reduction vs at_least_once


13. Aggregation Internals: GroupBy, Reduce, KGroupedStream

flowchart TD
    A[KStream.stream<br/>stock-transactions topic] -->|mapValues| B[ShareVolume objects]
    B -->|groupBy symbol| C[KGroupedStream<br/>intermediate — not directly usable]
    C -->|reduce ShareVolume::sum| D[KTable<String, ShareVolume><br/>rolling share volume per symbol]
    D -->|groupBy industry| E[KGroupedTable<br/>by industry]
    E -->|aggregate with FixedSizePriorityQueue| F[KTable<String, Top5Queue>]
    F -->|mapValues queue→string| G[KTable<String, String><br/>top-5 report]
    G -->|toStream.to| H[stock-volume-by-company topic]

    style C fill:#555,color:#fff
    style H fill:#1d3557,color:#fff

KTable.aggregate with adder/subtractor

For KTable aggregations, records with the same key are updates, not new records. The aggregate must both add new values AND remove old ones:

aggregate(
  initializer:  () → FixedSizePriorityQueue,   // fresh empty queue
  adder:        (k, v, agg) → agg.add(v),       // add new ShareVolume
  subtractor:   (k, v, agg) → agg.remove(v),    // remove old ShareVolume
  Materialized.with(stringSerde, queueSerde)
)

When the record for YERB is updated (105.24 → 105.36):

1. subtractor is called with the old value (105.24) — removes it from the top-5 queue
2. adder is called with the new value (105.36) — adds it to the top-5 queue
3. The queue reorders by share volume
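That remove-then-add dance can be sketched with a TreeSet standing in for the book's FixedSizePriorityQueue (illustrative only; among other simplifications it collapses ties in volume):

```java
import java.util.Comparator;
import java.util.TreeSet;

// Sketch of the adder/subtractor update on a fixed-size "top N" structure:
// an update removes the stale value before inserting the new one, so the
// aggregate never double-counts a key.
class TopN {
    private final int limit;
    private final TreeSet<Double> set = new TreeSet<>(Comparator.reverseOrder());

    TopN(int limit) { this.limit = limit; }

    void add(double v) {                           // the "adder"
        set.add(v);
        while (set.size() > limit) set.pollLast(); // evict the smallest
    }

    void remove(double v) { set.remove(v); }       // the "subtractor"

    double top() { return set.first(); }
}
```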


14. Branching: Multiple Children from One KStream

KStream.branch() produces an array of KStream instances, one per predicate. Records not matching any predicate are dropped silently.

flowchart TD
    A[transactionStream] -->|branch predicate[0]: coffee?| B[coffeeStream]
    A -->|branch predicate[1]: electronics?| C[electronicsStream]
    A -->|no predicate matches| D[DROPPED silently]

    B --> E[Cafe Sink]
    C --> F[Electronics Sink]

    style D fill:#8b0000,color:#fff

KStream<String, Purchase>[] branches = stream.branch(
    (k, v) -> v.getDepartment().equals("coffee"),
    (k, v) -> v.getDepartment().equals("electronics")
);
// branches[0] = coffee purchases
// branches[1] = electronics purchases

15. Monitoring: JMX Metrics and State Listeners

Kafka Streams metrics hierarchy

graph TD
    A[JMX MBeans] --> B[kafka.streams:type=stream-metrics]
    A --> C[kafka.streams:type=stream-task-metrics]
    A --> D[kafka.streams:type=stream-processor-node-metrics]
    A --> E[kafka.streams:type=stream-state-metrics]

    B --> B1[commit-latency-avg/max]
    B --> B2[poll-latency-avg/max]
    C --> C1[process-latency-avg/max per task]
    C --> C2[commit-latency per task]
    D --> D1[process-rate per node]
    D --> D2[forward-rate per node]
    E --> E1[flush-rate per store]
    E --> E2[restore-rate on restart]

Application state lifecycle

stateDiagram-v2
    [*] --> CREATED: KafkaStreams constructed
    CREATED --> REBALANCING: kafkaStreams.start()
    REBALANCING --> RUNNING: partition assignment complete
    RUNNING --> REBALANCING: consumer group rebalance
    RUNNING --> PENDING_SHUTDOWN: kafkaStreams.close()
    PENDING_SHUTDOWN --> NOT_RUNNING: cleanup complete
    RUNNING --> ERROR: uncaught exception in StreamThread

    note right of REBALANCING: State restore happens here\nchangelog replayed into RocksDB
    note right of ERROR: StateRestoreListener.onRestoreEnd\nUncaughtExceptionHandler fires

16. Topology Inspection and Describe

Before running, you can inspect the topology graph programmatically:

Topology topology = builder.build();
System.out.println(topology.describe());  // no KafkaStreams instance needed

Output reveals the internal node graph including auto-generated repartition topics, internal state store topics, and the exact parent-child wiring. This is essential for debugging unexpected data routing or verifying that selectKey() correctly inserted a repartitioning step.


17. Exactly-Once vs At-Least-Once Trade-offs

flowchart LR
    subgraph "at_least_once (default)"
        A1[High throughput] --> B1[Possible duplicate records<br/>on consumer rebalance]
        B1 --> C1[No transaction overhead]
    end
    subgraph "exactly_once_v2"
        A2[~5-10% throughput cost] --> B2[Atomic consume+produce+commit]
        B2 --> C2[TransactionCoordinator overhead]
        C2 --> D2[Epoch-based producer fencing<br/>prevents zombie writes]
    end

Use exactly_once when:

- Financial transactions, billing, inventory deduction — any domain where duplicate processing causes real-world harm
- Downstream consumers use isolation.level=read_committed

Use at_least_once when:

- Downstream operations are idempotent (counts can be recomputed, analytics tolerate approximation)
- Throughput matters more than the stronger guarantee


Summary: Kafka Streams Internal Data Flow

flowchart TD
    A[Kafka Topic Records<br/>byte arrays] -->|KafkaConsumer poll| B[PartitionGroup<br/>buffered per partition]
    B -->|min timestamp selection| C[StreamTask.process<br/>depth-first traversal]
    C --> D{State needed?}
    D -->|Yes| E[RocksDB local get/put<br/>sub-millisecond]
    D -->|No| F[Stateless transform]
    E --> G[Cache buffer<br/>dedup by key]
    F --> G
    G -->|commit.interval OR cache full| H[Flush to changelog topic<br/>KafkaProducer batched]
    G --> I[Emit to downstream processor<br/>or sink topic]
    I -->|join path| J[JoinWindowed StateStore<br/>look for matching records]
    J -->|match within window| K[ValueJoiner.apply → joined record]
    I -->|aggregation path| L[KGroupedStream reduce/aggregate<br/>→ KTable update]
    H --> M[Changelog topic<br/>compacted for fault recovery]
    M -->|crash + restart| E

    style A fill:#1d3557,color:#fff
    style E fill:#8b0000,color:#fff
    style M fill:#2d6a4f,color:#fff

The Kafka Streams execution model is: read from Kafka → process depth-first through DAG → write state locally → replicate state to Kafka → emit output to Kafka. The local state stores backed by changelog topics are the core innovation — they provide sub-millisecond lookups while maintaining full fault tolerance through Kafka's own durability guarantees.