Inside the Shuffle

Spark engineers who tune by knowing rather than guessing can describe what physically happens inside a shuffle. You already know a wide operation moves data across the network. This tier opens the shuffle up: the write side that stages data to disk, the read side that fetches it across the cluster, the spill that happens when memory runs out, and the one knob that controls how the whole thing is sliced. Underneath all of it sits one shape: a shuffle is two halves, a write and a read, with disk and the network in between.

The Shuffle Write: Staging Data by Key

A shuffle has two halves, and the first is the write, which happens on the map side, the executors that hold the input data. When a wide operation runs, each of these executors takes its local partition and sorts the rows into buckets, one bucket for each destination partition, based on the key you are grouping or joining by. A row for region EU goes in the EU bucket; a row for APAC goes in the APAC bucket. That bucketing by key is the write.
The executor does not send these buckets immediately. It writes them to its own local disk first, as shuffle files. This staging to disk is deliberate: it means the data survives even if the receiving side is not ready yet, and it lets the fetch happen on the reduce side's schedule. It also means the write half of every shuffle pays a full disk write of the data being shuffled, one of the three costs from the beginner tier made concrete.
How does the executor decide which bucket a row belongs in? It applies a partitioner to the key, and for a hash partitioner that means computing hash(key) mod R, where R is the number of reduce partitions. A row with region EU hashes to some number between 0 and R minus 1, and that number is its destination bucket. Every executor uses the identical function, which is what guarantees that all EU rows across the whole cluster compute the same destination and therefore converge on the same reduce task. This is also the seed of data skew: if one key is far more common than the others, its bucket is huge on every map side and the single reduce task that owns it receives far more than its share, a failure mode the advanced tier returns to.
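The routing rule above can be sketched in a few lines of plain Python. This is a minimal illustration, not Spark's implementation: `zlib.crc32` stands in for Spark's real hash function, and the function name, the value of `R`, and the sample rows are all hypothetical.

```python
import zlib
from collections import Counter

def partition_for(key: str, num_reduce_partitions: int) -> int:
    """Return the destination bucket for a key: hash(key) mod R."""
    # crc32 is a stand-in hash (assumption); it is deterministic across
    # processes, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_reduce_partitions

R = 4  # hypothetical number of reduce partitions

# Two "executors" holding different rows; both apply the identical function.
executor_a = ["EU", "APAC", "EU", "US", "EU"]
executor_b = ["EU", "EU", "LATAM", "EU", "APAC"]

buckets_a = Counter(partition_for(k, R) for k in executor_a)
buckets_b = Counter(partition_for(k, R) for k in executor_b)

eu_bucket = partition_for("EU", R)
print("EU always goes to bucket", eu_bucket)
print("rows per bucket on A:", dict(buckets_a))
print("rows per bucket on B:", dict(buckets_b))
```

Because EU is the most common key on both executors, its bucket is the largest on both, which is the skew pattern in miniature.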
Take orders.groupBy("region") on an executor holding a partition of fifty thousand mixed rows. The task walks those rows once, hashing each region value to a bucket, and accumulates the rows destined for each reduce partition. When the partition is exhausted, it sorts the buffered records by their target partition id and flushes them to a single shuffle data file, writing an index alongside that records where each bucket's bytes begin. If the buffer fills before the partition is done, it spills a sorted run to disk and merges later, which is the write-side spill the next section discusses. The output of the task is bytes on local disk plus a note to the driver saying where they are.
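As a rough sketch of that write path, the following plain-Python function tags rows with a partition id, sorts by that id, and produces one data blob plus an index of bucket offsets. The text encoding, the `crc32` stand-in hash, and every name here are assumptions made for illustration; Spark's real file format is binary and more involved.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # crc32 stands in for Spark's real hash function (an assumption of this sketch).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def write_map_output(rows, num_partitions):
    """Return (data, index): one byte blob plus the offset where each bucket starts."""
    # Tag every row with its target partition id, then sort by that id only.
    tagged = sorted(
        ((partition_for(region, num_partitions), region, profit) for region, profit in rows),
        key=lambda t: t[0],
    )
    data = bytearray()
    index = [0] * (num_partitions + 1)  # bucket p is data[index[p]:index[p + 1]]
    current = 0
    for pid, region, profit in tagged:
        while current < pid:            # record where each later bucket begins
            current += 1
            index[current] = len(data)
        data += f"{region},{profit}\n".encode("utf-8")
    for p in range(current + 1, num_partitions + 1):
        index[p] = len(data)            # remaining buckets are empty
    return bytes(data), index

rows = [("EU", 10), ("APAC", 5), ("EU", 7), ("US", 3)]
data, index = write_map_output(rows, 4)
eu = partition_for("EU", 4)
eu_slice = data[index[eu]:index[eu + 1]].decode("utf-8")
print(index)
print(eu_slice)
```

The index is what lets a later reader jump straight to one bucket's bytes without scanning the whole file.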
| Shuffle write step | What the executor does | The cost |
| --- | --- | --- |
| Partition by key | Sort local rows into per-destination buckets | CPU and memory for the sort |
| Write to local disk | Persist the buckets as shuffle files | A full disk write of the data |
| Register the files | Tell the driver where the buckets live | Metadata the reduce side will read |
The map side, then, ends each task by leaving a set of sorted, bucketed files on local disk, one logical bucket per downstream partition, and telling the driver where they are. At this point no data has crossed the network yet; it has only been staged. The network movement is the job of the second half, the read. Understanding that the write stages to disk first explains why a shuffle touches disk even when you have plenty of memory, and why disk speed matters to shuffle performance.
This staging is also why a shuffle is a hard boundary for fault tolerance, not just for scheduling. Because the map outputs are durable files on disk rather than a transient stream, Spark can lose a reduce task and re-run just that task by re-fetching the same buckets, without recomputing the map side, provided the executor (or external shuffle service) serving those files is still alive. The shuffle files are effectively a checkpoint of the map stage. That durability is the upside of paying the disk write: you bought restartability with it. A streaming shuffle that kept everything in flight would be faster when nothing failed and catastrophic the moment anything did, and on a cluster running thousands of tasks, failures are routine rather than rare.

The Shuffle Read: Fetching Across the Network

The second half of a shuffle is the read, which happens on the reduce side, the executors that will run the stage after the shuffle. Each reduce task is responsible for one output partition, say all the rows for region EU, and to assemble it, that task has to fetch its bucket from every map executor that wrote one. So a single reduce task reaches out across the network to many machines, pulls down each one's EU bucket, and combines them into the complete EU partition.
This is where the all-to-all movement from the beginner tier physically happens. With M map tasks and R reduce tasks, the network sees up to M times R fetches, every reduce task pulling from every map output. This fan-out is why a shuffle saturates the network: it is not one big transfer but a dense mesh of many transfers happening at once. The data that was staged to disk on the write side now streams across the cluster to wherever its key belongs.
Walk one reduce task through it. The task responsible for the EU partition asks the driver which map outputs hold EU bytes, gets back a list of every map task that wrote one, and opens fetch connections to those executors. Each remote executor seeks to the EU offset in its shuffle data file using the index, reads just that slice, and streams it back. The reduce task receives these slices, often many in flight at once to keep the network busy, and as the EU rows arrive it feeds them into the aggregation or the join build. Reads from a map output that happens to live on the same node are local disk reads rather than network fetches, which is why locality still matters even in the middle of a shuffle.
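The walk-through above can be mimicked in plain Python. In this sketch each map output is a hand-written `(data, index)` pair, where `index[p]` to `index[p + 1]` brackets the bytes for reduce partition `p`. The layout, the function names, and the sample values are all hypothetical; real fetches go over the network and use Spark's binary format.

```python
# Hypothetical map outputs, one per map task. Partition 0 holds EU, partition 1 holds US.
map_outputs = [
    (b"EU,10\nEU,7\nUS,3\n", [0, 11, 16]),
    (b"EU,4\nUS,9\nUS,1\n", [0, 5, 15]),
    (b"US,2\n", [0, 0, 5]),  # this map task saw no EU rows
]

def fetch_block(map_output, reduce_id):
    """What the serving executor does for one fetch: seek via the index, read one slice."""
    data, index = map_output
    return data[index[reduce_id]:index[reduce_id + 1]]

def run_reduce_task(map_outputs, reduce_id):
    """Fetch this partition's slice from every map output and sum profit per key."""
    totals = {}
    for output in map_outputs:  # one fetch per map output, so M fetches per reduce task
        block = fetch_block(output, reduce_id)
        for line in block.decode("utf-8").splitlines():
            key, value = line.split(",")
            totals[key] = totals.get(key, 0) + int(value)
    return totals

eu_totals = run_reduce_task(map_outputs, 0)
us_totals = run_reduce_task(map_outputs, 1)
print(eu_totals)  # {'EU': 21}
print(us_totals)  # {'US': 15}
```

Note that the third map output still gets asked for its EU slice even though the slice is empty; the reduce task cannot know that without consulting the index.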
M x R fetches: the shuffle read fan-out.
Only after a reduce task has fetched all of its buckets and combined them does it have a complete partition to work on, and only then can the post-shuffle operation, the aggregation or the join, actually run. This is the synchronisation barrier in physical form: the reduce stage cannot make progress on a partition until every map output for that partition has arrived. A slow or dead map executor holds up every reduce task waiting on its buckets.
The fan-out also explains why both task counts matter, not just the reduce count people usually tune. Too few map tasks and each writes a huge bucket, so the per-bucket fetch is large and slow to seek through; too many map tasks and the reduce side opens a connection to each of thousands of map outputs, paying connection setup and many tiny reads. The number of input partitions, set when the data was read or last repartitioned, is therefore as real a lever on shuffle performance as spark.sql.shuffle.partitions, even though it gets far less attention. A balanced shuffle wants both sides sized so that each transfer is large enough to amortise its overhead but small enough to fit in memory.
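To make the trade-off concrete, here is a small arithmetic sketch. It assumes the shuffle volume is spread evenly over every map-output and reduce-partition pair, which real data rarely is, and it counts one fetch per pair even though Spark may batch several blocks into one request. The 100 GiB volume and the task counts are hypothetical.

```python
def fan_out(total_shuffle_bytes: int, map_tasks: int, reduce_tasks: int):
    """Return (fetches, average bytes per fetched block) for an evenly spread shuffle."""
    fetches = map_tasks * reduce_tasks
    return fetches, total_shuffle_bytes / fetches

GIB = 1024 ** 3
total = 100 * GIB  # hypothetical shuffle volume

for m, r in [(10, 200), (800, 800), (20000, 8000)]:
    fetches, avg = fan_out(total, m, r)
    print(f"M={m:>6} R={r:>5} fetches={fetches:>11,} avg block={avg / 1024:,.1f} KiB")
```

The first row shows few, very large blocks; the last shows an enormous number of tiny ones. Both ends pay for the imbalance, just in different ways.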

The two halves around one wide op

The demo below is built around one wide aggregation, the groupBy, so the shuffle to watch has one write half and one read half. (The trailing orderBy is also a wide operation, but it only sorts the handful of aggregated rows, so its shuffle is tiny by comparison.) As you run it, hold the full picture: each executor sorts its orders into region buckets and writes them to disk (the write), then the reduce tasks fetch each region's buckets from every executor and sum them (the read).
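from pyspark.sql import functions as F

# `orders` is assumed to be an existing DataFrame with status, region and profit columns.
(orders
   .filter(F.col("status") == "Completed")        # narrow: runs before the shuffle
   .groupBy("region")                             # wide: triggers the shuffle
   .agg(F.sum("profit").alias("total_profit"))
   .orderBy(F.col("total_profit").desc()))        # sorts the small aggregated result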
In the Spark UI, this shuffle shows up as two numbers on the stages involved: shuffle write on the map stage, the bytes staged to disk, and shuffle read on the reduce stage, the bytes pulled across the network. When you debug a slow job, those two numbers tell you how much data the shuffle moved, and a large shuffle read is one of the clearest signals that a wide operation is your bottleneck.
Those two numbers also come up in interviews. Shuffle write and shuffle read for a single shuffle should be roughly equal, because the bytes written on the map side are the bytes fetched on the reduce side. If you see them diverge wildly, you are usually looking at two different shuffles attributed to adjacent stages, or at compression changing the on-disk size relative to the in-flight size. The number that matters most for diagnosing a bottleneck is shuffle read on the reduce stage, because that is the data the stage had to drag across the network before it could do any of its real work, and it is almost always the part of the timeline where a slow stage is actually spending its minutes.

Spill: When the Shuffle Runs Out of Memory

Both halves of a shuffle want to work in memory, but memory is finite, and when a task needs more than it has, it spills to disk. Spill is a word to recognise in shuffle tuning, because any spill signals a task fighting for memory, and disk is far slower than the memory it wanted. A job that spills is doing extra disk work it would not have to do if its partitions were sized to fit.
Spill happens most visibly during the sort on the write side and the combine on the read side. If a reduce task fetches more data for its partition than fits in its execution memory, it cannot hold it all to aggregate, so it writes some out to disk, processes what fits, and reads the spilled portion back later. The result is correct, but the task has now paid to write and re-read data that it hoped to keep in memory. Heavy spill can multiply a shuffle's cost several times over.
It pays to distinguish the two spill numbers the UI reports, because they are not the same quantity. Memory spill is the size the spilled records occupied in memory before they were evicted; disk spill is the size they took once serialised and compressed onto disk, which is usually much smaller. People panic at a large memory-spill figure and miss that the disk spill, the actual extra I/O, is what costs time. Either way, any spill at all is the signal: the task wanted more execution memory than it had. The spilled bytes had to make a round trip to disk that a correctly sized partition would have avoided entirely.
Put numbers on the magnitude. Suppose a reduce task is handed a five-gigabyte post-shuffle partition but has two gigabytes of execution memory. It can hold two gigabytes, so it processes that, spills it, fetches and processes the next two, spills again, then handles the last gigabyte, and finally merges all the spilled runs together. That is up to three extra disk writes and three extra reads, as much as five gigabytes written out and read back, on top of the shuffle fetch the task already paid for. The same data split into partitions sized to fit, say forty partitions of roughly 128 megabytes each, would have been processed entirely in memory with zero spill. Same data, different partition sizing, and that alone turns a spilling task into a clean one.
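The arithmetic in that example can be written down as a tiny model. It is deliberately pessimistic and simplified: it assumes every memory-sized chunk is written out as a sorted run and read back once for the final merge, which is not exactly how Spark's sorters behave. The function name and the decimal gigabyte constant are choices made for this sketch.

```python
import math

def spill_estimate(partition_bytes: int, execution_memory_bytes: int):
    """Return (chunks, bytes_spilled) under a deliberately pessimistic model."""
    if partition_bytes <= execution_memory_bytes:
        return 1, 0  # everything fits in memory: no spill at all
    chunks = math.ceil(partition_bytes / execution_memory_bytes)
    # Assumption: every chunk is written out and read back once for the final merge.
    return chunks, partition_bytes

GB = 10 ** 9
print(spill_estimate(5 * GB, 2 * GB))         # (3, 5000000000)
print(spill_estimate(128 * 10 ** 6, 2 * GB))  # (1, 0)
```

The second call is the correctly sized case: the partition fits, so the spilled byte count is zero regardless of how much data the job shuffles in total.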
No spill (healthy)
  • Partition fits in execution memory
  • Sort and combine happen in memory
  • Disk is touched only for the staged buckets
  • The shuffle runs at expected speed
Spill (memory pressure)
  • Partition is too big for memory
  • Data is written out and read back mid-task
  • Extra disk I/O on top of the shuffle
  • The shuffle runs several times slower
TIP
In the Spark UI, any nonzero spill, memory or disk, on a shuffle stage is a red flag worth investigating. The senior reading is: "this stage spilled, so its partitions are too large for the memory it has; I would either raise the partition count so each one is smaller or give the executors more memory." You size spill away; you rarely accept it.
The cause of spill is almost always a partition that is too big, which ties directly to the next section: the number of partitions a shuffle produces is a knob, and setting it wrong is the most common reason a shuffle spills. If the data is divided into too few partitions after the shuffle, each one is large and each reduce task is more likely to overflow its memory. The fix is usually more, smaller partitions, not more hardware.

The 200 Knob: spark.sql.shuffle.partitions

Every shuffle in the DataFrame and SQL APIs produces a number of output partitions, and that number is controlled by a single configuration value: spark.sql.shuffle.partitions. Its default is 200, which means that unless you change it, every such shuffle in your job is planned with 200 output partitions, regardless of how much data you have. This one default is behind a surprising share of Spark performance problems, in both directions.
Think about what 200 means at different scales. If you are shuffling a few hundred megabytes, 200 partitions means each one is tiny, a couple of megabytes, and you have spun up 200 reduce tasks each doing almost nothing, paying scheduler overhead for trivial work. If you are shuffling a terabyte, 200 partitions means each one is five gigabytes, far too large to fit in a reduce task's memory, so every one of them spills. The default fits a fairly narrow middle band and is wrong on either side of it.
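A quick calculation shows how the same 200 lands at different scales. This sketch assumes keys spread evenly across partitions and uses binary megabytes; the three volumes are the illustrative ones from the surrounding text, not measurements.

```python
DEFAULT_SHUFFLE_PARTITIONS = 200
MB = 1024 ** 2

def partition_size_mb(shuffle_bytes: int, partitions: int = DEFAULT_SHUFFLE_PARTITIONS) -> float:
    """Average post-shuffle partition size in MB, assuming keys spread evenly."""
    return shuffle_bytes / partitions / MB

volumes = [("400 MB", 400 * MB), ("100 GB", 100 * 1024 * MB), ("1 TB", 1024 * 1024 * MB)]
for label, size in volumes:
    print(f"{label:>7}: {partition_size_mb(size):,.1f} MB per partition")
```

Only the middle row is anywhere near a comfortable size; the first is dominated by per-task overhead and the last is far beyond what a reduce task can hold.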
The trap is that 200 stays fixed while data volume grows, so the same job that was tuned perfectly last quarter is mis-sized this quarter simply because the table grew. A pipeline written when order_items held a hundred gigabytes might have been fine at 200 partitions; two years and a tenfold data growth later, those same 200 partitions are each ten times larger, and the nightly job has quietly started spilling and slipping its window. Nobody changed the code. This is why the durable habit is to size the count from the current data volume, ideally computed rather than hardcoded, instead of picking a number once and trusting it to stay right as the data underneath it moves.
200: the default shuffle partition count.
The too-many side is more insidious than it sounds, because it does not spill or crash; it just runs slowly for no obvious reason. Two hundred reduce tasks each handling two megabytes means two hundred task launches, two hundred sets of fetch connections, two hundred result files, and two hundred entries in every scheduler and UI table, all to do a few seconds of real arithmetic. The fixed per-task overhead, scheduling, serialisation of the task itself, and bookkeeping, starts to dominate the actual work. A job that grouped a small dimension and inexplicably took thirty seconds is often a job that split a trivial shuffle into two hundred near-empty partitions and paid overhead two hundred times.
The sizing rule is the same one from the partitioning lesson, applied to the shuffle output: aim for partitions of roughly 128 megabytes. Take the size of the data being shuffled, divide by 128 megabytes, and set spark.sql.shuffle.partitions to something near that. For a terabyte shuffle that is around 8000 partitions, not 200. For a hundred-megabyte shuffle it is closer to 1. The number 200 is a starting guess that Spark cannot tailor to data it has not seen, the gap that adaptive query execution later fills automatically.
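The sizing rule is simple enough to compute rather than hardcode. The sketch below is one way to do that, assuming you already have an estimate of the shuffled bytes; the function name, the `estimated_bytes` variable, and the `spark` session in the trailing comment are hypothetical and not created here.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 ** 2  # the lesson's target of roughly 128 MB

def shuffle_partitions_for(shuffle_bytes: int, target: int = TARGET_PARTITION_BYTES) -> int:
    """Partition count that puts each post-shuffle partition near the target size."""
    return max(1, math.ceil(shuffle_bytes / target))

one_tb = 1024 ** 4
print(shuffle_partitions_for(one_tb))           # 8192
print(shuffle_partitions_for(100 * 1024 ** 2))  # 1

# With a live SparkSession named `spark` (assumed), the value would be applied
# before the wide operation runs:
# spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions_for(estimated_bytes)))
```

Computing the count from the current volume on each run is what keeps the setting from going stale as the table grows.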

Where the knob bites

The fill-in below is an aggregation that produces a shuffle. The operations are familiar; the point to carry away is that whatever this shuffle produces gets divided into spark.sql.shuffle.partitions partitions, and on real data you would size that number to the data rather than leave it at 200. Complete the grouping and the aggregation.

> From order_items, compute total quantity sold per product_id, highest first. The grouping triggers a shuffle whose output is split into the configured partition count, which you would size to the data.

(order_items
   .___("product_id")
   .agg(F.___("quantity").alias("units"))
   .orderBy(F.col("units").desc(), F.col("product_id").asc()))
Choices: groupBy, sum, count, avg
On the small seed data this shuffle is trivial, but on a real order_items table of billions of rows, the 200 default would give you oversized partitions that spill, and the fix would be to raise the partition count until each one is around 128 megabytes. Knowing this knob exists, and that its default is a guess, is half of shuffle tuning.
The reason the default survives is that modern Spark can often fix it for you with adaptive query execution (AQE), which has been on by default since Spark 3.2. AQE looks at the actual size of the shuffle once the map side has written it, then coalesces the configured partitions down to a sensible count for the data it really saw, so two hundred near-empty partitions get merged into a handful. This is genuinely useful, but it does not retire the knob from interviews or from older clusters. Coalescing only merges partitions; apart from AQE's separate handling of skewed joins, it does not split them, so a configured count that is too low still leaves you with oversized partitions. The durable lesson is the sizing target of around 128 megabytes per partition; AQE is one mechanism that aims at it, and knowing the target lets you sanity-check whatever AQE decided.
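The merge-only behaviour is easy to see in a toy model. The function below greedily merges adjacent partitions up to a target size; it is a simplified illustration of the idea of coalescing, not Spark's actual rule, and the function name, target, and partition sizes are hypothetical.

```python
def coalesce(partition_sizes, target_bytes):
    """Greedily merge adjacent partitions until the next one would exceed the target."""
    merged, current = [], 0
    for size in partition_sizes:
        if current and current + size > target_bytes:
            merged.append(current)  # close the current group
            current = 0
        current += size             # partitions are only ever merged, never split
    if current:
        merged.append(current)
    return merged

MB = 1024 ** 2
tiny = coalesce([2 * MB] * 200, 128 * MB)
oversized = coalesce([5000 * MB, 1 * MB, 1 * MB], 128 * MB)

print(len(tiny), [s // MB for s in tiny])  # 4 [128, 128, 128, 16]
print([s // MB for s in oversized])        # [5000, 2]
```

Two hundred tiny partitions collapse to four, but the 5000 MB partition comes out exactly as it went in, which is why a configured count that is too low is not rescued by merging.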

Why the Shuffle Dominates Runtime

Pulling the mechanics together explains the central fact of Spark performance: the shuffle is almost always where the time goes. It is not one cost but a stack of them, each individually expensive, all paid at once for potentially the entire dataset. Narrow work, by comparison, is CPU on data already in memory. The two are not in the same league, and a job's runtime is usually dominated by its shuffles even when the narrow work looks like the bulk of the code.
| Cost in a shuffle | Why it is slow | When it gets worse |
| --- | --- | --- |
| Disk write (map side) | Disk is far slower than memory | Large shuffle volume |
| Network transfer | Bandwidth is shared and limited | M x R fan-out on a big cluster |
| Serialisation | Encoding and decoding every row | Many small rows, fat schemas |
| Spill | Re-writing and re-reading mid-task | Partitions too big for memory |
| Barrier wait | Everyone waits for the slowest task | Skew, one giant partition |
Notice that several of those costs compound with each other. A shuffle with too few partitions has large partitions, which spill, which adds disk I/O, and the large partitions also make any skew hurt more, which makes the barrier wait worse. Tuning the partition count well can relieve several of these at once, which is why it is the first knob to reach for. A job that avoids a shuffle entirely sidesteps the whole stack, which is why shuffle elimination, the subject of the advanced tier, is the most powerful optimisation available.
There is a corollary that explains a lot of confusing profiling. Because the shuffle dominates, optimising the narrow work around it usually does nothing measurable. An engineer who rewrites a withColumn expression to shave CPU, or who agonises over a select, on a job whose runtime is ninety percent shuffle, is polishing the ten percent that was never the problem. The right first question on any slow Spark job is how many shuffles there are and how much data each one moves. If you find a single stage with a multi-gigabyte shuffle read and spill, you have found the runtime, and every minute spent elsewhere is a minute spent in the wrong place.
The mental model to leave this tier with is a budget. Every shuffle in your job is a line item, and the bill is disk plus network plus serialisation times the data moved. You reduce the bill three ways: move less data into each shuffle by filtering and pre-aggregating first, size the partitions so nothing spills, or remove a shuffle altogether by restructuring the job. The advanced tier is about that third option, the most powerful one, and about the internals that make the first two possible.
One ordering principle ties the whole budget together: do narrow work before wide work whenever the result allows it. A filter, a column prune, or a partial aggregation placed before a shuffle is paid at the cheap per-partition rate and shrinks the data the expensive shuffle then has to carry. The same operations placed after the shuffle are paid on data that already crossed the network for nothing. This is why query optimisers, and experienced engineers writing by hand, push filters and projections down toward the source: every byte you drop before a wide operation is a byte you do not write to disk, send over the network, and read back.
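The effect of doing narrow work first can be counted directly. This plain-Python sketch compares how many records reach the shuffle with and without a per-partition partial aggregation; the data, the function name, and the three-partition layout are hypothetical. Spark's DataFrame aggregations typically perform a partial step like this for you, so treat the sketch as an illustration of the principle rather than something you must hand-write.

```python
from collections import defaultdict

def shuffled_records(map_partitions, pre_aggregate: bool) -> int:
    """Count the records each map partition would hand to the shuffle."""
    total = 0
    for rows in map_partitions:
        if pre_aggregate:
            partial = defaultdict(int)
            for region, profit in rows:
                partial[region] += profit  # narrow work: stays inside the partition
            total += len(partial)          # one record per distinct key
        else:
            total += len(rows)             # every raw row crosses the shuffle
    return total

# Hypothetical data: 3 map partitions, 1,000 rows each, 4 distinct regions.
regions = ["EU", "APAC", "US", "LATAM"]
map_partitions = [[(regions[i % 4], 1) for i in range(1000)] for _ in range(3)]

print(shuffled_records(map_partitions, pre_aggregate=False))  # 3000
print(shuffled_records(map_partitions, pre_aggregate=True))   # 12
```

The final totals are identical either way; only the volume written to disk, sent over the network, and read back changes.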
Do
  • Read shuffle write and shuffle read in the UI to see how much data a shuffle moves.
  • Treat any spill as a sizing problem: raise the partition count or the memory.
  • Size spark.sql.shuffle.partitions to roughly the shuffled data size divided by 128 MB, not the 200 default.
  • Filter and pre-aggregate before a wide operation so the shuffle moves less.
Don't
  • Don't accept the 200 default on large or tiny data; it is a guess that fits a narrow band.
  • Don't ignore spill; it multiplies a shuffle's cost with avoidable disk I/O.
  • Don't forget the shuffle is a barrier; one giant partition stalls the whole stage.
  • Don't optimise narrow work to save a job that is dominated by a shuffle.
PUTTING IT ALL TOGETHER

> A nightly aggregation that shuffles a large fact table has started spilling to disk and missing its window. The Spark UI shows one stage with high shuffle read and nonzero disk spill, running on the default 200 shuffle partitions.

The high shuffle read tells you the wide operation is moving a lot of data across the network.
The disk spill says the post-shuffle partitions are too large for the reduce tasks' memory.
With 200 partitions over a large fact table, each partition is oversized, which is the direct cause of the spill.
You raise spark.sql.shuffle.partitions so each partition lands near 128MB, the spill disappears, and the stage fits its window.
KEY TAKEAWAYS
A shuffle has two halves: the write stages sorted buckets to local disk by key.
The read fetches each partition's buckets from every map output, an M x R network fan-out.
Spill is a memory-pressure symptom: a partition too big to hold spills to disk and slows the shuffle.
spark.sql.shuffle.partitions defaults to 200; size it to roughly the shuffled data size divided by 128 MB instead.
Shuffle costs (disk, network, serialisation, spill, barrier) stack and compound, dominating runtime.

Two halves, a write and a read, with disk and the network in between.

Category: Spark
Difficulty: intermediate
Duration: 14 minutes
Challenges: 2 hands-on challenges

Topics covered: The Shuffle Write: Staging Data by Key, The Shuffle Read: Fetching Across the Network, Spill: When the Shuffle Runs Out of Memory, The 200 Knob: spark.sql.shuffle.partitions, Why the Shuffle Dominates Runtime
