# The Hours They Stayed

Canonical URL: <https://datadriven.io/problems/the-hours-they-stayed>

Domain: PySpark · Difficulty: medium · Seniority: junior

## Problem

We run a streaming media platform and the content team wants to know which formats are holding the most audience time. Find the content types with more than 10 unique viewers, reporting the total watch time per type from most to least.

## Worked solution and explanation

### What this problem is really testing

The business question is 'which formats hold attention', but underneath it is a fact-to-dimension join feeding a grouped aggregation with a post-aggregation threshold. The watch events live in content_views (the fact, one row per play), while the content_type label lives in content_items (the dimension, one row per title). Two things separate a clean answer from a messy one: joining on content_id at the right grain so you do not double-count watch time, and applying the 'more than 10 viewers' rule to the per-type distinct count (HAVING semantics) rather than as a row filter. On top of that, an interviewer listens for the scaling instinct: content_items is tiny, so the smart physical plan broadcasts it and the huge fact table is never shuffled for the join.

---

### Break down the requirements

#### Step 1: Attach the content_type label

watch_seconds and user_id live in content_views; content_type lives in content_items. Join on content_id so every view row picks up its format label. content_items is the small side, so in production you broadcast it (see the comparison below); the result is identical to a default join, just without dragging the fact table over the network.

#### Step 2: Collapse to one row per type

groupBy content_type, then sum watch_seconds for total audience time and count the distinct user_id for unique viewers. Both aggregates come out of the same pass. Materialize this as its own DataFrame so the next stage filters a finished result, not a half-built grouping.

#### Step 3: Filter on the aggregate, then order

Keep only types with more than 10 unique viewers. This is a post-aggregation filter (HAVING semantics) applied to the computed count on the already-aggregated DataFrame, not a row filter on raw events. Then sort by total watch time descending.

---

### The solution

**Aggregate first, then filter the count**

```python
from pyspark.sql import functions as F

views = spark.table("content_views")
items = spark.table("content_items")

per_type = (
    views
    .join(items, "content_id")
    .groupBy("content_type")
    .agg(
        F.sum("watch_seconds").alias("total_watch_seconds"),
        F.countDistinct("user_id").alias("unique_viewers"),
    )
)

result = (
    per_type
    .filter(F.col("unique_viewers") > 10)
    .orderBy(F.col("total_watch_seconds").desc())
)
```

*Aggregate into per_type first, then apply the threshold on top so the post-aggregation count exists before you filter on it.*

**Default join**

views.join(items, "content_id"). Unless Spark can tell that content_items is under the broadcast threshold, at ~40M view rows it plans a sort-merge join: it re-partitions both sides by a hash of content_id, shuffling every view row over the network, sorts each partition, then merges. The fact table moves in full before a single second of watch time is summed.

**Broadcast-optimized (what you ship)**

views.join(F.broadcast(items), "content_id"). content_items is a few thousand rows, so one tiny copy is shipped to every executor and the join becomes a map-side lookup. The fact table is read once and is not shuffled for the join. Same rows out, dramatically less network.

> **Cost Analysis**
>
> At ~40M view rows, a default sort-merge join shuffles both sides: it re-partitions every view row by a hash of content_id across the network, sorts each partition, then merges. Broadcasting content_items (a few thousand rows) ships one tiny copy to every executor and turns the join into a map-side lookup, so the fact table is read once and is not shuffled for the join. The only remaining wide step is the groupBy, where countDistinct still shuffles user_id by content_type; with a handful of content types that exchange is cheap. Net: one unavoidable aggregation shuffle instead of a full-fact join shuffle plus an aggregation shuffle.

> **Interviewers Watch For**
>
> Say out loud which side is the dimension and why you broadcast it, and name the broadcast threshold (spark.sql.autoBroadcastJoinThreshold, 10MB by default) so the interviewer knows you understand it can happen automatically but you would force it for predictability. Bonus points for noting that countDistinct is the one remaining wide transformation here.

> **Common Pitfall**
>
> The classic mistake is filtering unique_viewers > 10 with a .filter() before the groupBy. Spark reads that as a row-level predicate, and it either errors or silently changes the result, because the per-type count does not exist yet. The threshold is a post-aggregation (HAVING) condition and must sit on the aggregated DataFrame, after .agg(). The second classic miss is collect()-ing the joined rows to the driver to count viewers in Python, which defeats the whole point of distributed aggregation.

---

## Common follow-up questions

- One content_type dominates 90% of views and that groupBy partition melts a single executor. How do you handle the skew? _(Tests salting, AQE skew-join/skew-agg handling, and recognizing data skew at the aggregation stage.)_
- If content_items grew to 5GB and could no longer be broadcast, how would the plan and cost change? _(Tests understanding of when broadcast stops being viable and sort-merge join becomes necessary.)_

---

Source: DataDriven (https://datadriven.io).