DR. ATABAK KH

Cloud Platform Modernization Architect specializing in transforming legacy systems into reliable, observable, and cost-efficient Cloud platforms.

Certified: Google Professional Cloud Architect, AWS Solutions Architect, MapR Cluster Administrator

How to design and tune Spark pipelines for high-volume lead, view, and sales event data, covering shuffle behavior, skew handling, and storage layout so pipelines stay correct and scalable.

Context: Platforms that combine lead/view events (e.g. product views, form submissions, campaign touchpoints) with sales/transaction data at scale (e.g. multi-TB per day) run on Spark (or similar) for ingestion and batch/streaming processing. The main challenges are not only “what to compute” but how Spark executes it: shuffle volume, skew, and storage layout directly drive latency, cost, and reliability. This post explains how a big data architect approaches ingestion, partitioning, shuffle, skew, and storage so the platform meets freshness SLAs and stays operable.


Why Spark, and what breaks at scale

At multi-TB daily volume, ingestion and processing must be distributed. Spark gives a single programming model for batch and streaming (Structured Streaming), but shuffle, skew, and storage become the main levers. If you ignore them:

  • Shuffle: Wide transformations (joins, aggregations by new keys, distinct) trigger shuffle. Too many partitions -> many small shuffle writes and slow fetches; too few -> few huge tasks, OOM or stragglers. Wrong partition keys -> unnecessary shuffle or data skew.
  • Skew: A few keys (e.g. power users, hot products) get a disproportionate share of data. One or a few tasks do most of the work; the rest sit idle. Job latency is dictated by the slowest task.
  • Storage: Many small files (e.g. per-microbatch) cause slow listing and many small reads; huge files cause poor parallelism and spill. Partition layout and file size directly affect read performance and downstream jobs.

So the architect’s job is to design the data flow and key layout so that shuffle is minimized and balanced, skew is handled explicitly, and storage is optimized for the read patterns. The rest of this post is organized around those three themes.


Pipeline shape: ingestion, raw layer, and Spark processing

Ingestion
Events (lead, view, sales) land in a message bus (e.g. Kafka). A Spark Structured Streaming job (or a dedicated connector) reads from topics and writes to a raw layer (e.g. object storage or HDFS). Design choices here:

  • Partitioning on write: Partition by date (and optionally source/region) so that downstream batch jobs can read by date range and avoid full scans. Use a single partition key scheme (e.g. dt=yyyy-MM-dd) so that reprocessing and backfills are consistent.
  • Shuffle at ingestion: Keep ingestion narrow; no joins, no global aggregation. Each micro-batch writes to its partition; if you keep the topic’s partitioning or apply only a simple projection, the ingestion job needs no shuffle at all. If the connector repartitions by dt, that’s one controlled shuffle; avoid extra groupBy or joins here.

Raw -> curated (Spark batch)
A batch job (e.g. every 15-30 minutes or hourly) reads the raw layer, deduplicates, handles late arrivals, joins to dimensions, and writes the curated layer. This is where shuffle and skew show up:

  • Deduplication: Typically dropDuplicates(event_id) or a window + rank. This is a shuffle by the dedupe key. If the key is well distributed (e.g. UUID), shuffle is even; if the key is something like (user_id, date), you may get skew (see below).
  • Joins: Joining events to dimension tables (e.g. product, user) triggers shuffle (or broadcast). Dimension tables that fit in memory should be broadcast so you avoid shuffle for that side; large fact-fact or fact-dimension joins need a clear strategy for the partition key and skew.
  • Late arrivals: Reprocessing last N hours of raw data and merging into curated implies reading by partition (e.g. by dt and hour), so partition pruning works. The merge step (e.g. overwrite by partition or merge via a key) should align with the same partition key to avoid full-table shuffle.

So from an architect’s view: define the partition key for raw and curated once (e.g. dt, and optionally hour or source), and design every Spark job to respect that key so that shuffle is predictable and limited to the minimum necessary.


Shuffle: how much, and how to tune it

When shuffle happens
In Spark, shuffle is triggered by wide transformations: join, aggregate/groupBy with a new key, distinct, repartition, sortByKey, etc. Each of these writes a shuffle stage (map side) and reads it (reduce side). Shuffle is expensive: network, disk I/O, and serialization. So the architect’s goals are: (1) reduce shuffle where possible, (2) size partitions so that shuffle is balanced, and (3) avoid shuffle where a broadcast or partition-local op will do.

Reduce shuffle

  • Broadcast small dimension tables (e.g. product category lookup). The practical limit is driver and executor memory; tables under roughly 100-200 MB are usually safe. Note that Spark’s automatic broadcast threshold (spark.sql.autoBroadcastJoinThreshold) defaults to 10 MB, so larger tables need an explicit broadcast() hint. Broadcasting means the large table is not shuffled to match the small one; only the large side stays partitioned.
  • Partition alignment: If the raw table is already partitioned by dt, and the curated table is also by dt, a “merge by dt” job can read and write by partition without shuffling on dt. Shuffle only on the keys you need for dedupe or join (e.g. event_id or user_id).
  • Avoid redundant groupBy: If you already have a grouped result from a previous step, don’t re-group by the same key in the next stage; chain transformations so that you don’t shuffle twice on the same key.

Size shuffle partitions

  • spark.sql.shuffle.partitions (default 200) controls how many reduce-side tasks (and hence output partitions) you get after a shuffle. Too high -> many small tasks, overhead and small files; too low -> few large tasks, risk of OOM and skew. Rule of thumb: start with something like 2-4 × num_cores across the cluster, then adjust so that each task reads roughly 100-500 MB of shuffle data. For very large jobs, 500-2000 is common.
  • Repartition before shuffle-heavy writes: If the next stage is a write to object storage, repartition by the table partition key (e.g. dt, hour) so that each output partition corresponds to one directory; that avoids a separate shuffle in the next job when reading by partition.
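The sizing rule of thumb above is just arithmetic; a hypothetical helper (the function name and default target are assumptions, not a Spark API) makes it concrete:

```python
def shuffle_partitions(total_shuffle_bytes: int,
                       target_bytes_per_task: int = 512 * 1024**2) -> int:
    """Pick a spark.sql.shuffle.partitions value so each reduce-side task
    reads roughly target_bytes_per_task of shuffle data (ceiling division)."""
    return max(1, -(-total_shuffle_bytes // target_bytes_per_task))

# 2 TiB of shuffle at ~512 MiB per task -> 4096 partitions
n = shuffle_partitions(2 * 1024**4)
# then: spark.conf.set("spark.sql.shuffle.partitions", str(n))
```

The total shuffle size to plug in comes from the Spark UI (shuffle read/write per stage) of a representative run, not from guesswork.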

Monitor shuffle

  • In Spark UI: shuffle read/write size per stage, and skew in task duration. If a few tasks take 10× longer than the median, you have skew (see next section). If shuffle read/write is huge relative to input size, look for redundant shuffles or suboptimal join order.

Skew: why it happens and how to handle it in Spark

Where skew comes from
Skew is uneven distribution of data per partition key. Common causes:

  • Join key: A few user_ids or product_ids have orders of magnitude more events than the rest. When you join on that key, one partition gets a huge amount of data.
  • GroupBy key: Same idea; e.g. “events per user” when 1% of users have 80% of events.
  • Deduplication key: If the key is correlated with time or source, some partitions can be much larger.

Effect
A few tasks run for a long time; others finish quickly. Job latency is dominated by the stragglers. You may also see OOM on those tasks or spill.

Handling skew in Spark

  1. Broadcast the skewed side (if small)
    If the “large” side of the join is actually a small set of hot keys (e.g. a list of 10K power users), you can filter them out, join the rest normally, and then join the hot keys via broadcast. So: “cold” data joins with even partitions; “hot” data is handled separately with a broadcast.

  2. Salting
    Add a random salt to the key on the big side so that a hot key is split across many partitions. Example: concat(user_id, '_', cast(rand() * 100 as int)) spreads one user_id across up to 100 partitions. The small side is replicated (one copy per salt value, e.g. 100 copies) so that each salted partition can still join. After the join, aggregate back by the original key if needed. This increases shuffle volume (the replicated small side) but balances the load.

  3. AQE skew join (Spark 3.x)
    Adaptive Query Execution can detect skew at runtime and split a skewed partition into multiple tasks. Enable spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled; AQE then splits the skewed partition so the work is spread. This is the least invasive option: no salting logic in the code, but you need Spark 3.x and may have to tune the skew thresholds (spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes).

  4. Split hot and cold
    Identify hot keys (e.g. from a pre-aggregation or histogram), then: (a) process hot keys in a separate job with broadcast or salting, (b) process the rest in the main job. Union the results. This keeps the main job skew-free and confines special handling to a small subset.

In practice, an architect will enable AQE first and monitor the task duration distribution; if skew remains, add salting or split hot/cold for the specific stages that show it.


Storage optimization: partitioning, file size, and format

Partitioning

  • Raw and curated: Partition by date (dt=yyyy-MM-dd) at minimum. If downstream often filters by hour or source, add hour or source so that listing and reads only touch the needed directories. Avoid over-partitioning (e.g. by minute or by user_id) so you don’t get millions of tiny partitions and slow metadata/list operations.
  • Align with Spark reads: Use df.write.partitionBy("dt") (and optionally "hour") so that spark.read.parquet(path).filter("dt = '2023-09-15'") only reads that partition. No need to scan the whole table.

File size and count

  • Small files: Many small files (e.g. one per microbatch or per task) cause slow listing (especially on object storage) and many small reads; Spark’s listing and planning overhead grows with file count. Target roughly 128 MB-512 MB per file for good throughput. If a write produces many small files, repartition before writing. Note that df.repartition(N, "dt") hashes only on dt, so every row of a given dt lands in a single task and you get one file per dt; to get about N evenly sized files per dt, repartition by dt plus a random bucket column (e.g. df.repartition(col("dt"), (rand() * N).cast("int")).write.partitionBy("dt").parquet(...)).
  • Compaction: Run a separate compaction job (e.g. daily): read a partition (e.g. yesterday’s dt), coalesce to a target number of files, overwrite that partition. That keeps file count and size in a good range without changing the pipeline logic.
  • Repartition on write: For append or overwrite-by-partition, use repartition("dt", "hour") (or a fixed number per partition) so that the number of output files is under control. Avoid coalesce(1) for large partitions; that creates a single huge file and kills parallelism on read.
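The file-count target is again simple arithmetic. A hypothetical helper (the name and default are assumptions, not a Spark API) for sizing a compaction or write step:

```python
def files_for_partition(partition_bytes: int,
                        target_file_bytes: int = 256 * 1024**2) -> int:
    """How many output files to aim for in one dt partition, keeping each
    file near the middle of the ~128-512 MB sweet spot."""
    return max(1, round(partition_bytes / target_file_bytes))

# A 10 GiB dt partition at ~256 MiB per file -> 40 files; a compaction job
# would then do: spark.read.parquet(day_path).repartition(n_files)
#                     .write.mode("overwrite").parquet(day_path_tmp)
n_files = files_for_partition(10 * 1024**3)
```

Partition sizes come from storage metrics or a `du`-style listing, so the compaction job can size itself per day instead of using one fixed number.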

Format and compression

  • Parquet is the default choice: columnar, good compression, predicate pushdown (so filter(dt = ...) is pushed to the reader and only needed columns are read). Use Snappy (faster) or Zstd (better ratio, slightly more CPU). Schema evolution is supported if you use a table format (e.g. Delta, Iceberg) or strict schema discipline.
  • Block size: Parquet row group size affects read parallelism and pushdown. Defaults are often fine; for very wide tables, tuning row group size can help.

Idempotent overwrite
For “merge” or “reprocess last N hours,” use overwrite by partition (e.g. df.write.mode("overwrite").partitionBy("dt").save(path) with dynamic partition overwrite if your Spark/source supports it) so that you don’t leave duplicate or partial data. That way, storage stays consistent and reprocessing is safe.


Late arrivals and deduplication (Spark view)

Late arrivals

  • Streaming: Use event-time and watermarking so that late events within the watermark window are included; beyond that, they’re dropped (or sent to a side output for a batch backfill).
  • Batch: Reprocess a window (e.g. last 24-48 hours) by reading the raw partitions for that window, deduplicating and joining, then overwriting the corresponding curated partitions. That’s idempotent if the keys and logic are deterministic.
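Enumerating the raw partitions for the reprocess window is mechanical; a hypothetical helper (the dt=/hour= path scheme matches the layout assumed in this post, the function itself is not a Spark API):

```python
from datetime import datetime, timedelta

def reprocess_partitions(now: datetime, hours_back: int) -> list[str]:
    """List the dt=/hour= raw partition suffixes covering the last
    hours_back hours (inclusive), oldest first."""
    return [
        (now - timedelta(hours=h)).strftime("dt=%Y-%m-%d/hour=%H")
        for h in range(hours_back, -1, -1)
    ]

# Covers 2023-09-14 23:00 through 2023-09-15 03:00, crossing the day boundary.
parts = reprocess_partitions(datetime(2023, 9, 15, 3), 4)
```

The batch job reads exactly these directories, reapplies dedupe and joins, and overwrites the matching curated partitions.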

Deduplication

  • Use a stable key (e.g. event_id). dropDuplicates("event_id") triggers a shuffle by event_id. If event_id is uniform, shuffle is even. If you need “last event wins” per (user, date), that’s a window + rank; partition by (user, date), order by timestamp, take rank 1. Again, shuffle is by (user, date); watch for skew on high-activity users and apply the same skew tactics (AQE, salting, or split).

Summary: how a big data architect sees it

A large-scale lead/view/sales event platform on Spark rests on:

  • Clear partition strategy for raw and curated (e.g. dt, optional hour/source) so that jobs can read and write by partition, reprocess safely, and avoid unnecessary full scans.
  • Shuffle discipline: broadcast small dimensions; set spark.sql.shuffle.partitions and repartition by the table partition key before writing; avoid redundant shuffles; monitor shuffle read/write and task duration in Spark UI.
  • Skew handling: enable AQE skew join; where needed, use salting or split hot/cold so that no single key dominates a partition.
  • Storage optimization: partition by date (and optional hour/source); target 128-512 MB per file; use coalesce/repartition on write and compaction jobs to control file count; use Parquet with sensible compression; use overwrite-by-partition for idempotent merges.
  • Late arrivals and deduplication: event-time and watermark in streaming; batch reprocess with overwrite-by-partition and deterministic keys so that correctness and operability are maintained.

Getting these right (shuffle, skew, and storage) is what makes the difference between a pipeline that “works on a sample” and one that meets freshness SLAs and stays reliable at multi-TB scale. The same patterns apply to any Spark-based event and transaction platform.

This is a personal blog. The views, thoughts, and opinions expressed here are my own and do not represent, reflect, or constitute the views, policies, or positions of any employer, university, client, or organization I am associated with or have been associated with.

© Copyright 2017-2026