
Kafka Streams & ksqlDB — Under the Hood

Sources: Mastering Kafka Streams and ksqlDB (Mitch Seymour, O'Reilly 2021) · Designing Event-Driven Systems (Ben Stopford, O'Reilly 2018)


1. Processor Topology: Depth-First DAG Execution

A Kafka Streams application is a directed acyclic graph (DAG) of source, processing, and sink processors. The runtime doesn't work in layers — it traverses the graph depth-first per record: one record is pushed all the way from source to sink before the next record is pulled from the consumer.

flowchart TD
    subgraph SubTopology_0["Sub-topology 0"]
        S0[Source: raw-tweets] --> F[filter: isEnglish?]
        F --> B[branch: language]
        B --> M[merge: combined stream]
    end
    subgraph SubTopology_1["Sub-topology 1 (repartition boundary)"]
        M --> RP["Repartition Topic\n(internal)"]
        RP --> S1[Source: repartitioned]
        S1 --> AGG[aggregate: sentiment count]
        AGG --> SINK[Sink: crypto-sentiment]
    end
    style SubTopology_0 fill:#1a1a2e,stroke:#4a90d9
    style SubTopology_1 fill:#16213e,stroke:#4a90d9

Why sub-topologies matter: Whenever the DSL introduces a repartition topic (e.g. repartition(), groupByKey after selectKey, or a join's co-partitioning requirement), the single logical topology is split. Each sub-topology is an independent unit of parallelism — tasks for sub-topology 0 and sub-topology 1 are scheduled independently across stream threads.
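
The depth-first model can be sketched in a few lines of Python (a toy illustration, not the actual runtime; the processor names are hypothetical):

```python
# Toy sketch of depth-first topology traversal: each record is pushed through
# the entire processor chain before the next one is polled from the consumer.
def depth_first_process(records, processors):
    """processors: functions record -> record or None (None = filtered out)."""
    outputs, trace = [], []
    for record in records:          # "poll" one record at a time
        value = record
        for proc in processors:     # push it all the way to the sink
            trace.append((proc.__name__, value))
            value = proc(value)
            if value is None:       # a filter dropped the record
                break
        else:
            outputs.append(value)   # record reached the sink
    return outputs, trace

def is_english(r):
    return r if r["lang"] == "en" else None

def to_upper(r):
    return {**r, "text": r["text"].upper()}

records = [{"lang": "en", "text": "hi"}, {"lang": "de", "text": "hallo"}]
outputs, trace = depth_first_process(records, [is_english, to_upper])
# The trace shows record 1 visiting every processor before record 2 is polled.
```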


2. Task and Thread Model

flowchart LR
    subgraph JVM["JVM Process"]
        T1["StreamThread-1"]
        T2["StreamThread-2"]
        T3["StreamThread-3"]
    end
    subgraph Tasks["Task Assignment"]
        T1 --> Task0["Task 0\n(partition 0)"]
        T1 --> Task1["Task 1\n(partition 1)"]
        T2 --> Task2["Task 2\n(partition 2)"]
        T3 --> Task3["Task 3\n(partition 3)"]
    end
    subgraph Kafka["Kafka Broker"]
        P0["topic-partition-0"]
        P1["topic-partition-1"]
        P2["topic-partition-2"]
        P3["topic-partition-3"]
    end
    Task0 --> P0
    Task1 --> P1
    Task2 --> P2
    Task3 --> P3

Key invariant: Each task owns exactly one input partition set, and tasks never share state stores. Stream threads (configured by num.stream.threads) pull records for their assigned tasks. Because tasks are the unit of parallelism, adding partitions creates more tasks, which additional threads or instances can then absorb.
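
A minimal sketch of the assignment above (round-robin placement is an assumption for illustration; the real StreamsPartitionAssignor is considerably smarter):

```python
# Toy task/thread model: one task per input partition, tasks spread
# round-robin across a fixed pool of stream threads.
def assign_tasks(num_partitions, num_stream_threads):
    tasks = [f"task_{p}" for p in range(num_partitions)]   # 1 partition = 1 task
    assignment = {f"StreamThread-{t + 1}": [] for t in range(num_stream_threads)}
    for i, task in enumerate(tasks):                       # round-robin placement
        assignment[f"StreamThread-{i % num_stream_threads + 1}"].append(task)
    return assignment

# 4 partitions over 3 threads: one thread carries two tasks, as in the diagram.
assignment = assign_tasks(4, 3)
```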


3. Stream-Table Duality & KStream/KTable/GlobalKTable

stateDiagram-v2
    [*] --> KStream: append-only event log
    KStream --> KTable: groupBy + aggregate (materializes into state store)
    KTable --> KStream: toStream() (emit changelog records)
    KTable --> GlobalKTable: broadcast to ALL tasks (no co-partitioning)
    KStream --> [*]: to(sink topic)
| Abstraction | Storage | Changelog | Co-partition required? | Update semantics |
|---|---|---|---|---|
| KStream | None (stateless) | No | Yes (for joins) | Each record independent |
| KTable | RocksDB (per-task) | Yes | Yes (for joins) | Upsert by key |
| GlobalKTable | RocksDB (full copy on every instance) | No — restores from the source topic (offset 0) | No | Full broadcast copy |

Stream-table duality (Ben Stopford): Every stream is a table of accumulated changes; every table is a stream of upserts. A compacted Kafka topic IS the changelog of a table — replaying it from offset 0 reconstructs the entire table state.

sequenceDiagram
    participant Events as Kafka Topic (stream)
    participant KS as Kafka Streams Runtime
    participant RS as RocksDB State Store
    Events->>KS: key=user1, val=pageview
    KS->>RS: put(user1, count+1)
    Events->>KS: key=user1, val=pageview
    KS->>RS: put(user1, count+1)
    Note over RS: Table = latest value per key
    RS-->>Events: Changelog topic (reverse: stream of mutations)
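
The duality reduces to a one-line fold, sketched here in Python (toy illustration):

```python
# Stream-table duality in miniature: a table is a left fold of upserts over
# the stream; replaying the changelog from offset 0 reconstructs the table.
def materialize(changelog):
    table = {}
    for key, value in changelog:      # upsert per key
        if value is None:             # tombstone deletes the key
            table.pop(key, None)
        else:
            table[key] = value
    return table

stream = [("user1", 1), ("user2", 1), ("user1", 2), ("user2", None)]
table = materialize(stream)           # latest value per surviving key
```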

4. Persistent State Store Disk Layout

Default base: /tmp/kafka-streams/<application-id>/
Production override: StreamsConfig.STATE_DIR_CONFIG

block-beta
    columns 3
    block:dir["&lt;app-id&gt;/"]:3
        block:t0["0_0/ (task 0_0)"]:1
            A[".lock"]
            B[".checkpoint"]
            block:ss["pulse-counts/ (RocksDB)"]
                C["MANIFEST-000001"]
                D["000002.sst"]
                E["000003.sst"]
                F["CURRENT"]
                G["OPTIONS-000005"]
            end
        end
        block:t1["0_1/ (task 0_1)"]:1
            H[".lock"]
            I[".checkpoint"]
            block:ss2["pulse-counts/"]
                J["SST files..."]
            end
        end
        block:t2["1_0/ (task 1_0)"]:1
            K[".lock"]
            L[".checkpoint"]
            block:ss3["another-store/"]
                M["SST files..."]
            end
        end
    end

Task ID format: <sub-topology-id>_<partition-number>
.checkpoint file: Contains the last committed Kafka offset from the changelog topic — this is the recovery cursor. On restart, Kafka Streams replays only from .checkpoint offset forward, not from beginning.
.lock file: Prevents two stream threads from owning the same state directory concurrently.
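
A sketch of the conventions above (hypothetical helper names):

```python
# Task IDs are "<sub-topology>_<partition>", and the .checkpoint file stores
# the changelog offset from which restoration must resume.
def parse_task_id(task_id: str):
    sub_topology, partition = task_id.split("_")
    return int(sub_topology), int(partition)

def restore_range(checkpoint_offset, changelog_end_offset):
    """Offsets that must be replayed on restart (a delta, not the full log)."""
    start = 0 if checkpoint_offset is None else checkpoint_offset
    return range(start, changelog_end_offset)

assert parse_task_id("1_0") == (1, 0)
# With a checkpoint at offset 950 and a changelog end offset of 1000, only
# 50 records are replayed; a fresh instance (no checkpoint) replays all 1000.
```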

RocksDB LSM Internals (used by Kafka Streams persistent stores)

flowchart TD
    W[Write: put key/value] --> MEM["MemTable\n(in RAM, ~64MB default)"]
    MEM -->|flush when full| L0["Level 0 SST files\n(unsorted, recent)"]
    L0 -->|compaction| L1["Level 1 SSTs\n(sorted, ~256MB)"]
    L1 -->|compaction| L2["Level 2 SSTs\n(sorted, ~2.5GB)"]
    L2 -->|compaction| L3["Level 3 SSTs\n(sorted, ~25GB)"]

    R[Read: get key] --> BF["Bloom Filter\n(skip SSTs that can't have key)"]
    BF --> BC["Block Cache\n(LRU, frequently read blocks)"]
    BC -->|cache miss| SST["SST file block read"]

Each Kafka Streams task gets its own RocksDB instance under state.dir/<app-id>/<task-id>/<store-name>/. No sharing across tasks, so no lock contention.
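
A toy LSM tree makes the write and read paths concrete (illustration only: memtable_limit stands in for the ~64MB write buffer, and bloom filters, block cache, and compaction are omitted):

```python
# Toy LSM sketch (not RocksDB itself): writes land in an in-memory memtable,
# which is flushed to an immutable sorted "SST" when full; reads check the
# memtable first, then SSTs from newest to oldest.
class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable, self.ssts = {}, []
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:   # flush when "full"
            self.ssts.append(dict(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:                        # freshest data in RAM
            return self.memtable[key]
        for sst in reversed(self.ssts):                 # newest SST wins
            if key in sst:
                return sst[key]
        return None

db = TinyLSM()
db.put("a", 1)
db.put("b", 2)    # triggers a flush to an SST
db.put("a", 3)    # newer memtable value shadows the flushed one
```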


5. Changelog Topics & Fault Tolerance

sequenceDiagram
    participant App as Kafka Streams App
    participant RS as RocksDB (local)
    participant CL as Changelog Topic (Kafka)
    participant CP as .checkpoint file

    App->>RS: put(key, aggregated_value)
    RS->>CL: async write: (key, value) [changelog producer]
    Note over CL: topic name: &lt;app-id&gt;-&lt;store-name&gt;-changelog

    App->>CP: commit: write last offset from changelog

    Note over App: CRASH

    App->>CP: read last checkpointed offset
    App->>CL: replay from checkpointed offset → offset N
    CL->>RS: re-apply updates (delta only)
    Note over RS: State fully restored ✓

Changelog topic naming: <application-id>-<internal-store-name>-changelog
Incremental recovery: The .checkpoint file avoids replaying the entire changelog. On a fresh instance (no checkpoint), full replay from offset 0.
Disabling the changelog (Materialized.withLoggingDisabled()): Creates an ephemeral store — faster writes, but the state cannot be recovered after a failure.
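
The whole recovery cycle can be simulated in a few lines (toy sketch; in real Kafka Streams the changelog lives in a Kafka topic and the "snapshot" is the RocksDB SST files already on disk):

```python
# Toy fault-tolerance cycle: every local put is mirrored to a changelog;
# after a crash, the store is rebuilt from the last durable snapshot plus
# a replay of only the changelog records past the checkpointed offset.
changelog = []
store = {}

def put(key, value):
    store[key] = value
    changelog.append((key, value))   # async changelog produce in real life

put("btc", 10)
put("eth", 5)
disk_snapshot, checkpoint = dict(store), len(changelog)  # flush + .checkpoint
put("btc", 11)                       # written after the last checkpoint

# CRASH and restart: load local state, then replay only the delta.
store = dict(disk_snapshot)
for key, value in changelog[checkpoint:]:
    store[key] = value
```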


6. Standby Replicas

flowchart LR
    subgraph Instance1["Instance 1 (active)"]
        A1["Task 0: process records"]
        S1["RocksDB state store\n(up-to-date)"]
        A1 --> S1
    end
    subgraph Instance2["Instance 2 (warm standby)"]
        A2["StandbyTask 0:\nconsume changelog only"]
        S2["RocksDB state store\n(nearly up-to-date)"]
        A2 --> S2
    end
    subgraph Kafka
        CL["changelog topic\n(partition 0)"]
    end
    S1 -->|produce changelog| CL
    CL -->|active consumption| A1
    CL -->|standby consumption| A2

    Instance1 -- FAIL --> Instance2
    Instance2 -->|"promote: delta replay only\n(vs. full rebuild)"| Active2["Instance 2 (now active)"]

num.standby.replicas: How many warm standby copies to maintain. Standby tasks consume changelog topics but never process input topic records. Promotion = replay only the small delta since the standby's last consumed offset.


7. Rebalancing: The Enemy of Stateful Applications

stateDiagram-v2
    state EagerRebalance {
        [*] --> STOP: all tasks stop
        STOP --> REVOKE: all partitions revoked from all members
        REVOKE --> REASSIGN: coordinator issues new assignment
        REASSIGN --> RESTORE: each member rebuilds state from changelog
        RESTORE --> [*]: resume processing
        note right of RESTORE: O(state_size) downtime — catastrophic for large state
    }

    state CooperativeStickyRebalance {
        [*] --> PARTIAL_REVOKE: only migrating partitions revoked
        PARTIAL_REVOKE --> PARTIAL_ASSIGN: migrating tasks reassigned
        PARTIAL_ASSIGN --> DELTA_RESTORE: only new tasks restore state
        DELTA_RESTORE --> [*]: unaffected tasks continue processing throughout
        note right of DELTA_RESTORE: O(migrated_state_size) — minimal disruption
    }

Sticky assignment: Kafka Streams' StreamsPartitionAssignor preferentially reassigns tasks to the instance that previously held them (or to an instance hosting their standby). The state store on that instance already has the data on disk, so recovery is near-instantaneous.

Static membership (group.instance.id): Prevents "dead member" rebalances when an instance temporarily disconnects. The coordinator waits session.timeout.ms before treating it as dead.
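
Sticky assignment can be sketched as a two-pass placement (toy illustration, not the real assignor):

```python
# Toy sticky assignment: a task goes back to its previous owner when that
# instance is still alive (its local state is reused); only tasks whose
# owner died are redistributed, and only they must restore state.
def sticky_assign(tasks, live_instances, previous_owner):
    assignment = {inst: [] for inst in live_instances}
    homeless = [t for t in tasks if previous_owner.get(t) not in live_instances]
    for task in tasks:                               # pass 1: keep stickiness
        prev = previous_owner.get(task)
        if prev in live_instances:
            assignment[prev].append(task)
    for i, task in enumerate(homeless):              # pass 2: spread the rest
        assignment[live_instances[i % len(live_instances)]].append(task)
    return assignment

tasks = ["0_0", "0_1", "0_2"]
previous = {"0_0": "A", "0_1": "C", "0_2": "B"}      # instance C has died
result = sticky_assign(tasks, ["A", "B"], previous)
# Only task 0_1 migrates (and must restore); 0_0 and 0_2 stay put.
```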


8. Windowing: Stream-Time Semantics

Kafka Streams uses event-time (the timestamp embedded in the record), not wall-clock time. Stream time advances when records arrive — late records can still be admitted within a grace period.

gantt
    title Tumbling Window (size=5min)
    dateFormat mm:ss
    axisFormat %M:%S
    section Window [00:00 - 05:00)
    Record A (event-time 01:30): active, 01:30, 10s
    Record B (event-time 03:45): active, 03:45, 10s
    section Window [05:00 - 10:00)
    Record C (event-time 06:20): active, 06:20, 10s
    Record D (late, event-time 04:50): crit, 07:30, 10s
| Window Type | Overlap? | Triggered by | Use case |
|---|---|---|---|
| Tumbling | No | Fixed size | Hourly batch aggregations |
| Hopping | Yes (advance < size) | Fixed size + advance | Rolling 5-min avg |
| Session | No (gap-based) | Inactivity gap | User session analytics |

Grace period: TimeWindows.ofSizeAndGrace(size, Duration.ofMinutes(1)) — records arriving up to 1 minute late are still admitted. After the grace period expires, the window is closed and its final result is emitted downstream (if using Suppressed.untilWindowCloses()).
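
Window assignment and grace admission reduce to simple modular arithmetic; a sketch in Python (times as integer seconds):

```python
# Tumbling-window assignment: a record with a given event time belongs to
# the window [start, start + size); it is admitted as long as stream time
# has not yet passed window_end + grace.
def window_for(event_time, size):
    start = event_time - (event_time % size)
    return (start, start + size)

def is_admitted(event_time, stream_time, size, grace):
    _, end = window_for(event_time, size)
    return stream_time < end + grace

SIZE, GRACE = 300, 60                        # 5-minute windows, 1-minute grace
assert window_for(90, SIZE) == (0, 300)      # 01:30 -> window [00:00, 05:00)
assert is_admitted(290, 330, SIZE, GRACE)    # record at 04:50, stream time 05:30
assert not is_admitted(290, 370, SIZE, GRACE)  # rejected once grace has expired
```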


9. Processor API — Punctuation & Low-Level Control

The Processor API exposes the raw process(key, value) method and punctuation — a periodic callback triggered by stream-time or wall-clock-time advancement.

sequenceDiagram
    participant KS as StreamThread
    participant P as Processor
    participant SS as State Store
    participant SC as StreamTimeSchedule

    KS->>P: process(key1, val1)
    P->>SS: put / get
    KS->>P: process(key2, val2)
    P->>SS: put / get
    Note over SC: stream-time advanced past next punctuation interval
    KS->>P: punctuate(timestamp)
    P->>SS: iterate all keys → emit aggregated results
    P->>KS: context.forward(key, aggregated_val)

Wall-clock punctuation (PunctuationType.WALL_CLOCK_TIME): Fires even when no records arrive — useful for timeout detection (e.g. "alert if no heartbeat in 30s").
Stream-time punctuation (PunctuationType.STREAM_TIME): Only fires when records advance stream-time past the interval — pauses during idle input.
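
The difference between the two punctuation types comes down to which clock drives the schedule; a sketch:

```python
# Toy punctuation scheduler: fires once for every interval boundary the
# driving clock advances past. Feed it record event timestamps to model
# STREAM_TIME, or wall-clock ticks to model WALL_CLOCK_TIME.
def punctuations(clock_readings, interval, start=0):
    fired, next_fire = [], start + interval
    for ts in clock_readings:
        while ts >= next_fire:        # catch up on every boundary crossed
            fired.append(next_fire)
            next_fire += interval
    return fired

# Stream time is driven by record timestamps: idle input means no firings,
# and a burst of late-arriving records can trigger several at once.
assert punctuations([5, 12, 31], interval=10) == [10, 20, 30]
# Wall clock ticks regardless of input, so firings stay regular even when
# no records arrive (useful for timeout detection).
assert punctuations([10, 20, 30], interval=10) == [10, 20, 30]
```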


10. Exactly-Once Semantics (EOS v2)

sequenceDiagram
    participant Source as Input Topic
    participant App as Kafka Streams (EOS_V2)
    participant Store as RocksDB Changelog Producer
    participant Sink as Output Topic

    Source->>App: poll records (batch)
    App->>Store: write state updates to changelog
    App->>Sink: produce output records
    Note over App,Sink: All within a single Kafka transaction
    App->>Source: commit transaction (atomically commits input offsets + changelog + output)

    Note over App: On crash: transaction rolled back
    Note over App: Replay from last committed input offset
    Note over Store: Changelog rolled back too — state consistent

EOS_V2 (processing.guarantee=exactly_once_v2): Uses a single transactional producer per stream thread (rather than per task in v1). Reduces the number of open transactions from O(tasks) to O(threads), significantly reducing broker-side load.
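
The atomicity guarantee can be sketched as a buffered transaction (toy model; real EOS uses Kafka's transactional producer protocol, not an in-memory buffer):

```python
# Toy EOS model: input offsets, state updates, and output records commit
# atomically. An abort discards all three, so reprocessing the batch from
# the last committed offset produces no duplicates.
def process_batch(records, state, committed, crash_before_commit=False):
    txn_state, txn_out = dict(state), []        # buffer the transaction
    for key, value in records:
        txn_state[key] = txn_state.get(key, 0) + value
        txn_out.append((key, txn_state[key]))
    if crash_before_commit:
        return state, committed                 # abort: nothing becomes visible
    return txn_state, {"offset": committed["offset"] + len(records),
                       "output": committed["output"] + txn_out}

state, committed = {}, {"offset": 0, "output": []}
# Crash mid-batch: nothing committed, so the batch is simply reprocessed.
state, committed = process_batch([("btc", 1)], state, committed,
                                 crash_before_commit=True)
state, committed = process_batch([("btc", 1)], state, committed)
```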


11. Interactive Queries — Cross-Instance State Retrieval

flowchart TD
    Client["REST Client: GET /count/user42"]
    App1["Instance 1\n(owns partitions 0-1)"]
    App2["Instance 2\n(owns partitions 2-3)"]

    Client --> App1
    App1 -->|"KeyQueryMetadata.activeHost()\n= Instance 2"| Proxy["Proxy request to\nInstance 2:8080/count/user42"]
    Proxy --> App2
    App2 -->|RocksDB.get(user42)| Result["count = 47"]
    Result --> Client

Metadata routing: streams.queryMetadataForKey(storeName, key, serializer) returns a KeyQueryMetadata with the host/port owning the key. The application is responsible for proxying the query to that host — Kafka Streams provides the routing table, not the HTTP layer.

Read-your-own-writes consistency: Interactive queries may return stale data during rebalances or lag catch-up. streams.state() returns REBALANCING if consistency is temporarily compromised.
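
Routing reduces to "which partition does this key hash to, and who owns that partition"; a sketch (crc32 stands in for Kafka's murmur2-based default partitioner):

```python
import zlib

# Toy key-to-instance routing: hash the key to a partition, then look up
# which instance owns that partition. crc32 is a deterministic stand-in
# for Kafka's murmur2 default partitioner.
def owner_for_key(key, partition_owner):
    """partition_owner: list indexed by partition number -> host:port."""
    partition = zlib.crc32(key.encode()) % len(partition_owner)
    return partition, partition_owner[partition]

owners = ["host1:8080", "host1:8080", "host2:8080", "host2:8080"]
partition, host = owner_for_key("user42", owners)
# If `host` is not this instance, the application's own REST layer must
# proxy the query there; Kafka Streams only supplies the routing metadata.
```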


12. ksqlDB Architecture — Two Deployment Modes

flowchart TD
    subgraph InteractiveMode["Interactive Mode"]
        CLI["ksqlDB CLI / REST API"]
        CLI --> Server1["ksqlDB Server 1\n(leader)"]
        CLI --> Server2["ksqlDB Server 2\n(follower)"]
        Server1 <-->|"Command Topic\n_confluent-ksql-<id>__command_topic"| Server2
        Server1 --> KS1["Kafka Streams Engine"]
        Server2 --> KS2["Kafka Streams Engine"]
    end
    subgraph HeadlessMode["Headless Mode"]
        SQL["queries.sql file"]
        SQL --> H1["ksqlDB Server A"]
        SQL --> H2["ksqlDB Server B"]
        H1 --> KS3["Kafka Streams Engine"]
        H2 --> KS4["Kafka Streams Engine"]
    end

Command topic: In interactive mode, all DDL statements (CREATE STREAM, CREATE TABLE) are written to the command topic. All server nodes consume this topic — this is how query definitions are replicated across the cluster. A newly joined server replays the command topic from offset 0 to reconstruct all active queries.

Interactive vs Headless:
- Interactive: REST API and CLI available; ad-hoc queries possible; coordination via the command topic.
- Headless: no REST API; queries defined in a .sql file at startup; suitable for production deployments where the query set is fixed.
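
Command-topic replay can be sketched as a fold over DDL statements (toy parser; the statements are hypothetical):

```python
# Toy command-topic replay: a newly joined server rebuilds its set of live
# queries by consuming every DDL statement from offset 0, in order.
def replay_command_topic(commands):
    queries = {}
    for statement in commands:
        tokens = statement.split()
        verb, name = tokens[0], tokens[2]   # e.g. "CREATE STREAM tweets ..."
        if verb == "CREATE":
            queries[name] = statement
        elif verb == "DROP":
            queries.pop(name, None)
    return queries

commands = [
    "CREATE STREAM tweets (id BIGINT) WITH (...)",
    "CREATE TABLE tweet_counts AS SELECT ...",
    "DROP STREAM tweets DELETE TOPIC",
]
live = replay_command_topic(commands)   # only tweet_counts survives the replay
```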


13. ksqlDB Pull Queries vs Push Queries

sequenceDiagram
    participant Client
    participant ksqlDB as ksqlDB Server
    participant RS as RocksDB (materialized view)
    participant KT as Kafka Topic (input stream)

    Note over Client,KT: PULL QUERY: point-in-time snapshot
    Client->>ksqlDB: SELECT count FROM user_counts WHERE user='alice';
    ksqlDB->>RS: get('alice') → 42
    ksqlDB->>Client: 42 (response ends)

    Note over Client,KT: PUSH QUERY: continuous streaming result
    Client->>ksqlDB: SELECT count FROM user_counts WHERE user='alice' EMIT CHANGES;
    loop as new records arrive
        KT->>ksqlDB: new record for alice
        ksqlDB->>RS: updated count → 43
        ksqlDB->>Client: 43 (streaming row)
        KT->>ksqlDB: new record for alice
        ksqlDB->>RS: updated count → 44
        ksqlDB->>Client: 44 (streaming row)
    end

Pull queries: Read from a materialized state store. Low latency (~ms). Suitable for request/response patterns. Require the table to be materialized — i.e. derived via an aggregating CREATE TABLE ... AS SELECT statement.

Push queries: Subscribe to ongoing changes. Long-lived HTTP connection (chunked transfer). Suitable for dashboards, real-time feeds.
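
The request/response vs. subscription distinction maps naturally onto a function call vs. a generator; a sketch:

```python
# Toy pull vs push: a pull query is a single point-in-time state-store read;
# a push query is a long-lived subscription that yields each later change.
def pull_query(store, key):
    return store.get(key)                 # one answer, then the response ends

def push_query(updates, key):
    for k, v in updates:                  # emits a row per matching change
        if k == key:
            yield v

store = {"alice": 42}
assert pull_query(store, "alice") == 42
changes = [("alice", 43), ("bob", 7), ("alice", 44)]
assert list(push_query(changes, "alice")) == [43, 44]
```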


14. ksqlDB Connect Integration

flowchart LR
    subgraph ksqlDB["ksqlDB Server"]
        QE["Query Engine\n(Kafka Streams)"]
        CI["Connect Integration Layer"]
    end
    subgraph ConnectCluster["Kafka Connect Cluster (external)"]
        WC["Worker: JDBC Source Connector"]
        WK["Worker: S3 Sink Connector"]
    end
    DB[(Postgres DB)] -->|poll| WC
    WC -->|records| KT["Kafka Topic"]
    KT --> QE
    QE -->|results| KT2["Output Topic"]
    KT2 --> WK
    WK -->|sink| S3[(S3 Bucket)]

    CI <-->|"REST API\nCREATE SOURCE CONNECTOR"| ConnectCluster

Embedded Connect (single node): ksqlDB runs a Kafka Connect worker in-process. Simple deployments.
External Connect: ksqlDB proxies connector DDL (CREATE SOURCE/SINK CONNECTOR) to a separate Kafka Connect cluster via REST. Production-grade, independent scaling.


15. Event-Driven Design Patterns (Designing Event-Driven Systems)

Commands vs Events vs Queries

flowchart LR
    A[Service A] -- "Command\nprocessPayment()\n(expects response, may fail)" --> B[Service B]
    A -- "Event\nOrderCreated{widget}\n(fire and forget, no expectation)" --> LOG[Kafka Topic]
    LOG -- "fan-out" --> C[Service C]
    LOG -- "fan-out" --> D[Service D]
    A -- "Query\ngetOrder(id=42)\n(read-only, no side effects)" --> E[Service E]
| Interaction | Direction | Coupling | Side effects | Response |
|---|---|---|---|---|
| Command | One-to-one | Tight | Yes | Maybe |
| Event | One-to-many | Loose | None at source | Never |
| Query | One-to-one | Medium | None | Always |

Event Collaboration Pattern

sequenceDiagram
    participant OS as Order Service
    participant PS as Payment Service
    participant IS as Inventory Service
    participant NS as Notification Service
    participant K as Kafka

    OS->>K: OrderPlaced{orderId, items, amount}
    K->>PS: OrderPlaced event
    PS->>K: PaymentProcessed{orderId, status}
    K->>IS: PaymentProcessed event
    IS->>K: InventoryReserved{orderId, items}
    K->>NS: InventoryReserved event
    NS->>K: NotificationSent{orderId, channel}
    Note over OS,NS: No direct service-to-service calls\nEach service reacts to events independently

Choreography (shown above): No central orchestrator. Services react independently. Highly decoupled — adding a new subscriber (e.g. analytics service) requires zero changes to existing services.

Orchestration: A central "process manager" service issues commands to each downstream service and awaits their responses. Tighter coupling but clearer control flow for complex sagas.
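
Choreography can be sketched as handlers subscribed to event types on a shared log (the service handlers are hypothetical):

```python
# Toy choreography: services subscribe to event types on a shared log and
# react independently; adding a subscriber changes no existing service.
def run_choreography(initial_event, subscriptions):
    """subscriptions: {event_type: [handler, ...]}; handlers may emit events."""
    log, queue = [], [initial_event]
    while queue:
        event = queue.pop(0)
        log.append(event)                 # every event lands on the shared log
        for handler in subscriptions.get(event["type"], []):
            follow_up = handler(event)
            if follow_up:
                queue.append(follow_up)
    return log

def payment_service(e):
    return {"type": "PaymentProcessed", "orderId": e["orderId"]}

def inventory_service(e):
    return {"type": "InventoryReserved", "orderId": e["orderId"]}

subs = {"OrderPlaced": [payment_service],
        "PaymentProcessed": [inventory_service]}
log = run_choreography({"type": "OrderPlaced", "orderId": 1}, subs)
# A new analytics service would just add itself to `subs` — no other
# service needs to change.
```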


16. Stream-Table Duality & Database Inside-Out

flowchart TD
    subgraph TraditionalDB["Traditional Database"]
        App1["Application"] -->|query| DB[("Shared Database\n(coupling point)")]
        App2["Application"] -->|query| DB
        App3["Application"] -->|query| DB
    end

    subgraph DatabaseInsideOut["Database Inside-Out (Kafka)"]
        LOG[("Kafka: Commit Log\n(shared source of truth)")] 
        LOG -->|stream| V1["Materialized View 1\n(search index, inside Service A)"]
        LOG -->|stream| V2["Materialized View 2\n(analytics cache, inside Service B)"]
        LOG -->|stream| V3["Materialized View 3\n(ML features, inside Service C)"]
    end
    style TraditionalDB fill:#2d1b1b,stroke:#ff6b6b
    style DatabaseInsideOut fill:#1b2d1b,stroke:#6bff6b

Key insight (Jay Kreps / Martin Kleppmann): A database has four components — commit log, query engine, indexes, caches. Kafka unbundles these:
- Commit log → Kafka topic (append-only, replayable)
- Indexes/views → Kafka Streams KTable / state stores (materialized per service)
- Query engine → ksqlDB, or application-level queries against local state
- Cache → local RocksDB, replicated via standby replicas

Each service keeps its own lean view — only the fields it needs. If the view is lost, replay from Kafka to reconstruct. Writes and reads are fully decoupled.


17. Compacted Topics as Event-Sourced State

flowchart LR
    subgraph Log["Compacted Topic (user-profiles)"]
        R1["(user1, {name:Alice, age:30})"]
        R2["(user2, {name:Bob, age:25})"]
        R3["(user1, {name:Alice, age:31})"]
        R4["(user1, tombstone: null)"]
    end
    subgraph AfterCompaction["After Log Compaction"]
        C1["(user2, {name:Bob, age:25})"]
        C2["(user1, tombstone: null)"]
        Note["user1's older versions deleted\nTombstone marks eventual deletion"]
    end
    Log -->|"log cleaner runs"| AfterCompaction

Compacted topic = event-sourced table: Every key's latest value is retained indefinitely. A service starting fresh can replay the compacted topic from offset 0 to rebuild its complete local state — equivalent to loading a database snapshot.

Combined approach (latest-versioned pattern): Keep both a regular (time-limited) topic for audit trail AND a compacted topic for latest-state lookup. A Kafka Streams job links them — writing to the compacted topic whenever the regular topic has a new record.
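
Compaction itself is a fold that keeps each key's last record; a sketch matching the diagram above:

```python
# Toy log compaction: the cleaner keeps only each key's latest record; a
# tombstone (None value) is retained to mark the key for eventual deletion.
def compact(log):
    latest = {}
    for key, value in log:       # later records shadow earlier ones
        latest[key] = value
    return list(latest.items())  # one record per key, tombstones included

log = [
    ("user1", {"name": "Alice", "age": 30}),
    ("user2", {"name": "Bob", "age": 25}),
    ("user1", {"name": "Alice", "age": 31}),
    ("user1", None),             # tombstone
]
compacted = compact(log)
# user1's older versions are gone; the tombstone survives until it is
# eventually removed too, completing the deletion.
```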


Summary: Internal Data Flow

flowchart TD
    INPUT["Kafka Input Topic\n(partitioned)"]
    INPUT -->|"1 partition = 1 task"| TASK["StreamTask\n(owns state + processing)"]
    TASK -->|depth-first record traversal| PROC["Processor Chain\n(filter → map → aggregate)"]
    PROC -->|upsert| RS["RocksDB\n(SST files on disk)"]
    RS -->|async write| CL["Changelog Topic\n(fault tolerance)"]
    CL -->|standby consumption| STANDBY["Standby Replica RocksDB\n(warm copy)"]
    PROC -->|produce| OUTPUT["Kafka Output Topic"]

    REBALANCE["Rebalance Event"] -->|sticky: reassign to previous owner| TASK
    STANDBY -->|"promote (delta replay only)"| TASK

    KSQL["ksqlDB Query Engine"] -->|compiled to| KS_TOPO["Kafka Streams Topology"]
    KS_TOPO --> TASK
    KSQL_PULL["Pull Query"] -->|point-in-time| RS
    KSQL_PUSH["Push Query"] -->|continuous subscribe| OUTPUT