
Kafka Streams — Under the Hood: Topology Execution, State, and Stream Processing Internals

Source: Kafka Streams in Action — William P. Bejeck Jr. (Manning, 2018)


1. The Processor Topology: A DAG of Transforms

Kafka Streams is not a distributed cluster service — it is a client library that runs inside your JVM. Its fundamental execution model is a Directed Acyclic Graph (DAG) of processor nodes, built at startup time and executed per-record.

flowchart TD
  src["Source Node\n(Kafka topic consumer)"]
  mask["Masking Processor\nmapValues: CC→xxxx"]
  patterns["Purchase Patterns Processor\nmap: Purchase→PurchasePattern"]
  rewards["Rewards Processor\nmap: Purchase→RewardAccumulator"]
  purchases["Purchases Processor\n(identity / passthrough)"]
  patSink["patterns topic\n(Sink Node)"]
  rewSink["rewards topic\n(Sink Node)"]
  purSink["purchases topic\n(Sink Node)"]

  src --> mask
  mask --> patterns
  mask --> rewards
  mask --> purchases
  patterns --> patSink
  rewards --> rewSink
  purchases --> purSink

Depth-first traversal: each record entering the source node is fully processed through all connected processors before the next record is dequeued. This eliminates backpressure — there is no inter-node buffering, no async queue between processors in the same topology. The record traverses the entire DAG synchronously on a single thread before the next record begins.

Consequence: if one processor branch is slow (e.g., state store lookup), the entire DAG slows proportionally. There is no independent parallelism across branches within one task.
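The depth-first mechanics can be sketched in plain Java. This is an illustrative model only — `Node`, `process`, and the field names are invented, not the Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of depth-first record traversal through a processor DAG.
public class DepthFirst {
    static class Node {
        final String name;
        final UnaryOperator<String> transform;
        final List<Node> children = new ArrayList<>();
        final List<String> outputs = new ArrayList<>(); // sink capture

        Node(String name, UnaryOperator<String> transform) {
            this.name = name;
            this.transform = transform;
        }

        // Synchronous, depth-first: the record reaches every downstream
        // node before process() returns -- no queues between nodes.
        void process(String record) {
            String out = transform.apply(record);
            if (children.isEmpty()) outputs.add(out); // leaf = sink
            for (Node child : children) child.process(out);
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", r -> r);
        Node mask = new Node("mask", r -> r.replaceAll("\\d", "x"));
        Node sink = new Node("sink", r -> r);
        source.children.add(mask);
        mask.children.add(sink);
        source.process("cc=1234");
        System.out.println(sink.outputs); // [cc=xxxx]
    }
}
```

Because `process` recurses synchronously, a slow node stalls the whole call chain — exactly the per-task behavior described above.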


2. Task Model: Partitions → Tasks → Threads

flowchart TD
  subgraph KafkaCluster["Kafka Cluster"]
    P0["Topic Partition 0"]
    P1["Topic Partition 1"]
    P2["Topic Partition 2"]
    P3["Topic Partition 3"]
  end

  subgraph StreamsApp["Kafka Streams App (1 JVM)"]
    subgraph Thread1["StreamThread-1"]
      T0["StreamTask-0\n(owns P0)"]
      T1["StreamTask-1\n(owns P1)"]
    end
    subgraph Thread2["StreamThread-2"]
      T2["StreamTask-2\n(owns P2)"]
      T3["StreamTask-3\n(owns P3)"]
    end
  end

  P0 --> T0
  P1 --> T1
  P2 --> T2
  P3 --> T3

Tasks are the unit of parallelism. The number of tasks equals max(partitions across all source topics). Each task owns exactly one partition per source topic. A StreamThread runs multiple tasks in a round-robin poll loop.

Scaling: add more partitions → more tasks. Increase num.stream.threads → tasks distributed across more threads within one JVM. Add more JVM instances → tasks distributed across more processes (each process is a separate member of the same Kafka consumer group).
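The task-creation and round-robin rules above can be sketched as follows. The helper names are invented — the real logic lives in Kafka Streams' partition assignor:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: tasks = max partition count across source topics,
// distributed round-robin over stream threads.
public class TaskAssignment {
    static int numTasks(int[] partitionsPerSourceTopic) {
        int max = 0;
        for (int p : partitionsPerSourceTopic) max = Math.max(max, p);
        return max;
    }

    // Returns, per thread, the list of task ids it runs.
    static List<List<Integer>> assign(int tasks, int threads) {
        List<List<Integer>> byThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) byThread.add(new ArrayList<>());
        for (int task = 0; task < tasks; task++) {
            byThread.get(task % threads).add(task);
        }
        return byThread;
    }

    public static void main(String[] args) {
        int tasks = numTasks(new int[]{4, 2}); // two source topics: 4 and 2 partitions
        System.out.println(tasks + " tasks: " + assign(tasks, 2));
    }
}
```

With two source topics of 4 and 2 partitions, 4 tasks are created and split two per thread — matching the diagram above.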

stateDiagram-v2
  direction LR
  [*] --> Created
  Created --> Running : start()
  Running --> Partitions_Revoked : rebalance triggered
  Partitions_Revoked --> Partitions_Assigned : SyncGroup complete
  Partitions_Assigned --> Running : resume processing
  Running --> Closing : close()
  Closing --> [*]

3. KStream vs KTable: Stream-Table Duality

flowchart LR
  subgraph KStream["KStream — Event Stream"]
    direction TB
    E1["offset=0 key=A val=1"]
    E2["offset=1 key=B val=10"]
    E3["offset=2 key=A val=2"]
    E4["offset=3 key=A val=3"]
    E5["All records retained\n(append semantics)"]
  end

  subgraph KTable["KTable — Changelog / Materialized View"]
    direction TB
    KV1["key=A → val=3 (latest)"]
    KV2["key=B → val=10 (latest)"]
    KV3["Only latest value per key\n(update semantics)"]
  end

  KStream -->|"group + aggregate\nor table()"| KTable
  KTable -->|"toStream()"| KStream

A KTable is a materialized view backed by a changelog topic. When a new record arrives with key K, the table's local state store is updated in-place — old value is overwritten. Downstream processors see only the latest value for each key.

Changelog topic: every KTable state store update is also written to a Kafka topic (<app-id>-<store-name>-changelog). On restart or reassignment, the Streams app rebuilds the state store by replaying this changelog topic from the beginning — restoring exact state without coordination.
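The stream-table duality can be simulated in a few lines: replaying an append-only changelog into a map yields KTable semantics (latest value per key). The `Update` record type is invented for illustration:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of stream-table duality: replaying the changelog rebuilds
// the table; each table update is itself a stream record.
public class Duality {
    record Update(String key, int value) {}

    // KTable semantics: the latest value per key wins.
    static Map<String, Integer> replay(List<Update> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Update u : changelog) table.put(u.key(), u.value());
        return table;
    }

    public static void main(String[] args) {
        List<Update> changelog = List.of(
            new Update("A", 1), new Update("B", 10),
            new Update("A", 2), new Update("A", 3));
        System.out.println(replay(changelog)); // {A=3, B=10}
    }
}
```

This mirrors the diagram: the four KStream events collapse to two KTable entries, and a restart that replays the same changelog reconstructs the identical table.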


4. State Stores: RocksDB and In-Memory

flowchart TD
  subgraph StreamTask["StreamTask"]
    Processor["Stateful Processor\n(aggregate, join)"]
    subgraph StateStore["State Store"]
      RocksDB["RocksDB\n(persistent, default)"]
      MemStore["In-Memory Store\n(volatile, fast)"]
      ChangelogProducer["Changelog Producer\n(writes to Kafka topic)"]
    end
  end

  Processor -->|"get(key)"| RocksDB
  Processor -->|"put(key, value)"| RocksDB
  RocksDB -->|"every write also forwarded"| ChangelogProducer
  ChangelogProducer -->|"async produce"| ChangelogTopic["changelog topic\n(Kafka)"]
  ChangelogTopic -->|"on restart: replay"| RocksDB

RocksDB (embedded LSM-tree key-value store) is the default persistent state store. Key properties:

  • Data stored in state.dir on local disk (default /tmp/kafka-streams/)
  • Bloom filters plus SST file compaction keep point reads fast — most lookups touch at most one SST file
  • Write path: WAL → MemTable → SSTable flush → compaction
  • Kafka Streams calls RocksDB's put() synchronously; the changelog write is async

Standby replicas (num.standby.replicas): other StreamsApp instances maintain warm copies of state stores by consuming the changelog topic. On failover, the new task owner only needs to catch up the delta since the standby's checkpoint — sub-second recovery.
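A toy model of the standby catch-up path, assuming the standby has already applied the changelog up to a checkpoint offset (entry type, offsets, and values are invented for illustration):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of standby failover: on takeover, only the changelog delta
// past the checkpoint offset needs to be replayed into the warm store.
public class StandbyCatchUp {
    record Entry(long offset, String key, String value) {}

    static long applyFrom(Map<String, String> store, List<Entry> changelog,
                          long checkpointOffset) {
        long applied = 0;
        for (Entry e : changelog) {
            if (e.offset() >= checkpointOffset) { // only the delta
                store.put(e.key(), e.value());
                applied++;
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        Map<String, String> warmStore = new HashMap<>(Map.of("A", "v1"));
        List<Entry> changelog = List.of(
            new Entry(0, "A", "v1"), new Entry(1, "B", "v2"), new Entry(2, "A", "v3"));
        long delta = applyFrom(warmStore, changelog, 1); // checkpoint = offset 1
        System.out.println(delta + " entries replayed, store=" + warmStore);
    }
}
```

Without a standby the replay starts at offset 0 (full restore); with one, only the small tail past the checkpoint is applied — hence the fast recovery.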


5. Windowing Internals

Windows enable aggregation over bounded time ranges. Kafka Streams implements several window types, each with distinct state organization.

flowchart LR
  subgraph TumblingWindow["Tumbling Window (non-overlapping)"]
    W0["[0,5) sec → agg A"]
    W1["[5,10) sec → agg B"]
    W2["[10,15) sec → agg C"]
  end

  subgraph HoppingWindow["Hopping Window (overlapping)"]
    H0["[0,10) sec"]
    H1["[5,15) sec"]
    H2["[10,20) sec"]
  end

  subgraph SessionWindow["Session Window (gap-based)"]
    S0["[t1,t3)\nactivity burst"]
    G0["gap > inactivity.gap → close session"]
    S1["[t5,t7)\nnew session"]
  end

Window state store key schema: the windowed key combines the original key with the window start timestamp (the window end is derived from the window size). Each window has its own slot in RocksDB. When a record arrives at time t, Streams computes which windows it falls into and does a get + put for each.

Late arrivals: records arriving after the window's grace period has elapsed are discarded. The grace period (set via grace(Duration) on the window definition) allows out-of-order records (from network delays) to be incorporated into the correct window before it closes.

sequenceDiagram
  participant Record
  participant Streams
  participant WindowStore
  participant DownstreamSink

  Record->>Streams: event(key=X, ts=1000ms)
  Streams->>Streams: compute window start = 1000 - (1000 % windowSize)
  Streams->>WindowStore: get(X, windowStart)
  WindowStore-->>Streams: currentAgg (or null)
  Streams->>Streams: newAgg = aggregator(currentAgg, record.value)
  Streams->>WindowStore: put(X, windowStart, newAgg)
  WindowStore->>WindowStore: write changelog record
  Streams->>Streams: check if window is closed (stream time > windowEnd + grace)
  Streams->>DownstreamSink: emit(X, windowStart, newAgg)

Stream time vs wall clock time: Streams advances "stream time" based on the maximum observed record timestamp, not the system clock. This ensures deterministic window behavior even when records arrive out-of-order or during replay.
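A sketch of this stream-time bookkeeping, assuming tumbling windows and the alignment formula from the sequence diagram above (the class and method names are invented, not the Streams API):

```java
// Toy model of stream time: the max record timestamp seen so far.
// A record is dropped if its window has already closed, i.e.
// windowEnd + grace <= streamTime.
public class StreamTime {
    long streamTime = Long.MIN_VALUE;

    static long windowStart(long ts, long windowSizeMs) {
        return ts - (ts % windowSizeMs); // tumbling-window alignment
    }

    // Returns true if the record is accepted into its window.
    boolean accept(long recordTs, long windowSizeMs, long graceMs) {
        streamTime = Math.max(streamTime, recordTs); // stream time only advances
        long windowEnd = windowStart(recordTs, windowSizeMs) + windowSizeMs;
        return windowEnd + graceMs > streamTime;
    }

    public static void main(String[] args) {
        StreamTime st = new StreamTime();
        System.out.println(st.accept(1000, 5000, 0));  // true: window [0,5000) still open
        System.out.println(st.accept(20000, 5000, 0)); // true: advances stream time to 20000
        System.out.println(st.accept(1500, 5000, 0));  // false: [0,5000) closed long ago
    }
}
```

Note that the third record is rejected even though its own timestamp is small — what matters is how far stream time has already advanced past its window.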


6. Joins: Co-Partitioning Requirement

Joins in Kafka Streams require co-partitioned topics — both input topics must have the same number of partitions and the same partitioning strategy. This ensures that records with the same key are always assigned to the same Streams task.
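Why co-partitioning matters can be shown with a simplified hash partitioner. Kafka's default partitioner actually uses murmur2 over the serialized key; this sketch only illustrates the invariant (same key + same partition count + same partitioner → same partition):

```java
// Toy partitioner: with equal partition counts, equal keys land in the
// same partition on both topics, so one task sees both sides of a join.
public class CoPartition {
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions; // simplified hash
    }

    public static void main(String[] args) {
        String key = "product-42";
        int ordersPartition = partitionFor(key, 4);
        int productsSame = partitionFor(key, 4);  // co-partitioned: always matches
        int productsDiff = partitionFor(key, 6);  // partition counts differ
        System.out.println(ordersPartition == productsSame); // true
        System.out.println("4-part: " + ordersPartition + ", 6-part: " + productsDiff);
    }
}
```

With mismatched partition counts the two sides of the join can land on different tasks, which is why Streams validates co-partitioning (or requires an explicit repartition) before joining.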

flowchart TD
  subgraph MisKeyed["Mis-keyed Topics (join fails)"]
    Orders["orders topic\n4 partitions\nkey=orderId"]
    Products["products topic\n4 partitions\nkey=productId\n(different keys → not co-partitioned)"]
  end

  subgraph CorrectJoin["Correct Join Setup"]
    Orders2["orders topic\n4 partitions\nkey=productId (rekeyed)"]
    Products2["products topic\n4 partitions\nkey=productId"]
    JoinResult["Join Result\norder enriched with product"]
  end

  Orders2 -->|"KStream-KTable join"| JoinResult
  Products2 -->|"KTable lookup"| JoinResult

KStream-KStream join: both streams are windowed. Records are matched if keys match AND timestamps fall within JoinWindows.of(Duration). Both sides are stored in in-memory or RocksDB windowed stores.

KStream-KTable join: the KTable acts as a lookup table. No windowing needed — latest KTable value for the key is used at the time the KStream record arrives. The KTable must be fully caught up (changelog replayed) before joins produce results.
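A minimal simulation of the KStream-KTable lookup semantics (inner-join variant; all names here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a KStream-KTable join: each stream record looks up the
// table's current value for its key at arrival time. No windowing.
public class StreamTableJoin {
    static List<String> join(List<String[]> streamRecords, // {key, value} pairs
                             Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (String[] rec : streamRecords) {
            String lookup = table.get(rec[0]);           // latest table value
            if (lookup != null) out.add(rec[1] + "+" + lookup); // inner join drops misses
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> products = new HashMap<>(Map.of("p1", "Widget"));
        List<String[]> orders = List.of(
            new String[]{"p1", "order-1"}, new String[]{"p9", "order-2"});
        System.out.println(join(orders, products)); // [order-1+Widget]
    }
}
```

The key point the sketch captures: the result depends on the table's state *at the moment the stream record arrives*, which is why the KTable must be fully restored before the join produces correct output.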

GlobalKTable: replicated in full on every Streams instance (no co-partitioning requirement). Every instance consumes all partitions of the source topic. Used for reference data (e.g., a product catalog) that must be available for any key without repartitioning.

flowchart LR
  subgraph GlobalKTable
    GKT1["instance-1\nfull copy of table"]
    GKT2["instance-2\nfull copy of table"]
    GKT3["instance-3\nfull copy of table"]
  end

  RefTopic["reference topic\n(all partitions)"] --> GKT1
  RefTopic --> GKT2
  RefTopic --> GKT3

7. Processor API: Low-Level Node Building

The DSL (KStream/KTable) compiles down to the Processor API. You can use it directly for full control.

flowchart TD
  subgraph TopologyBuilder["Topology (built via Processor API)"]
    SourceNode["addSource()\nname='source'\ntopics=['transactions']"]
    ProcNode["addProcessor()\nname='enricher'\nparent='source'\nprocessorSupplier=EnrichProcessor::new"]
    StoreNode["addStateStore()\nname='product-store'\nconnect to 'enricher'"]
    SinkNode["addSink()\nname='sink'\ntopic='enriched-transactions'\nparent='enricher'"]
  end

  SourceNode --> ProcNode
  StoreNode -.->|"attached to"| ProcNode
  ProcNode --> SinkNode
// Processor lifecycle (Kafka 3.x Processor API)
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

class EnrichProcessor implements Processor<String, Order, String, EnrichedOrder> {
    private ProcessorContext<String, EnrichedOrder> context;
    private KeyValueStore<String, Product> productStore;

    @Override
    public void init(ProcessorContext<String, EnrichedOrder> context) {
        this.context = context;
        this.productStore = context.getStateStore("product-store");
        // Schedule punctuation: periodic callback
        context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME,
            timestamp -> { /* cleanup stale state */ });
    }

    @Override
    public void process(Record<String, Order> record) {
        Product product = productStore.get(record.value().productId());
        EnrichedOrder enriched = new EnrichedOrder(record.value(), product);
        context.forward(record.withValue(enriched));
    }
}

Punctuation (context.schedule): a callback invoked either on stream time advancement (STREAM_TIME) or wall-clock time (WALL_CLOCK_TIME). Used for periodic state cleanup, emitting aggregates, or heartbeat-style operations. Stream-time punctuation only fires when records arrive — if no records come in, stream time does not advance.
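Stream-time punctuation can be modeled as a next-fire threshold that only moves when records arrive — a sketch of the firing semantics, not the real Punctuator interface:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of STREAM_TIME punctuation: the callback fires only when an
// arriving record advances stream time past the next scheduled point.
// No records -> no stream-time advance -> no punctuation.
public class Punctuation {
    final long intervalMs;
    long nextFire;
    final List<Long> firings = new ArrayList<>();

    Punctuation(long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextFire = intervalMs;
    }

    void onRecord(long recordTs) {
        while (recordTs >= nextFire) { // a time gap can trigger several fires
            firings.add(nextFire);
            nextFire += intervalMs;
        }
    }

    public static void main(String[] args) {
        Punctuation p = new Punctuation(30_000);
        p.onRecord(10_000);  // no fire yet
        p.onRecord(95_000);  // fires at 30000, 60000, 90000
        System.out.println(p.firings);
    }
}
```

The second record jumps stream time past three scheduled points, so the callback fires three times in a row — matching the documented catch-up behavior of stream-time punctuation.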


8. Commit and Offset Management

sequenceDiagram
  participant StreamsTask
  participant Consumer
  participant Broker
  participant StateStore
  participant ChangelogTopic

  loop Every commit.interval.ms (default 30 s; 100 ms under exactly-once) or cache full
    StreamsTask->>StateStore: flush() — write pending cache entries to RocksDB
    StateStore->>ChangelogTopic: flush changelog producer (sync)
    StreamsTask->>Consumer: commitSync(offsets)
    Consumer->>Broker: OffsetCommit to __consumer_offsets
  end

Processing guarantee: processing.guarantee=exactly_once_v2 (Kafka 2.5+)

  • Uses Kafka transactions internally
  • Each task wraps its processing in a transaction: state store writes + changelog writes + output topic writes + offset commits are all atomic
  • Requires Kafka broker 2.5+ and at least 3 replicas for internal topics
flowchart TD
  subgraph ExactlyOnce["exactly_once_v2 Transaction"]
    BeginTxn["beginTransaction()"]
    ProcessRecord["process record → state store put"]
    WriteChangelog["write changelog record (transactional)"]
    WriteOutput["write output topic record (transactional)"]
    CommitOffsets["sendOffsetsToTransaction()"]
    CommitTxn["commitTransaction()"]
  end

  BeginTxn --> ProcessRecord --> WriteChangelog --> WriteOutput --> CommitOffsets --> CommitTxn

If the application crashes mid-transaction, the broker aborts the incomplete transaction. On restart, the state store is rebuilt from the changelog (which only contains committed data), and the input consumer resumes from the last committed offset.


9. Interactive Queries: Exposing Local State

flowchart LR
  subgraph App1["Instance 1 (handles P0, P1)"]
    LocalStore1["state store\n(P0, P1 data)"]
    QueryServer1["REST endpoint\nport 8080"]
  end
  subgraph App2["Instance 2 (handles P2, P3)"]
    LocalStore2["state store\n(P2, P3 data)"]
    QueryServer2["REST endpoint\nport 8081"]
  end

  Client -->|"GET /state/key=X"| QueryServer1
  QueryServer1 -->|"key X → partition 2 → remote"| QueryServer2
  QueryServer2 -->|"localStore.get(X)"| LocalStore2
  LocalStore2 --> QueryServer2 --> QueryServer1 --> Client

KafkaStreams.queryMetadataForKey(storeName, key, keySerializer) returns KeyQueryMetadata containing the host and port of the instance that owns the active partition for that key. Application code must implement the HTTP layer and proxy requests to the correct instance.
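The routing decision reduces to key → partition → owning host. A sketch with a simplified hash and invented host names (real code must use the same partitioner as the input topic, and gets the partition-to-host map from the Streams metadata API):

```java
import java.util.Map;

// Toy model of interactive-query routing: map a key to its partition,
// then to the instance owning that partition; serve locally or proxy.
public class QueryRouter {
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions; // simplified hash
    }

    static String hostFor(String key, int numPartitions,
                          Map<Integer, String> partitionToHost) {
        return partitionToHost.get(partitionFor(key, numPartitions));
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = Map.of(
            0, "instance1:8080", 1, "instance1:8080",
            2, "instance2:8081", 3, "instance2:8081");
        String host = hostFor("customer-456", 4, owners);
        System.out.println("route query to " + host); // proxy if not the local host
    }
}
```

If the resolved host equals the local instance, the store is queried directly; otherwise the request is forwarded, exactly as in the sequence diagram below.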

sequenceDiagram
  participant Client
  participant Instance1
  participant Instance2
  participant RocksDB2

  Client->>Instance1: GET /query?key=customer-456
  Instance1->>Instance1: keyQueryMetadata(store, "customer-456", StringSer)
  Instance1-->>Instance1: → host=instance2:8081
  Instance1->>Instance2: HTTP GET /local-query?key=customer-456
  Instance2->>RocksDB2: get("customer-456")
  RocksDB2-->>Instance2: CustomerState{...}
  Instance2-->>Instance1: JSON response
  Instance1-->>Client: JSON response

10. Serdes: Serialization in the Topology

Every node boundary in the topology requires serialization/deserialization. Kafka Streams uses Serde<T> (Serializer + Deserializer pair).

flowchart LR
  KafkaTopic["Kafka Topic\n(byte[] key, byte[] value)"]
  SourceNode["Source Node\ndeserialize: Serde<K>, Serde<V>"]
  DSL["DSL Operations\n(in-memory Java objects)"]
  SinkNode["Sink Node\nserialize: Serde<K>, Serde<V>"]
  OutputTopic["Output Topic\n(byte[] key, byte[] value)"]

  KafkaTopic -->|"byte[]→K, byte[]→V"| SourceNode
  SourceNode --> DSL --> SinkNode
  SinkNode -->|"K→byte[], V→byte[]"| OutputTopic

SerdeException pitfall: if a Serde cannot deserialize a record (corrupt data, schema change), the task throws a DeserializationException. The default handler (LogAndFailExceptionHandler) fails the task; configure default.deserialization.exception.handler (e.g., LogAndContinueExceptionHandler to log and skip the record) to change this.

State store Serdes: state stores also need Serdes for key and value types. These are configured separately from the topic Serdes. Mismatch causes runtime ClassCastException or corrupted state.
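A Serde is simply a serializer/deserializer pair that must agree on the wire format. A minimal sketch for strings, reduced to two plain functions (Kafka's Serde<T> interface bundles exactly this pair):

```java
import java.nio.charset.StandardCharsets;

// Minimal Serde sketch: serialize/deserialize must round-trip exactly,
// otherwise state stores and topics silently corrupt.
public class StringSerdeSketch {
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Round-trip: what goes to the topic must come back unchanged.
        System.out.println(deserialize(serialize("customer-456")));
    }
}
```

The Serde mismatch failure mode mentioned above is precisely a broken round-trip: bytes written by one (de)serializer read back with another.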


11. Topology Inspection and Testing

The Topology.describe() method returns a textual description of the full DAG:

Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [transactions])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-BRANCH-0000000002
    Processor: KSTREAM-BRANCH-0000000002 (stores: [])
      --> KSTREAM-BRANCHCHILD-0000000003, KSTREAM-BRANCHCHILD-0000000004

TopologyTestDriver: runs the entire topology in-process, without a real Kafka cluster:

sequenceDiagram
  participant Test
  participant TopologyTestDriver
  participant Topology
  participant StateStore

  Test->>TopologyTestDriver: new TopologyTestDriver(topology, props)
  TopologyTestDriver->>Topology: initialize all processors
  Test->>TopologyTestDriver: pipeInput(inputTopic, key, value)
  TopologyTestDriver->>Topology: process record through full DAG
  Topology->>StateStore: put/get (real RocksDB or in-memory)
  Test->>TopologyTestDriver: readOutput(outputTopic)
  TopologyTestDriver-->>Test: OutputRecord<K,V>
  Test->>Test: assertEquals(expected, actual)

The TopologyTestDriver supports injecting event time by setting record timestamps on the piped input; wall-clock punctuations can be triggered by calling advanceWallClockTime(). (For testing a single Processor in isolation, MockProcessorContext serves a similar role.) This makes window-boundary behavior fully deterministic in unit tests.


12. Full Internal Data Flow: End-to-End

flowchart TD
  subgraph Kafka["Apache Kafka"]
    InTopic["Input Topic\n(partitioned)"]
    Changelog["Changelog Topics\n(state backup)"]
    OutTopic["Output Topic"]
  end

  subgraph StreamsInstance["Kafka Streams Instance"]
    subgraph Thread["StreamThread"]
      subgraph Task0["StreamTask (partition 0)"]
        Consumer0["KafkaConsumer\n(poll records)"]
        DAG0["Processor Topology\n(depth-first per record)"]
        Store0["RocksDB State Store\n(local disk)"]
        Producer0["KafkaProducer\n(output + changelog)"]
      end
    end
  end

  InTopic -->|"FetchRequest"| Consumer0
  Consumer0 --> DAG0
  DAG0 <-->|"get/put"| Store0
  Store0 -->|"changelog write"| Producer0
  DAG0 -->|"output record"| Producer0
  Producer0 -->|"ProduceRequest"| Changelog
  Producer0 -->|"ProduceRequest"| OutTopic

The entire processing cycle per record:

  1. KafkaConsumer.poll() returns a ConsumerRecords batch
  2. For each record, traverse the DAG depth-first (synchronous)
  3. State store operations read/write RocksDB synchronously
  4. Output records are buffered in the KafkaProducer's RecordAccumulator
  5. Every commit.interval.ms: flush state stores → flush changelog producer → commit consumer offsets
  6. The RecordAccumulator drains to Kafka asynchronously on the Sender thread

The absence of inter-node queues within a task means there is no copying between processors — the record object is passed by reference through the DAG, with no serialization until it reaches a sink node or a state store.