# The DAG Executor

> Wire up a mini pipeline and watch it run.

Canonical URL: <https://datadriven.io/problems/the_dag_executor>

Domain: Python · Difficulty: hard · Seniority: L6

## Problem

Given a task dict mapping task names to {'deps': [dep_names], 'func': lambda_source_string}, topologically order the tasks. For each task, evaluate its lambda by passing a dict {dep_name: result_of_dep} as 'inputs'. Return a dict of task_name -> result. Raise an exception if a cycle exists.
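
To make the input shape concrete, here is a small sketch of one possible pipeline. The task names (`extract`, `double`, `total`) and their lambdas are invented for illustration and do not come from the problem's test cases. Because each `func` is lambda source text rather than a callable, the sketch assumes it is turned into a function with `eval` before being called with the `inputs` dict.

```python
# Hypothetical three-task pipeline; names and lambdas are illustrative only.
tasks = {
    'extract': {'deps': [], 'func': "lambda inputs: [1, 2, 3]"},
    'double': {
        'deps': ['extract'],
        'func': "lambda inputs: [x * 2 for x in inputs['extract']]",
    },
    'total': {
        'deps': ['extract', 'double'],
        'func': "lambda inputs: sum(inputs['extract']) + sum(inputs['double'])",
    },
}

# 'func' is a string, so it has to be evaluated into a callable first.
extract_fn = eval(tasks['extract']['func'])
extract_result = extract_fn({})  # no dependencies, so inputs is empty

# A dependent task receives {dep_name: result_of_dep}.
double_fn = eval(tasks['double']['func'])
double_result = double_fn({'extract': extract_result})

# The dict an executor should return for this pipeline.
expected = {'extract': [1, 2, 3], 'double': [2, 4, 6], 'total': 18}
```

`eval` runs arbitrary code, so this approach is only reasonable when the task definitions come from a trusted source.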

## Worked solution and explanation

### Why this problem exists in real interviews

This tests **topological sort with task execution**, the core of every DAG-based pipeline orchestrator (Airflow, dbt, Dagster). It probes graph traversal, dependency resolution, and output propagation between tasks.

> **Trick to Solving**
>
> Use Kahn's algorithm (BFS topological sort) with an in-degree map. Process tasks with zero in-degree first, passing their outputs to dependents. This naturally handles execution ordering and output forwarding.

---

### Break down the requirements

#### Step 1: Build an adjacency list and in-degree map

Parse the dependency graph to know which tasks depend on which.
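
As a rough sketch of this step, the snippet below builds both structures for a hypothetical three-task graph (`a`, `b`, `c` are made-up names). The check for unknown dependencies is an added assumption; the problem statement does not say how such input should be handled.

```python
# Hypothetical graph: b depends on a, c depends on a and b.
tasks = {
    'a': {'deps': [], 'func': "lambda inputs: 1"},
    'b': {'deps': ['a'], 'func': "lambda inputs: inputs['a'] + 1"},
    'c': {'deps': ['a', 'b'], 'func': "lambda inputs: inputs['a'] + inputs['b']"},
}

# in_degree[name]: how many dependencies must finish before `name` can run.
in_degree = {name: len(spec.get('deps', [])) for name, spec in tasks.items()}

# dependents[name]: tasks that are waiting on `name` (the reverse edges).
dependents = {name: [] for name in tasks}
for name, spec in tasks.items():
    for dep in spec.get('deps', []):
        if dep not in tasks:
            # Assumption: fail loudly on a dependency that is not a known task.
            raise KeyError(f"task {name!r} depends on unknown task {dep!r}")
        dependents[dep].append(name)

print(in_degree)   # {'a': 0, 'b': 1, 'c': 2}
print(dependents)  # {'a': ['b', 'c'], 'b': ['c'], 'c': []}
```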

#### Step 2: Initialize a queue with zero-dependency tasks

These can run immediately since they have no prerequisites.
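
A minimal sketch of the seeding step, using a made-up in-degree map rather than one derived from real tasks:

```python
from collections import deque

# Hypothetical in-degree map: 'load' and 'config' have no prerequisites.
in_degree = {'load': 0, 'config': 0, 'clean': 1, 'report': 2}

# Every task with in-degree zero is ready to run right away.
queue = deque(name for name, degree in in_degree.items() if degree == 0)

print(list(queue))  # ['load', 'config']
```

If the task dict is non-empty and this queue starts out empty, every task has at least one dependency, which means the graph contains a cycle.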

#### Step 3: Execute tasks in topological order

For each task, call its function with the outputs of its dependencies, store the result, and decrement the in-degree of downstream tasks.
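
The sketch below walks through a single iteration of this step. It assumes task `a` has already run, and it assumes `func` strings are evaluated with `eval`; the names and values are hypothetical.

```python
from collections import deque

tasks = {
    'a': {'deps': [], 'func': "lambda inputs: 10"},
    'b': {'deps': ['a'], 'func': "lambda inputs: inputs['a'] * 2"},
    'c': {'deps': ['b'], 'func': "lambda inputs: inputs['b'] + 1"},
}

# State after 'a' has finished: its output is stored and 'b' is now ready.
outputs = {'a': 10}
in_degree = {'a': 0, 'b': 0, 'c': 1}
dependents = {'a': ['b'], 'b': ['c'], 'c': []}
ready = deque(['b'])

# One iteration of the execution loop.
name = ready.popleft()
spec = tasks[name]
inputs = {dep: outputs[dep] for dep in spec['deps']}  # {'a': 10}
outputs[name] = eval(spec['func'])(inputs)            # run the task
for downstream in dependents[name]:
    in_degree[downstream] -= 1                        # one fewer unmet dependency
    if in_degree[downstream] == 0:
        ready.append(downstream)                      # all inputs now available

print(outputs)      # {'a': 10, 'b': 20}
print(list(ready))  # ['c']
```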

#### Step 4: Return all task outputs

Collect results in a dict mapping task name to output.

---

### The solution

**Kahn's BFS with output propagation**

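```python
from collections import deque


def execute_dag(tasks: dict) -> dict:
    # in_degree: number of unmet dependencies per task.
    # dependents: reverse edges, mapping a task to the tasks waiting on it.
    in_degree = {}
    dependents = {}
    for name in tasks:
        in_degree[name] = len(tasks[name].get('deps', []))
        dependents[name] = []
    for name in tasks:
        for dep in tasks[name].get('deps', []):
            dependents[dep].append(name)
    # Seed the queue with tasks that have no dependencies.
    queue = deque()
    for name in in_degree:
        if in_degree[name] == 0:
            queue.append(name)
    outputs = {}
    while queue:
        name = queue.popleft()
        task = tasks[name]
        # Every dependency has already run, so its output is available.
        dep_outputs = {}
        for dep in task.get('deps', []):
            dep_outputs[dep] = outputs[dep]
        # 'func' holds lambda source text, so eval it into a callable first.
        func = eval(task['func'])
        outputs[name] = func(dep_outputs)
        for downstream in dependents[name]:
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)
    # Tasks on, or downstream of, a cycle never reach in-degree zero.
    if len(outputs) != len(tasks):
        raise ValueError('cycle detected in task graph')
    return outputs
```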

> **Time and Space Complexity**
>
> **Time:** O(V + E) where V is the number of tasks and E is the total number of dependency edges.
> 
> **Space:** O(V + E) for the adjacency list, in-degree map, and output storage.

> **Interviewers Watch For**
>
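> Whether you detect cycles. If the queue empties before every task has run, the remaining tasks sit on or downstream of a cycle. The problem statement requires raising an exception in that case, so compare the number of executed tasks to the total task count before returning.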
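
The count comparison can be sketched in isolation, without executing anything. The helper name `topo_order` and its simplified input (task name mapped directly to a list of dependency names) are assumptions made for this example.

```python
from collections import deque


def topo_order(deps: dict) -> list:
    """Return a topological order for `deps` (name -> list of dependency names).

    Raises ValueError listing the tasks that could never be scheduled.
    """
    in_degree = {name: len(d) for name, d in deps.items()}
    dependents = {name: [] for name in deps}
    for name, d in deps.items():
        for dep in d:
            dependents[dep].append(name)

    queue = deque(name for name, degree in in_degree.items() if degree == 0)
    order = []
    while queue:
        name = queue.popleft()
        order.append(name)
        for downstream in dependents[name]:
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)

    if len(order) != len(deps):
        # Anything still above zero is on a cycle or waits on one.
        stuck = sorted(name for name in deps if in_degree[name] > 0)
        raise ValueError(f"cycle detected; unscheduled tasks: {stuck}")
    return order


print(topo_order({'a': [], 'b': ['a'], 'c': ['b']}))  # ['a', 'b', 'c']
```

Note that the reported tasks include those downstream of a cycle as well as those on it. Pinpointing the cycle itself would need a further pass, such as a depth-first search over the stuck tasks.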

> **Common Pitfall**
>
> Executing tasks in arbitrary order without respecting dependencies. This causes `KeyError` when a task tries to read an output that has not been produced yet.
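
A small illustration of this failure mode, using a hypothetical two-task dict in which the dependent task happens to be listed first:

```python
# 'report' is listed before the task it depends on.
tasks = {
    'report': {'deps': ['load'], 'func': "lambda inputs: inputs['load'] + 1"},
    'load': {'deps': [], 'func': "lambda inputs: 41"},
}

outputs = {}
error = None
try:
    # Naive approach: run tasks in dict order, ignoring dependencies.
    for name, spec in tasks.items():
        inputs = {dep: outputs[dep] for dep in spec['deps']}
        outputs[name] = eval(spec['func'])(inputs)
except KeyError as exc:
    error = exc

print(repr(error))  # KeyError('load')
```

The lookup of `outputs['load']` fails because `load` has not run yet, which is exactly what processing tasks in topological order prevents.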

---

## Common follow-up questions

- How would you detect and report cycles? _(Tests comparing the count of executed tasks to the total task count.)_
- How would you parallelize independent tasks? _(Tests identifying tasks with zero in-degree simultaneously and using thread/process pools.)_
- What if a task fails and you need to retry? _(Tests retry logic, dead-letter handling, and partial DAG re-execution.)_
- How does Airflow handle this differently? _(Tests knowledge of scheduler architecture, XCom for output passing, and backfill semantics.)_
- What if the DAG has thousands of tasks? _(Tests memory and scheduling efficiency at scale.)_
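
For the parallelization follow-up, one possible approach is to run the graph in waves: every task whose in-degree is zero at the same time is independent of the others, so a whole wave can be submitted to a thread pool together. The function name `execute_dag_parallel` and the `max_workers` default are assumptions for this sketch, which uses the standard library's `concurrent.futures.ThreadPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor


def execute_dag_parallel(tasks: dict, max_workers: int = 4) -> dict:
    in_degree = {name: len(spec.get('deps', [])) for name, spec in tasks.items()}
    dependents = {name: [] for name in tasks}
    for name, spec in tasks.items():
        for dep in spec.get('deps', []):
            dependents[dep].append(name)

    outputs = {}

    def run(name):
        # Only reads `outputs`; writes happen on the main thread between waves.
        spec = tasks[name]
        inputs = {dep: outputs[dep] for dep in spec.get('deps', [])}
        return eval(spec['func'])(inputs)

    ready = [name for name, degree in in_degree.items() if degree == 0]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while ready:
            # Tasks in the same wave do not depend on each other.
            results = list(pool.map(run, ready))
            for name, result in zip(ready, results):
                outputs[name] = result
            next_ready = []
            for name in ready:
                for downstream in dependents[name]:
                    in_degree[downstream] -= 1
                    if in_degree[downstream] == 0:
                        next_ready.append(downstream)
            ready = next_ready

    if len(outputs) != len(tasks):
        raise ValueError('cycle detected in task graph')
    return outputs


# Hypothetical diamond-shaped graph: b and c can run concurrently.
diamond = {
    'a': {'deps': [], 'func': "lambda inputs: 1"},
    'b': {'deps': ['a'], 'func': "lambda inputs: inputs['a'] + 1"},
    'c': {'deps': ['a'], 'func': "lambda inputs: inputs['a'] + 2"},
    'd': {'deps': ['b', 'c'], 'func': "lambda inputs: inputs['b'] + inputs['c']"},
}
result = execute_dag_parallel(diamond)
print(result['d'])  # 5
```

The wave approach is simple but waits for the slowest task in each wave before starting the next; a finer-grained scheduler would submit each task as soon as its own dependencies finish. In CPython, threads mainly help when tasks are I/O-bound.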

## Related

- [All practice problems](https://datadriven.io/problems)
- [Mock interview mode](https://datadriven.io/interview/the_dag_executor)
- [Python Interview Questions](https://datadriven.io/python-interview-questions)
- [Data Engineering Interview Prep Guide](https://datadriven.io/data-engineer-interview-prep)
- [Daily Challenge](https://datadriven.io/daily)

---

Source: DataDriven (https://datadriven.io). 100% free data engineering interview prep. Live code execution against Postgres 16, Python 3.11, and Spark sandboxes. No paywall, no premium tier, no signup gate.