Data Mining & Big Data Internals: Under the Hood
Source synthesis: Data mining and big data reference books (comp 234–240, 268–290, 310, 436–438, 441, 446, 448, 453–454, 488) covering Hadoop MapReduce internals, Spark RDD/DAG execution, Flink streaming, column store compression, LSM trees, and data mining algorithms.
1. Hadoop MapReduce — Execution Internals
flowchart TD
subgraph MapReduce Pipeline
Input["HDFS Input Files\n(128MB blocks)"]
IS["InputSplit\n(logical split ≤ 1 block)\n→ 1 Map task per split"]
Map["Map Tasks (parallel)\nmap(k1,v1) → list(k2,v2)\nrun on same DataNode as block\n(data locality)"]
Buffer["In-memory Sort Buffer\n(io.sort.mb=100MB default)\nSpillThreshold=0.80\n→ sorted runs spilled to disk"]
Combine["Combiner (optional)\nlocal reduce-like aggregation\nbefore shuffle\n→ reduces network traffic"]
Shuffle["Shuffle + Sort\nHTTP fetch from Map outputs\n→ merge-sort by key\n(external merge sort)\nGrouped by key for Reduce"]
Reduce["Reduce Tasks\nreduce(k2, list(v2)) → list(k3,v3)"]
Output["HDFS Output\n(replication factor=3)"]
end
Input --> IS --> Map --> Buffer --> Combine --> Shuffle --> Reduce --> Output
subgraph HDFS Block Layout
NN["NameNode\n- fsimage: namespace tree\n- edit log: recent mutations\n- block→DataNode mapping (in-memory)\n- heartbeat every 3s\n- block report every 6h"]
DN["DataNode × N\n- store 128MB block files\n- block scanner: CRC32 verify\n- pipeline write: client → DN1 → DN2 → DN3"]
NN <-->|"RPC"| DN
end
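The map/reduce contract is easiest to see as code. Below is a minimal word-count pair for Hadoop Streaming (a Python sketch; the mapper.py/reducer.py file names are illustrative): Streaming feeds input lines on stdin and shuffles stdout by tab-separated key, so the reducer sees its input already sorted and grouped.

```python
#!/usr/bin/env python3
# mapper.py -- map(k1,v1) -> list(k2,v2): emit (word, 1) per token.
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")
```

```python
#!/usr/bin/env python3
# reducer.py -- reduce(k2, list(v2)): input arrives sorted by key after the
# shuffle, so a key change marks the end of a group.
import sys

current, total = None, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t")
    if key != current:
        if current is not None:
            print(f"{current}\t{total}")
        current, total = key, 0
    total += int(value)
if current is not None:
    print(f"{current}\t{total}")
```

Both scripts plug into the hadoop-streaming jar via its -mapper and -reducer flags; the framework supplies the sort between them.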
MapReduce Shuffle Deep Dive
sequenceDiagram
participant Map as Map Task
participant Buffer as Sort Buffer (RAM)
participant Disk as Local Disk (spill files)
participant Reduce as Reduce Task
Map->>Buffer: output (k,v) pairs
Note over Buffer: buffer fill → quicksort in-place by (partition, key)
Buffer->>Disk: spill sorted run to disk (spill.N)
Note over Disk: multiple spills → merge-sort into single file per Map task
Reduce->>Map: HTTP GET /mapOutput?mapId=...&reduce=R
Note over Reduce: fetch from all Map tasks (shuffle phase)
Reduce->>Reduce: merge-sort fetched segments
Note over Reduce: on-disk merge if too many segments\n→ k-way merge with priority queue
Reduce->>Reduce: reduce() called for each key group
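The per-map spill merge and the reduce-side segment merge are the same primitive: an external k-way merge over sorted runs. A toy Python version with heapq (in-memory lists stand in for spill files):

```python
# Each spill file is a run of (key, value) pairs already sorted by key;
# heapq.merge streams them in order via a priority queue, exactly the
# k-way merge that runs before reduce().
import heapq
from itertools import groupby
from operator import itemgetter

spill_runs = [  # stand-ins for sorted on-disk spill segments
    [("apple", 2), ("cat", 1), ("dog", 4)],
    [("apple", 1), ("bird", 3), ("dog", 1)],
]

merged = heapq.merge(*spill_runs, key=itemgetter(0))
for key, group in groupby(merged, key=itemgetter(0)):
    values = [v for _, v in group]   # the list(v2) handed to reduce()
    print(key, sum(values))          # reduce(k2, list(v2))
```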
2. Apache Spark — RDD, DAG, and Execution
flowchart TD
subgraph Spark Application Architecture
Driver["Driver Process\n- SparkContext\n- DAGScheduler\n- TaskScheduler\n- BlockManagerMaster"]
EM["Executor Manager\n(YARN/K8s ResourceManager)"]
E1["Executor 1\n- BlockManager\n- Task threads (cores=4)\n- JVM heap + off-heap"]
E2["Executor 2\n- BlockManager\n- Task threads"]
end
Driver -->|"task launch"| E1 & E2
Driver <-->|"resource negotiation"| EM
E1 <-->|"shuffle data"| E2
subgraph Lineage["RDD Lineage (Lazy Evaluation)"]
R1["textFile('/data/log')\n(HadoopRDD)"]
R2["filter(line => line.contains('ERROR'))\n(FilteredRDD)"]
R3["map(line => (line.split(' ')(0), 1))\n(MappedRDD)"]
R4["reduceByKey(_ + _)\n(ShuffledRDD)\n← STAGE BOUNDARY (shuffle)"]
R5["collect()\n← ACTION → triggers execution"]
R1 --> R2 --> R3 --> R4 --> R5
end
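The same lineage in PySpark, assuming a local Spark install and that /data/log exists; note that nothing runs until collect():

```python
# Every call before collect() only builds the DAG; collect() is the action
# that triggers stage execution.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("lineage").getOrCreate()
sc = spark.sparkContext

counts = (
    sc.textFile("/data/log")                      # HadoopRDD
      .filter(lambda line: "ERROR" in line)       # narrow dependency
      .map(lambda line: (line.split(" ")[0], 1))  # narrow dependency
      .reduceByKey(lambda a, b: a + b)            # shuffle -> stage boundary
)
print(counts.toDebugString().decode())            # prints the lineage/stages
print(counts.collect())                           # action: runs both stages
```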
DAG Scheduler — Stage Splitting
flowchart LR
subgraph DAGScheduler
Action["collect() action"]
Stage2["ResultStage\n(after shuffle)\nTask: read ShuffleMapOutput\n→ reduceByKey → output"]
Stage1["ShuffleMapStage\n(before shuffle boundary)\nTask: read HDFS block\n→ filter → map\n→ write shuffle files"]
TaskScheduler["TaskScheduler\nSchedule tasks on executors\n(data locality: PROCESS_LOCAL\n → NODE_LOCAL → RACK_LOCAL → ANY)"]
end
Action --> Stage2
Stage2 -->|"depends on shuffle output of"| Stage1
Stage1 & Stage2 --> TaskScheduler
Shuffle Internals (Sort-based)
flowchart TD
subgraph Spark Sort Shuffle
Map2["Map Task output\n(key,value) pairs"]
Serialize["Serialize to byte[] (Kryo / Java)"]
SortPartition["Sort by (partition_id, sort_key)\nusing Tungsten's off-heap sorter\n(unsafe memory, no GC pressure)"]
IndexFile["shuffle_{mapId}_{reduceId}.index\n(byte offsets per partition)"]
DataFile["shuffle_{mapId}_{reduceId}.data\n(all partitions concatenated)"]
Fetch["Reduce task fetches partition slice\nvia BlockManager.getRemoteBytes()\n(HTTP or direct memory transfer)"]
end
Map2 --> Serialize --> SortPartition
SortPartition --> IndexFile & DataFile
DataFile --> Fetch
IndexFile --> Fetch
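A sketch of how the index file is consumed (toy file names; the real index file is likewise a run of int64 offsets, one per partition boundary): reducer r's slice of the data file is bytes offsets[r]..offsets[r+1].

```python
# Write one map task's output as a single data file plus an offset index,
# then fetch one reduce partition's slice, as the diagram above describes.
import struct

partitions = [b"part0-bytes", b"p1", b"partition-two-bytes"]

with open("shuffle_0_0_0.data", "wb") as data:   # hypothetical file names
    offsets = [0]
    for p in partitions:
        data.write(p)
        offsets.append(offsets[-1] + len(p))
with open("shuffle_0_0_0.index", "wb") as idx:
    idx.write(struct.pack(f">{len(offsets)}q", *offsets))  # big-endian int64s

# A reduce task fetching partition r = 1:
with open("shuffle_0_0_0.index", "rb") as idx:
    offs = struct.unpack(f">{len(partitions) + 1}q", idx.read())
with open("shuffle_0_0_0.data", "rb") as data:
    data.seek(offs[1])
    print(data.read(offs[2] - offs[1]))          # b'p1'
```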
3. Spark Tungsten — Off-Heap Binary Format
flowchart LR
subgraph Tungsten Memory Management
JavaObj["Java Object\nRow: ('Alice', 30, 95.5)\nJVM: 80+ bytes with headers, padding"]
UnsafeRow["UnsafeRow (binary)\n8B null bitmap\n8B fixed-width field 0 (offset+len into\nvariable-length region, for string)\n8B fixed-width field 1 (30 as long)\n8B fixed-width field 2 (95.5 as double)\n+ variable-length data (Alice bytes)\nTotal: ~48 bytes, cache-line friendly"]
NoCopy["No deserialization needed\nDirect unsafe memory reads\nComparisons on raw bytes\n→ 2-5× less memory, no GC"]
end
JavaObj --> UnsafeRow --> NoCopy
subgraph Whole-Stage Code Generation
Interpreted["Interpreted Volcano:\nfor each row:\n Filter.next() calls Map.next() calls Scan.next()\n → virtual dispatch, no loop fusion"]
Codegen["Whole-Stage Codegen:\ngenerate single tight loop:\nfor (row in partition) {\n if (filter_cond) {\n hash_agg.update(row.getLong(0), row.getDouble(1))\n }\n}\n→ JIT-friendly, CPU register optimized"]
end
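A toy contrast in Python, purely to show the shape of the two execution models (Spark actually generates Java bytecode, not Python):

```python
# Volcano-style pull iterators vs. a whole-stage-codegen-like fused loop.
# rows: (name, age, score) tuples.
rows = [("Alice", 30, 95.5), ("Bob", 25, 87.0), ("Carol", 28, 92.3)]

# Volcano: each operator pulls from its child -> one call chain per row.
def scan(data):
    yield from data

def filt(child):
    for row in child:
        if row[1] > 25:
            yield row

def project(child):
    for row in child:
        yield row[2]

volcano_sum = sum(project(filt(scan(rows))))

# Whole-stage codegen: the same pipeline collapsed into one tight loop,
# no per-row call overhead between operators.
fused_sum = 0.0
for name, age, score in rows:
    if age > 25:
        fused_sum += score

assert volcano_sum == fused_sum
```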
4. Apache Flink — Streaming Execution Internals
flowchart TD
subgraph Flink Architecture
JM["JobManager\n- JobGraph → ExecutionGraph\n- CheckpointCoordinator\n- ResourceManager"]
TM1["TaskManager 1\n- TaskSlots (1 per core)\n- Network buffers (32KB each)\n- Managed memory (off-heap)"]
TM2["TaskManager 2"]
end
subgraph Dataflow Graph
Source["Kafka Source\n(parallelism=8\n→ 8 source tasks)"]
Map3["Map/Filter\n(parallelism=8)"]
Window["WindowOperator\n(keyBy → parallel=8)\nTumbling/Sliding/Session windows"]
Sink["Sink (parallelism=4)"]
end
JM --> TM1 & TM2
Source --> Map3 --> Window --> Sink
subgraph Network Data Exchange
LocalFwd["Local Forward (same TM, same slot chain)\n→ in-memory buffer hand-off (0 copy)"]
NetworkShuffle["Network Shuffle (different TM)\n→ serialize → Netty buffer pool\n→ TCP channel → deserialize\nCredit-based flow control:\nreceiver grants credits (buffer availability)\n→ sender limited by credits → no buffer overflow"]
end
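Credit-based flow control fits in a few lines. In this sketch (class and method names invented for illustration) the sender may only ship as many buffers as the receiver has granted, so the receiver's buffers can never overflow:

```python
from collections import deque

class Receiver:
    def __init__(self, num_buffers):
        self.free_buffers = num_buffers
        self.inbox = deque()

    def grant_credits(self):
        # announce currently free buffers as credits
        c, self.free_buffers = self.free_buffers, 0
        return c

    def receive(self, buf):
        self.inbox.append(buf)

    def process_one(self):
        self.inbox.popleft()
        self.free_buffers += 1     # freed buffer becomes a future credit

receiver = Receiver(num_buffers=2)
credits, pending = 0, deque(f"record-{i}" for i in range(5))
while pending:
    credits += receiver.grant_credits()
    while credits and pending:
        receiver.receive(pending.popleft())  # send only against credits
        credits -= 1
    receiver.process_one()                   # receiver drains -> new credits
print("all records delivered without overflowing receiver buffers")
```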
Flink Checkpoint — Chandy-Lamport Algorithm
sequenceDiagram
participant CC as CheckpointCoordinator (JM)
participant S1 as Source 1
participant S2 as Source 2
participant OP as Operator (Window)
participant Sink2 as Sink
CC->>S1: triggerCheckpoint(checkpointId=5)
CC->>S2: triggerCheckpoint(checkpointId=5)
S1->>S1: snapshot state → S3/HDFS
S1->>OP: emit BARRIER(5) into stream
S2->>S2: snapshot state
S2->>OP: emit BARRIER(5)
Note over OP: Wait for BARRIER from ALL inputs (barrier alignment)
OP->>OP: snapshot window state → S3/HDFS
OP->>Sink2: emit BARRIER(5)
Sink2->>Sink2: snapshot state
Sink2-->>CC: acknowledgeCheckpoint(5)
Note over CC: All operators acked → checkpoint complete → advance recovery point
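A minimal model of barrier alignment (names invented for illustration): the operator snapshots only once BARRIER(n) has arrived on every input channel, buffering records from channels that are already past the barrier:

```python
from collections import defaultdict

class AlignedOperator:
    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.seen = set()                  # channels whose barrier arrived
        self.buffered = defaultdict(list)  # records held back during alignment
        self.state = 0                     # e.g. a running sum

    def on_record(self, channel, record):
        if channel in self.seen:           # channel already past the barrier:
            self.buffered[channel].append(record)  # hold back until snapshot
        else:
            self.state += record           # normal processing

    def on_barrier(self, channel, checkpoint_id):
        self.seen.add(channel)
        if len(self.seen) == self.num_inputs:        # barriers from ALL inputs
            print(f"snapshot checkpoint {checkpoint_id}: state={self.state}")
            for records in self.buffered.values():   # replay held-back records
                for r in records:
                    self.state += r
            self.seen.clear()
            self.buffered.clear()

op = AlignedOperator(num_inputs=2)
op.on_record(0, 5)
op.on_barrier(0, checkpoint_id=5)
op.on_record(0, 7)                 # buffered: channel 0 already saw barrier 5
op.on_record(1, 3)
op.on_barrier(1, checkpoint_id=5)  # alignment complete -> snapshot, then replay
```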
5. LSM Tree — Write Path Internals
flowchart TD
subgraph LSM["LSM-Tree (RocksDB / Cassandra)"]
Write["Write: PUT(key, value)"]
WAL["WAL (Write-Ahead Log)\nappend-only, sequential write\n→ crash recovery"]
Memtable["MemTable\n(in-memory sorted structure\nRed-Black tree or SkipList)\nwrite + read: O(log n)"]
Immutable["Immutable MemTable\n(flipped when MemTable full ~64MB)\n→ being flushed to L0"]
L0["L0 SSTable files\n(sorted within file\nbut keys can overlap between L0 files)\n→ 4–8 files before compaction trigger"]
L1["L1 SSTables\n(total size ~256MB\nnon-overlapping key ranges)"]
L2["L2 SSTables\n(total size ~2.56GB)"]
Ln["L_n ...\n(10× size amplification per level)"]
end
Write --> WAL & Memtable
Memtable --> Immutable --> L0 --> L1 --> L2 --> Ln
subgraph Leveled["Compaction (Leveled)"]
Pick["Pick one SSTable from L_i\nplus all overlapping SSTables from L_{i+1}"]
Merge["k-way merge (sorted iterators)\ndrop tombstones once safe\n(bottom level / past gc_grace)\ndedup: newest version wins"]
Write2["Write new sorted SSTable to L_{i+1}\ndelete old files (atomic manifest update)"]
Pick --> Merge --> Write2
end
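The whole write path in miniature (a Python sketch with an invented TinyLSM class; a dict stands in for the skip list, and levels/compaction are omitted):

```python
import json

class TinyLSM:
    def __init__(self, wal_path="wal.log", memtable_limit=4):
        self.wal = open(wal_path, "a")      # hypothetical WAL path
        self.memtable = {}                  # key -> value, sorted at flush time
        self.memtable_limit = memtable_limit
        self.sstables = []                  # newest first: list of sorted runs

    def put(self, key, value):
        self.wal.write(json.dumps([key, value]) + "\n")
        self.wal.flush()                    # sequential append = crash recovery
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        run = sorted(self.memtable.items())   # immutable sorted "SSTable" (L0)
        self.sstables.insert(0, run)
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:              # newest data first
            return self.memtable[key]
        for run in self.sstables:             # then SSTables, newest first;
            for k, v in run:                  # real engines use a bloom filter
                if k == key:                  # + per-block binary search here
                    return v
        return None

db = TinyLSM()
for i in range(5):
    db.put(f"k{i}", i)                        # 4th put triggers a flush
print(db.get("k1"), db.get("k4"))             # 1 4
```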
SSTable Bloom Filter + Index
flowchart LR
subgraph SSTable Structure
BF["Bloom Filter\n(per file)\n~10 bits/key, 1% FPR\ncheck before disk seek\n→ avoid disk read for non-existent keys"]
Index["Block Index\n(sparse: one entry per 4KB data block)\n{first_key → file_offset}\n→ binary search to find block"]
DataBlocks["Data Blocks (4KB each)\n{key, value} pairs sorted\nSnappy/Zstd compressed\nCRC32 checksum"]
Footer["Footer\noffsets of MetaIndex and Index blocks\nmagic number"]
end
BF --> Index --> DataBlocks --> Footer
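A small Bloom filter sketch at ~10 bits/key; with k = (bits/key)·ln 2 ≈ 7 hash functions the false-positive rate lands near 1%:

```python
import hashlib

class Bloom:
    def __init__(self, num_keys, bits_per_key=10, k=7):
        self.m = num_keys * bits_per_key
        self.k = k
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key):
        # k derived bit positions from salted hashes
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "little") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        # False => definitely absent (skip the disk seek); True => probably present
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))

bf = Bloom(num_keys=1000)
bf.add("user:42")
assert bf.might_contain("user:42")   # no false negatives, ever
```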
6. Column Store — Compression & Vectorized Execution
flowchart LR
subgraph ColStore["Column Storage (Parquet / ORC)"]
Row["Row-oriented:\n[id=1, name=Alice, age=30, score=95.5]\n[id=2, name=Bob, age=25, score=87.0]\n→ read all columns even if only 'score' needed"]
Col["Column-oriented:\nid_col: [1,2,3,4,...]\nname_col: [Alice,Bob,Carol,...]\nage_col: [30,25,28,...]\nscore_col:[95.5,87.0,92.3,...]\n→ read only 'score_col' for aggregation"]
end
subgraph Parquet Encoding Schemes
Plain["Plain encoding:\n[30, 25, 28, 30, 25]\n(4 bytes each)"]
DeltaEnc["Delta encoding:\nfirst=30, deltas=[−5,+3,+2,−5]\n(variable-length int → 2× smaller)"]
RLE["Run-Length Encoding (RLE):\n[30, 30, 30, 30, 25, 25]\n→ {val=30, run=4}, {val=25, run=2}\n(highly repetitive cols: status, country)"]
Dict["Dictionary encoding:\n{0:Alice, 1:Bob, 2:Carol}\nname_col=[0,1,2,0,1,...]\n(low-cardinality strings: 10× smaller)"]
end
Plain --> DeltaEnc & RLE & Dict
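Toy versions of three of these encodings (real Parquet bit-packs the outputs; this only shows the size intuition):

```python
from itertools import groupby

def delta_encode(values):
    # [30,25,28,30,25] -> (30, [-5, 3, 2, -5])
    return values[0], [b - a for a, b in zip(values, values[1:])]

def rle_encode(values):
    # [30,30,30,30,25,25] -> [(30, 4), (25, 2)]
    return [(v, len(list(g))) for v, g in groupby(values)]

def dict_encode(values):
    # low-cardinality strings -> small integer codes + one dictionary
    dictionary = {v: i for i, v in enumerate(dict.fromkeys(values))}
    return dictionary, [dictionary[v] for v in values]

print(delta_encode([30, 25, 28, 30, 25]))
print(rle_encode([30, 30, 30, 30, 25, 25]))
dictionary, codes = dict_encode(["Alice", "Bob", "Carol", "Alice", "Bob"])
print(dictionary, codes)   # {'Alice': 0, 'Bob': 1, 'Carol': 2} [0, 1, 2, 0, 1]
```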
Vectorized Query Execution
flowchart TD
subgraph Vectorized vs Scalar
Scalar["Scalar (row-at-a-time):\nfor (row : table) {\n if (row.age > 25)\n sum += row.score;\n}\n→ function call overhead per row\nbranch misprediction"]
Vector["Vectorized (batch of 1024 rows):\nSIMD AVX-512 comparison:\nmask = age_col[0..1023] > 25 (512-bit vector op)\nmasked sum: sum += score_col[mask]\n→ 16 FP32 ops per SIMD instruction\n→ 10–50× faster for aggregations"]
end
Scalar --> Vector
subgraph Late Materialization
Filter2["Evaluate filter on compressed column\nget matching row IDs (bitmap)\nOnly fetch other columns for matching rows\n→ avoid decompressing unwanted rows"]
end
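NumPy as a stand-in for the SIMD kernels a columnar engine generates: one comparison over the whole column produces the bitmap, and the masked gather is exactly the late-materialization step:

```python
import numpy as np

rng = np.random.default_rng(0)
age_col = rng.integers(18, 70, size=100_000)
score_col = rng.random(100_000) * 100

# Scalar, row-at-a-time:
scalar_sum = 0.0
for i in range(len(age_col)):
    if age_col[i] > 25:
        scalar_sum += float(score_col[i])

# Vectorized: one comparison over the whole column, then a masked sum.
mask = age_col > 25                  # bitmap of matching row IDs
vector_sum = score_col[mask].sum()   # fetch 'score' only for matching rows

assert np.isclose(scalar_sum, vector_sum)
```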
7. Frequent Itemset Mining — Apriori & FP-Growth
flowchart TD
subgraph Apriori Algorithm
D["Transaction database\nT1: {milk, bread, beer}\nT2: {milk, bread}\nT3: {bread, beer, diapers}"]
C1["C1 (candidate 1-itemsets):\n{milk:2, bread:3, beer:2, diapers:1}"]
L1["L1 (freq 1-itemsets, minSup=2):\n{milk:2, bread:3, beer:2}"]
C2["C2 (candidate 2-itemsets from L1):\n{milk,bread}, {milk,beer}, {bread,beer}"]
L2["L2 (freq 2-itemsets):\n{milk,bread:2}, {bread,beer:2}"]
Prune["Anti-monotone pruning:\nif {milk,beer,bread} has subset {milk,beer} not freq\n→ prune (all subsets must be frequent)"]
end
D --> C1 --> L1 --> C2 --> L2 --> Prune
subgraph FPGrowth["FP-Growth (no candidate generation)"]
FPTree["FP-Tree:\ncompressed prefix tree\nheader table → linked list per item\nTwo DB scans: 1. freq items 2. build tree"]
ConditionalDB["Conditional Pattern Base:\nfor each freq item β:\n extract all paths to β\n form conditional FP-tree\n mine recursively"]
FPTree --> ConditionalDB
end
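A compact Apriori over the toy transactions above (minSup = 2), with the join and anti-monotone prune steps spelled out:

```python
from itertools import combinations

transactions = [
    {"milk", "bread", "beer"},
    {"milk", "bread"},
    {"bread", "beer", "diapers"},
]
min_sup = 2

def support(itemset):
    return sum(itemset <= t for t in transactions)

# L1: frequent 1-itemsets
items = {i for t in transactions for i in t}
level = [frozenset([i]) for i in items if support(frozenset([i])) >= min_sup]
frequent = list(level)

k = 2
while level:
    # join step: unions of L_{k-1} itemsets that yield exactly k items
    candidates = {a | b for a in level for b in level if len(a | b) == k}
    # prune step: every (k-1)-subset must itself be frequent (anti-monotone)
    candidates = {
        c for c in candidates
        if all(frozenset(s) in set(level) for s in combinations(c, k - 1))
    }
    level = [c for c in candidates if support(c) >= min_sup]
    frequent += level
    k += 1

for itemset in frequent:
    print(set(itemset), support(itemset))
```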
8. K-Means Clustering — Lloyd's Algorithm Internals
flowchart TD
subgraph K-Means Iteration
Init["Initialize k centroids\n(K-means++: D² weighted sampling\n→ spread-out initialization)"]
Assign["Assignment step:\nfor each point x_i:\n c(x_i) = argmin_k ‖x_i - μ_k‖²\n(nearest centroid)\nO(n·k·d) per iteration"]
Update["Update step:\nμ_k = (1/|C_k|) Σ_{x∈C_k} x\n(mean of assigned points)"]
Converge["Convergence:\ncentroids change < ε\nor max_iter reached\nObjective: minimize inertia\n= Σ_i min_k ‖x_i - μ_k‖²"]
end
Init --> Assign --> Update --> Converge
Update -->|"not converged"| Assign
subgraph DistKMeans["Distributed K-Means (Spark MLlib)"]
Partitions["Partitions: data distributed across executors"]
LocalSum["Local aggregation:\neach partition sums points per cluster\n→ (cluster_id, sum_vector, count)"]
GlobalReduce["Driver reduces partial sums:\nμ_k = Σ(local_sum_k) / Σ(local_count_k)"]
Broadcast["Broadcast new centroids to all executors"]
Partitions --> LocalSum --> GlobalReduce --> Broadcast --> LocalSum
end
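Lloyd's iterations in NumPy (random init here for brevity; MLlib defaults to k-means++):

```python
import numpy as np

def kmeans(X, k, max_iter=100, tol=1e-4, seed=0):
    rng = np.random.default_rng(seed)
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(max_iter):
        # assignment step: nearest centroid per point, O(n*k*d)
        dists = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        labels = dists.argmin(axis=1)
        # update step: mean of assigned points (keep old centroid if empty)
        new_centroids = np.array([
            X[labels == j].mean(axis=0) if (labels == j).any() else centroids[j]
            for j in range(k)
        ])
        if np.linalg.norm(new_centroids - centroids) < tol:   # convergence
            break
        centroids = new_centroids
    return centroids, labels

gen = np.random.default_rng(1)
X = np.vstack([gen.normal(loc=c, size=(50, 2)) for c in (0, 5, 10)])
centroids, labels = kmeans(X, k=3)
print(centroids)
```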
9. Association Rule Mining — Confidence & Lift
flowchart LR
subgraph Rule Metrics
Sup["Support(A→B) = P(A∪B)\nfraction of transactions containing both A and B\n= count(A∪B) / |D|"]
Conf["Confidence(A→B) = P(B|A)\n= support(A∪B) / support(A)\n= P(A∪B) / P(A)"]
Lift["Lift(A→B) = confidence(A→B) / support(B)\n= P(A∪B) / (P(A)·P(B))\nLift>1: positive correlation\nLift=1: independent\nLift<1: negative correlation"]
Conv["Conviction(A→B) = (1-support(B)) / (1-confidence(A→B))\n→ measures departure from independence\nConv=∞ if conf=1 (perfect implication)"]
end
subgraph Rule Generation from Frequent Itemsets
FreqItemsets["Frequent itemsets L_k\n(already computed by Apriori/FP-Growth)"]
GenRules["For each L_k = {i1,...,ik}:\n generate all non-empty subsets s\n if conf(s → L_k - s) ≥ minConf:\n output rule s → L_k - s"]
AntiMono["Confidence pruning on rules:\nif conf(s → L_k−s) < minConf,\nthen conf(s' → L_k−s') < minConf for all s' ⊂ s\n(supp(s') ≥ supp(s), so confidence only shrinks)\n→ prune descendants in rule lattice"]
FreqItemsets --> GenRules --> AntiMono
end
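The four metrics computed for one rule over the toy transactions from the Apriori section:

```python
transactions = [
    {"milk", "bread", "beer"},
    {"milk", "bread"},
    {"bread", "beer", "diapers"},
]
N = len(transactions)

def p(itemset):
    # fraction of transactions containing the whole itemset
    return sum(itemset <= t for t in transactions) / N

A, B = {"bread"}, {"beer"}
support = p(A | B)                        # P(A and B) = 2/3
confidence = p(A | B) / p(A)              # (2/3) / 1 = 0.667
lift = confidence / p(B)                  # 0.667 / 0.667 = 1.0 -> independent
conviction = (1 - p(B)) / (1 - confidence) if confidence < 1 else float("inf")
print(support, confidence, lift, conviction)
```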
10. Streaming Data — Reservoir Sampling & Count-Min Sketch
flowchart TD
subgraph RSamp["Reservoir Sampling (Vitter's Algorithm R)"]
Stream["Infinite stream x_1, x_2, ..., x_n, ..."]
Reservoir["Reservoir R[1..k]\n(k randomly selected items)"]
Process["For each new item x_i (i > k):\n j = random(1, i)\n if j ≤ k:\n R[j] = x_i (keep with prob k/i)\n→ after i items, every item seen so far\n is in R with probability k/i\n→ uniform k-sample without knowing stream length"]
Stream --> Reservoir --> Process
end
subgraph CMSketch["Count-Min Sketch (Frequency Estimation)"]
Sketch["Data structure:\nd×w array of counters\n(d rows of w counters each,\ne.g. w=2000, d=5)"]
Hash["d independent hash functions h_1,...,h_d\n(pairwise independent)"]
Update["Update(x, count):\nfor i in 1..d:\n sketch[i][h_i(x)] += count"]
Query["Query(x):\nreturn min_i sketch[i][h_i(x)]\n→ overestimate (hash collisions add to count)\nbut never underestimate\nError: ε = e/w, δ = e^(-d)\n(ε-approximate with prob 1-δ)"]
Sketch --> Hash --> Update & Query
end
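Both sketches in a few lines each (the hash salting here is illustrative; production Count-Min uses pairwise-independent hash families):

```python
import random

def reservoir_sample(stream, k, seed=0):
    # Algorithm R: after n items, each item is in the reservoir with prob k/n.
    rnd = random.Random(seed)
    reservoir = []
    for i, x in enumerate(stream, start=1):
        if i <= k:
            reservoir.append(x)
        else:
            j = rnd.randint(1, i)        # replace with probability k/i
            if j <= k:
                reservoir[j - 1] = x
    return reservoir

class CountMinSketch:
    def __init__(self, w=2000, d=5, seed=0):
        self.w, self.d = w, d
        self.table = [[0] * w for _ in range(d)]
        rnd = random.Random(seed)
        self.salts = [rnd.getrandbits(64) for _ in range(d)]

    def _h(self, i, x):
        return hash((self.salts[i], x)) % self.w

    def update(self, x, count=1):
        for i in range(self.d):
            self.table[i][self._h(i, x)] += count

    def query(self, x):
        # min over rows: collisions only inflate counts, never deflate them
        return min(self.table[i][self._h(i, x)] for i in range(self.d))

print(reservoir_sample(range(1_000), k=10))
cms = CountMinSketch()
for token in ["a", "b", "a", "c", "a"]:
    cms.update(token)
print(cms.query("a"))   # >= 3, usually exactly 3
```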
11. HDFS Write Path — 3× Replication Pipeline
sequenceDiagram
participant Client
participant NN as NameNode
participant DN1 as DataNode 1
participant DN2 as DataNode 2
participant DN3 as DataNode 3
Client->>NN: create file (/data/file.txt)
NN-->>Client: block locations: [DN1, DN2, DN3]
Client->>DN1: writeBlock (64KB packets)
DN1->>DN2: pipeline forward
DN2->>DN3: pipeline forward
DN3-->>DN2: ACK
DN2-->>DN1: ACK
DN1-->>Client: ACK (packet acknowledged)
Note over Client,DN3: repeat for each 64KB packet
Client->>NN: addBlock (when block full, 128MB)
NN->>NN: record block locations in metadata
Client->>NN: complete (file closed)
Note over NN: blockReport from DNs verifies replication
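Pipeline replication in miniature (invented helper, no networking): each stage writes locally, forwards downstream, and acks only after its downstream returns, so the client's ack implies all three replicas hold the packet:

```python
def make_datanode(name, downstream=None):
    storage = []
    def write(packet):
        storage.append(packet)        # local write (block file on disk)
        if downstream is not None:
            downstream(packet)        # pipeline forward, blocks until acked
        return f"ACK:{name}"          # ack flows back up the pipeline
    return write

dn3 = make_datanode("DN3")
dn2 = make_datanode("DN2", downstream=dn3)
dn1 = make_datanode("DN1", downstream=dn2)

for packet in [b"64KB-chunk-1", b"64KB-chunk-2"]:
    print(dn1(packet))   # client sees ACK only after DN1 -> DN2 -> DN3 succeeded
```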
12. Data Warehouse — Star Schema & Query Optimization
flowchart LR
subgraph Star Schema
Fact["Fact Table (sales_fact)\norder_id, date_id, customer_id, product_id\nquantity, revenue\n(100M+ rows, clustered on date_id)"]
DimDate["Dim_Date\ndate_id, year, quarter, month, day_of_week\n(365 rows/year)"]
DimCustomer["Dim_Customer\ncustomer_id, name, country, segment\n(1M rows)"]
DimProduct["Dim_Product\nproduct_id, category, brand, price_tier\n(100K rows)"]
Fact --- DimDate & DimCustomer & DimProduct
end
subgraph OLAP Query Optimization
StarJoin["Star Join optimization:\nbuild hash tables for small dimensions in memory\nprobe fact table row-by-row\n→ Bitmap join index: precompute dim FK bitmaps\n→ AND bitmaps → only access matching fact rows"]
Partition["Partition pruning:\nWHERE date_id BETWEEN 2024_01 AND 2024_03\n→ skip 75% of partitions\n→ scan only Q1 partitions"]
ColScan["Column scan (Parquet):\nSELECT SUM(revenue) GROUP BY country\n→ read only revenue + customer_id columns\n+ join with dim_customer for country\n→ skip 8/10 fact table columns"]
end
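The star-join trick as plain Python (column and table names illustrative): hash the small dimension once, then stream the fact table with O(1) probes:

```python
sales_fact = [  # (customer_id, revenue) -- two of the fact columns
    (1, 120.0), (2, 80.0), (1, 45.0), (3, 300.0),
]
dim_customer = {  # customer_id -> country (small dimension, fits in memory)
    1: "DE", 2: "US", 3: "DE",
}

revenue_by_country = {}
for customer_id, revenue in sales_fact:   # single pass over the fact table
    country = dim_customer[customer_id]   # O(1) hash probe
    revenue_by_country[country] = revenue_by_country.get(country, 0.0) + revenue
print(revenue_by_country)                 # {'DE': 465.0, 'US': 80.0}
```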
13. PageRank — Iterative Graph Computation
flowchart TD
subgraph PageRank Algorithm
Init["Initialize: PR(v) = 1/N for all vertices"]
Iter["Each iteration:\nPR(v) = (1-d)/N + d × Σ_{u→v} PR(u)/out_degree(u)\nd = damping factor = 0.85\n(random surfer: 85% follow link, 15% teleport)"]
Conv2["Converge when:\n‖PR_{t+1} - PR_t‖_1 < ε"]
end
Init --> Iter --> Conv2
Iter -->|"not converged"| Iter
subgraph Spark GraphX Implementation
RDD_V["RDD[VertexId, PRValue]\n(partitioned by VertexId hash)"]
RDD_E["RDD[EdgeTriplet]\n(src, dst, srcPR, outDegree)"]
SendMsg["sendMsg: edge → (dst, src_PR/out_degree)"]
Merge["mergeMsg: sum incoming messages per vertex"]
Update["updateVertex: apply PageRank formula\n+ add dangling node mass"]
RDD_V --> RDD_E --> SendMsg --> Merge --> Update --> RDD_V
end
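Power iteration with the damping formula above, including the dangling-node mass that GraphX also has to redistribute (a plain-Python sketch):

```python
def pagerank(out_links, d=0.85, eps=1e-8, max_iter=100):
    nodes = list(out_links)
    N = len(nodes)
    pr = {v: 1.0 / N for v in nodes}
    for _ in range(max_iter):
        # mass of dangling nodes (no out-links) is spread over all vertices
        dangling = sum(pr[v] for v in nodes if not out_links[v])
        new = {}
        for v in nodes:
            incoming = sum(
                pr[u] / len(out_links[u]) for u in nodes if v in out_links[u]
            )
            new[v] = (1 - d) / N + d * (incoming + dangling / N)
        if sum(abs(new[v] - pr[v]) for v in nodes) < eps:  # L1 convergence test
            return new
        pr = new
    return pr

graph = {"a": {"b", "c"}, "b": {"c"}, "c": {"a"}, "d": set()}
print(pagerank(graph))
```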
14. Performance Numbers
block-beta
columns 2
block:hadoop_perf["Hadoop MapReduce"]:1
h1["Map task startup: ~1–2s (JVM launch)"]
h2["Shuffle I/O: bottleneck for large jobs"]
h3["Sort buffer: io.sort.mb=100–512MB per task"]
h4["Typical job: minutes to hours"]
end
block:spark_perf["Apache Spark"]:1
s1["Task startup: ~10ms (JVM reuse)"]
s2["In-memory cache: 10–100× faster than disk"]
s3["Shuffle: ~50–200ms for 100M rows"]
s4["Tungsten off-heap: 2–5× less memory than Java objs"]
end
block:lsm_perf["LSM-Tree (RocksDB)"]:1
l1["Write: ~100K–1M ops/sec (sequential WAL)"]
l2["Point read: ~100–500K ops/sec"]
l3["Compaction write amp: 10–30× leveled"]
l4["Bloom filter FPR: 1% at 10 bits/key"]
end
block:col_perf["Column Store"]:1
c1["Scan 1B rows, 1 column: ~1–10s (Parquet+Spark)"]
c2["Dictionary encoding: 10× compression for strings"]
c3["SIMD vectorized filter: 50–200M rows/sec per core"]
c4["Late materialization: 5–10× vs row scan"]
end
Key Takeaways
- MapReduce shuffle is the dominant I/O cost — combiner reduces network bytes by doing local pre-aggregation; spill files get merge-sorted using an external k-way merge before Reduce
- Spark DAG splits at shuffle boundaries into Stages — tasks within a stage are fully pipelined (no serialization between operators); Tungsten off-heap rows avoid GC pauses
- LSM-Tree achieves high write throughput by converting random writes into sequential I/O (MemTable → WAL → sorted SSTable files), at the cost of read amplification (must check multiple levels)
- Bloom filters in SSTables make point-query misses O(1) I/O instead of O(levels) — 10 bits/key gives ~1% false positive rate
- Column store + dictionary encoding is most effective on low-cardinality columns (country, status, category); delta encoding on monotonically increasing timestamps; RLE on repeated values
- FP-Growth avoids the exponential candidate generation of Apriori by building a compressed prefix tree and mining conditional databases recursively — much faster for dense datasets
- Flink checkpoints use Chandy-Lamport barrier injection — barriers flow through the dataflow graph; operators snapshot their state when barriers from ALL inputs arrive (aligned checkpointing)