Async Python for Data Engineers: Advanced

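Here's the senior async interview problem: 'You have a pipeline with 5 concurrent fetching tasks. One task hits an unrecoverable error. What happens to the other 4?' With asyncio.gather, the answer is: they keep running. With the default return_exceptions=False, gather propagates the first exception to the caller while the remaining tasks continue in the background with no owner (they leak). With return_exceptions=True, every task runs to completion and the error comes back as a value. With asyncio.TaskGroup (Python 3.11+), the answer is: the other 4 are cancelled, and the group waits for that cancellation to finish before the error propagates. Structured concurrency is the concept that makes async systems predictable. This lesson is about the patterns that make async reliable at scale.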
Staff-level async DE interviews probe structured concurrency (TaskGroup, no task leaks), event loop internals (one per thread, cannot be nested), testing async code (pytest-asyncio, AsyncMock), and the full async vs threading vs multiprocessing vs Spark decision matrix. These are the patterns that separate engineers who write async pipelines from engineers who build async platforms that others maintain for years.

asyncio.TaskGroup: Structured Concurrency

asyncio.gather has a critical problem: task leaks. If one task raises an exception, gather raises the exception immediately — but the other tasks continue running in the background. They have no parent scope, no owner, no cancellation. They consume resources silently until they finish (or hang forever). asyncio.TaskGroup (Python 3.11+) solves this with structured concurrency: child tasks cannot outlive their parent scope.
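The leak is easy to observe without any network calls. A minimal sketch, with hypothetical coroutines fail_fast() and slow_sibling() standing in for real fetches:

```python
import asyncio


async def fail_fast() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("unrecoverable error")


async def slow_sibling(log: list[str]) -> None:
    await asyncio.sleep(0.1)
    log.append("sibling finished")  # runs even though gather() already raised


async def demo_gather_leak() -> tuple[bool, list[str]]:
    log: list[str] = []
    sibling = asyncio.create_task(slow_sibling(log))  # keep a handle so we can inspect it
    still_running = False
    try:
        await asyncio.gather(fail_fast(), sibling)
    except RuntimeError:
        # gather() propagated the first error but did NOT cancel the sibling
        still_running = not sibling.done()
    await asyncio.sleep(0.2)  # meanwhile the orphan keeps going in the background
    return still_running, log


print(asyncio.run(demo_gather_leak()))  # -> (True, ['sibling finished'])
```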

Structured Concurrency: The Core Promise
In structured concurrency, a task's lifetime is bounded by the scope that created it: the scope cannot exit until every task it started has finished. On a normal exit, the scope waits for all of its tasks to complete. If any task fails (or the body of the scope raises), the remaining tasks are cancelled and awaited before the error propagates. No task can escape its parent scope. This is the same principle that makes function calls predictable: when a function returns, its local variables are gone. TaskGroup applies this principle to concurrent tasks.
asyncio.gather (old pattern)
  • All tasks start immediately in parallel
  • If one raises: gather raises, others keep running (leaked)
  • return_exceptions=True: exceptions become values, all run to completion
  • No automatic cancellation on failure
  • Task leaks: orphaned coroutines consuming resources
  • Available since Python 3.4 (every version that ships asyncio)
asyncio.TaskGroup (Python 3.11+)
  • Tasks are created with tg.create_task() inside the async with block
  • If one raises: all others are cancelled, and the group waits for them to finish
  • ExceptionGroup collects ALL exceptions that occurred (even when only one task failed)
  • Structured: child tasks cannot outlive the group scope
  • No task leaks: scope exit guarantees cleanup
  • Preferred for all new async code on 3.11+
```python
import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)


# Minimal helpers used below (illustrative; adapt to your API)
async def fetch_one(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10.0)
    response.raise_for_status()  # 4xx/5xx -> httpx.HTTPStatusError
    return response.json()


async def fetch_with_semaphore(
    client: httpx.AsyncClient, url: str, sem: asyncio.Semaphore
) -> dict:
    async with sem:  # at most N requests in flight
        return await fetch_one(client, url)


# --- Old pattern: gather with task leaks ---
async def fetch_all_gather(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        tasks = [fetch_one(client, url) for url in urls]
        # If task 2 raises: gather raises, tasks 3/4/5 continue in background
        # Those orphaned tasks hold connections, consume memory, may trigger side effects
        return await asyncio.gather(*tasks)  # NOT recommended for new code


# --- Modern pattern: TaskGroup with structured cancellation ---
async def fetch_all_taskgroup(urls: list[str]) -> list[dict]:
    results: list[dict] = []

    async with httpx.AsyncClient() as client:
        try:
            async with asyncio.TaskGroup() as tg:
                # Create tasks within the group
                task_refs = [
                    tg.create_task(fetch_one(client, url))
                    for url in urls
                ]
            # The async with block exits only after every task has finished.
            # If we reach here: ALL tasks completed successfully
            results = [t.result() for t in task_refs]

        except* httpx.HTTPStatusError as eg:
            # except* matches HTTPStatusErrors inside the ExceptionGroup from TaskGroup
            # All other tasks were cancelled when the first one raised
            logger.error(f"HTTP errors in batch: {len(eg.exceptions)} failures")
            for exc in eg.exceptions:
                logger.error(f"  {exc}")
            raise
        except* httpx.TimeoutException as eg:
            logger.error(f"Timeouts in batch: {len(eg.exceptions)}")
            raise

    return results


# TaskGroup + timeout: the complete deadline-aware concurrent fetch
async def fetch_with_deadline(urls: list[str], deadline_seconds: float = 30.0) -> list[dict]:
    sem = asyncio.Semaphore(20)  # Concurrency limit: at most 20 requests in flight
    try:
        async with asyncio.timeout(deadline_seconds):  # Wall-clock deadline for the whole batch
            async with httpx.AsyncClient() as client:
                async with asyncio.TaskGroup() as tg:  # Structured concurrency
                    task_refs = [
                        tg.create_task(fetch_with_semaphore(client, url, sem))
                        for url in urls
                    ]
                # TaskGroup has exited, so every task has finished successfully
                return [t.result() for t in task_refs]
    except TimeoutError as exc:  # asyncio.timeout() raises the built-in TimeoutError
        raise RuntimeError(f"Batch of {len(urls)} URLs exceeded {deadline_seconds}s") from exc
```
except* Is New Syntax (Python 3.11+)
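TaskGroup reports task failures as an ExceptionGroup, a new exception type that wraps one or more exceptions. It does this even when only one task fails, so a plain `except HTTPStatusError` will not catch it. Handling it requires the new `except*` syntax (note the asterisk). `except* HTTPStatusError as eg` catches all HTTPStatusError exceptions from the group; eg is itself an ExceptionGroup, and eg.exceptions is a tuple of the matched exceptions. Several except* clauses can each run for the same group. This syntax is a SyntaxError on Python 3.10 and earlier, so it can't be backported.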

Event Loop Internals: What Staff Candidates Know

Staff-level candidates know the event loop architecture well enough to debug problems that look like async issues but are actually event loop lifecycle issues. The three most common: nesting event loops in Jupyter/notebooks, running async code from a sync context, and creating tasks that outlive their loop.

```python
import asyncio
import threading


async def inner() -> str:
    await asyncio.sleep(0.01)
    return "inner done"


async def main() -> None:
    print(await inner())


# FACT: At most one running event loop per thread
# asyncio.run() creates a NEW loop, runs the coroutine, then CLOSES the loop
# After asyncio.run() returns, the loop is gone

asyncio.run(main())  # Creates loop, runs main, closes loop
asyncio.run(main())  # Creates a SECOND loop, runs main, closes it
# These are independent executions with separate event loops


# FACT: Cannot nest event loops
async def outer() -> None:
    # WRONG: asyncio.run() refuses to start inside a running loop and raises
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    # asyncio.run(inner())

    # CORRECT: just await the coroutine -- you're already in an async context
    await inner()


# Why Jupyter notebooks need a workaround:
# Jupyter's kernel already runs an event loop to execute cells, so calling
# asyncio.run() from a cell raises the same RuntimeError as above.
# Options:
#   1. Use top-level await directly in the cell:   await main()
#   2. import nest_asyncio; nest_asyncio.apply()   # third-party patch that allows nesting;
#      Jupyter-specific workaround -- never use in production code


# FACT: calling asyncio.get_event_loop() with no running loop is deprecated
# (it emits DeprecationWarning; newer releases are removing the implicit loop creation)
# OLD (deprecated pattern in scripts):
#   loop = asyncio.get_event_loop()
#   loop.run_until_complete(main())

# CORRECT in scripts/CLI:
#   asyncio.run(main())  # Creates, runs, closes


async def inside_async_code() -> None:
    # CORRECT in async context (already inside a loop): just await it
    result = await inner()

    # CORRECT when you need the loop object itself (e.g. for run_in_executor):
    loop = asyncio.get_running_loop()  # Raises RuntimeError if no loop is running

    # To run a coroutine concurrently, schedule it as a task on the CURRENT running loop:
    task = asyncio.create_task(inner())
    print(result, loop.is_running(), await task)


# Worker threads have NO event loop by default
def worker_thread_with_async() -> None:
    # This thread has no event loop -- asyncio.run() creates one for it
    result = asyncio.run(inner())
    print(f"worker thread: {result}")


t = threading.Thread(target=worker_thread_with_async)
t.start()
t.join()
```
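The worker-thread example covers a thread that owns its own loop. The reverse case, sync code that must hand work to a loop already running in another thread, uses asyncio.run_coroutine_threadsafe(). A minimal sketch, with a hypothetical double() coroutine standing in for real async I/O:

```python
import asyncio
import threading


def start_background_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Run a dedicated event loop forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return x * 2


def sync_caller(loop: asyncio.AbstractEventLoop) -> int:
    # Plain sync code: schedule the coroutine on the OTHER thread's loop.
    # run_coroutine_threadsafe returns a concurrent.futures.Future.
    future = asyncio.run_coroutine_threadsafe(double(21), loop)
    return future.result(timeout=5)  # blocks this thread only; the loop keeps running


loop, thread = start_background_loop()
try:
    answer = sync_caller(loop)
    print(answer)  # -> 42
finally:
    loop.call_soon_threadsafe(loop.stop)  # loop methods aren't thread-safe; schedule stop() on it
    thread.join()
    loop.close()
```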
The Staff-Level Event Loop Answer
'Each thread can run at most one event loop at a time, and a new thread has no running loop until you start one. asyncio.run() creates a loop, runs the coroutine, and closes the loop when done. You cannot nest event loops: calling asyncio.run() inside a running loop raises RuntimeError, so inside async code you simply await. Worker threads start with no loop; asyncio.run() in the thread creates one. Jupyter notebooks already maintain a running loop, so asyncio.run() fails in a cell; use top-level await there, or nest_asyncio as a notebook-only workaround.'
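The third lifecycle trap, tasks that outlive their loop, is easy to reproduce: when the coroutine passed to asyncio.run() returns, asyncio.run() cancels every task still pending before it closes the loop, so fire-and-forget work is silently cut short. A minimal sketch with a hypothetical background_job():

```python
import asyncio

events: list[str] = []
background_tasks: set[asyncio.Task] = set()  # strong references to fire-and-forget tasks


async def background_job() -> None:
    try:
        await asyncio.sleep(10)
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cancelled at loop shutdown")
        raise


async def main() -> None:
    task = asyncio.create_task(background_job())  # nobody awaits this task
    background_tasks.add(task)
    await asyncio.sleep(0.01)  # main() returns while the job is still sleeping


asyncio.run(main())
print(events)  # -> ['cancelled at loop shutdown']
```

If the background work matters, await it (or create it inside a TaskGroup) before the main coroutine returns.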

Testing Async Code: pytest-asyncio and AsyncMock

Async testing is a strong-hire signal in DE interviews. It shows you've maintained an async codebase, not just written async scripts. The key tools are pytest-asyncio for running async test functions, and AsyncMock for mocking async functions and clients.

```python
# pyproject.toml:
#   [tool.pytest.ini_options]
#   asyncio_mode = "auto"    # every async test function runs on an event loop, no marker needed
#   # -- OR --
#   asyncio_mode = "strict"  # only tests marked @pytest.mark.asyncio run as async tests
# (in pytest.ini the section header is [pytest] instead)

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest

# The modules under test (hypothetical application code):
from mymodule.fetcher import fetch_all, fetch_one
from mymodule.pipeline import run_pipeline


# Basic async test with pytest-asyncio
@pytest.mark.asyncio
async def test_fetch_one_success():
    """Test fetch_one with a mock HTTP client."""
    # The response is a plain MagicMock: httpx.Response.json() and
    # raise_for_status() are ordinary (sync) methods
    mock_response = MagicMock()
    mock_response.json.return_value = {"id": 1, "value": 42}
    mock_response.raise_for_status.return_value = None

    # AsyncMock: with spec=httpx.AsyncClient, async methods such as .get are awaitable
    mock_client = AsyncMock(spec=httpx.AsyncClient)
    mock_client.get.return_value = mock_response

    sem = asyncio.Semaphore(1)
    result = await fetch_one(mock_client, "https://api.test/records/1", sem)

    assert result["data"]["id"] == 1
    mock_client.get.assert_awaited_once_with(
        "https://api.test/records/1",
        timeout=10.0,
    )


@pytest.mark.asyncio
async def test_fetch_all_handles_partial_failure():
    """Test that fetch_all continues when some URLs fail."""
    # Side effects: first call succeeds, second raises, third succeeds
    mock_client = AsyncMock(spec=httpx.AsyncClient)
    mock_client.get.side_effect = [
        MagicMock(json=lambda: {"id": 1}, raise_for_status=lambda: None),
        httpx.TimeoutException("timeout"),
        MagicMock(json=lambda: {"id": 3}, raise_for_status=lambda: None),
    ]

    with patch("mymodule.fetcher.httpx.AsyncClient") as mock_cls:
        # `async with httpx.AsyncClient() as client` must yield our mock client
        mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
        mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)

        results = await fetch_all(
            ["https://api.test/1", "https://api.test/2", "https://api.test/3"]
        )

    # 2 successes, 1 failure
    successes = [r for r in results if r["status"] == "ok"]
    failures = [r for r in results if r["status"] != "ok"]
    assert len(successes) == 2
    assert len(failures) == 1


@pytest.mark.asyncio
async def test_producer_consumer_pipeline():
    """Test the full producer-consumer pipeline with mocked I/O."""
    processed_records = []

    async def mock_worker(queue, worker_id, db_pool):
        while True:
            item = await queue.get()
            if item is None:  # sentinel: shut this worker down
                queue.task_done()
                break
            processed_records.append(item)
            queue.task_done()
        return len(processed_records)

    # The producer's HTTP call must be mocked too, or this test hits the network.
    # `fetch_batch` is a hypothetical name: patch whatever your producer awaits.
    fake_fetch = AsyncMock(return_value=[{"id": 1}, {"id": 2}])
    with (
        patch("mymodule.pipeline.worker", side_effect=mock_worker),
        patch("mymodule.pipeline.fetch_batch", fake_fetch),
    ):
        await run_pipeline(api_urls=["https://api.test/batch/1"])

    assert len(processed_records) > 0
```
AsyncMock vs MagicMock for Async Functions
MagicMock() returns a regular value when called. AsyncMock() (added in Python 3.8) returns an awaitable, so `await mock(...)` works. Use AsyncMock for async functions, async context managers (__aenter__/__aexit__), and async iterators (__aiter__/__anext__). If you accidentally mock an async function with MagicMock and then await its result, you get a TypeError, because a MagicMock object can't be used in an await expression. With pytest-asyncio 0.21+ and asyncio_mode='auto', async test functions run without the @pytest.mark.asyncio marker.
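A runnable sketch of the difference outside pytest, with a hypothetical load_record() as the function under test and a MagicMock standing in for the HTTP client:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def load_record(client, record_id: int) -> dict:
    # Hypothetical function under test: awaits client.get() and unwraps the JSON
    response = await client.get(f"/records/{record_id}")
    return response.json()


async def demo() -> dict:
    results: dict = {}
    response = MagicMock()  # sync object: .json() is a normal call
    response.json.return_value = {"id": 7}
    client = MagicMock()

    # AsyncMock: `await client.get(...)` returns return_value
    client.get = AsyncMock(return_value=response)
    results["record"] = await load_record(client, 7)
    client.get.assert_awaited_once_with("/records/7")

    # side_effect as a list: one result (or raised exception) per await, in order
    client.get = AsyncMock(side_effect=[response, TimeoutError("simulated timeout")])
    results["first"] = await load_record(client, 1)
    try:
        await load_record(client, 2)
    except TimeoutError:
        results["second"] = "timed out"

    # The mistake: a plain MagicMock returns a non-awaitable object
    client.get = MagicMock(return_value=response)
    try:
        await load_record(client, 7)
    except TypeError:
        results["plain_magicmock"] = "TypeError"
    return results


print(asyncio.run(demo()))
```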

Concurrency Decision Matrix: Async vs Threading vs Multiprocessing vs Spark

This is one of the most common staff-level async DE interview questions. The interviewer describes a workload and asks which concurrency model you'd choose. The strong-hire answer gives specific criteria, not just the tool name.

| Approach | Best for | Why | Limitation |
|---|---|---|---|
| asyncio | Many concurrent I/O operations (API calls, DB queries) | Single thread; no OS thread per connection, so thousands of concurrent connections are cheap | One blocking call stalls every coroutine; CPU-bound work gets no speedup |
| threading | Sync-only libraries you can't replace; concurrent.futures interop; legacy sync code | GIL is released during blocking I/O; preemptive scheduling; simpler migration from sync code | GIL limits CPU parallelism; race conditions on shared state; more overhead per task than async |
| multiprocessing | CPU-bound work (Pandas transforms, compression, ML inference) | Each process has its own interpreter and GIL, giving true parallelism; isolated memory | High per-process overhead; IPC serialization (pickling) cost; poor fit for I/O-bound work |
| Spark / Dask | Datasets that don't fit on one machine; distributed fault tolerance needed | Horizontal scaling; built-in fault tolerance; SQL/DataFrame API | Overhead on small data; cluster management; little benefit when data fits on one machine (a common rule of thumb is under ~10 GB) |
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

import httpx

# Assumed to be defined elsewhere (hypothetical helpers):
#   fetch_one(client, url, sem) -> list[dict]   # one batch of records per URL
#   legacy_sync_process(data) -> list[dict]     # a sync-only library call


# Pattern: async for I/O, ProcessPoolExecutor for CPU -- the hybrid approach
# Ingest from APIs (async), then transform each batch (multiprocessing)
async def ingest_and_transform(urls: list[str]) -> list[dict]:
    # Step 1: Async fetch (I/O-bound)
    sem = asyncio.Semaphore(20)
    async with httpx.AsyncClient() as client:
        raw_batches = await asyncio.gather(
            *[fetch_one(client, url, sem) for url in urls],
            return_exceptions=True,  # failed URLs come back as exception objects
        )

    valid_batches = [b for b in raw_batches if not isinstance(b, BaseException)]

    # Step 2: CPU-bound transform using ProcessPoolExecutor
    # run_in_executor bridges a concurrent.futures executor with async code
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as pool:
        transformed = await asyncio.gather(*[
            loop.run_in_executor(pool, transform_batch, batch)
            for batch in valid_batches
        ])

    return [record for batch in transformed for record in batch]


def transform_batch(batch: list[dict]) -> list[dict]:
    """CPU-bound: runs in a separate process with its own interpreter and GIL.

    Must be a module-level function so it can be pickled and sent to the worker.
    """
    import pandas as pd
    df = pd.DataFrame(batch)
    # ... expensive CPU transforms ...
    return df.to_dict("records")


# When to use threading instead of asyncio:
# 1. Legacy sync library you can't replace
# 2. Code that already uses threading.Lock and you don't want to refactor
# 3. Library that releases the GIL during its blocking I/O (sqlite3, some C extensions)

async def use_legacy_sync_lib(data: list[dict]) -> list[dict]:
    """Run a sync-only library in a thread without blocking the event loop."""
    return await asyncio.to_thread(legacy_sync_process, data)


# The GIL note for Python 3.13+
# Python 3.13 added an experimental free-threaded build (compiled with --disable-gil);
# in that build, PYTHON_GIL=0 keeps the GIL disabled at runtime.
# If adopted, threading for CPU-bound work becomes viable without multiprocessing overhead.
# Check PEP 703 and the current release notes for its production status.
```
The Interview Answer on When to Use Each
'For I/O-bound work, such as hitting APIs, querying databases, or reading from object storage, asyncio is my first choice: one thread, no OS thread per connection, thousands of concurrent connections. For CPU-bound transforms like Pandas processing, asyncio doesn't help: CPU work never reaches an await, so it blocks the event loop. Threads don't help much either, because the GIL serializes Python bytecode, so I use ProcessPoolExecutor. For legacy sync libraries in an async pipeline, asyncio.to_thread() is the bridge. Threading is my fallback when I can't use async and don't need CPU parallelism. Spark comes in when the data doesn't fit on one machine or I need distributed fault tolerance.'
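The asyncio limitation around blocking calls, and the asyncio.to_thread() bridge that removes it, can both be measured with a minimal sketch. blocking_io() is a hypothetical stand-in for a sync-only client call, and exact timings vary by machine:

```python
import asyncio
import time


def blocking_io(seconds: float) -> float:
    time.sleep(seconds)  # hypothetical sync-only client call
    return seconds


async def blocking_in_coroutine() -> float:
    async def call() -> float:
        return blocking_io(0.1)  # BAD: blocks the event loop thread; nothing else can run

    start = time.perf_counter()
    await asyncio.gather(call(), call(), call())
    return time.perf_counter() - start


async def offloaded_to_threads() -> float:
    start = time.perf_counter()
    # Each call runs in the default thread pool; the loop stays free while they sleep
    await asyncio.gather(*(asyncio.to_thread(blocking_io, 0.1) for _ in range(3)))
    return time.perf_counter() - start


print(f"blocking:  {asyncio.run(blocking_in_coroutine()):.2f}s")  # about 0.3s: calls run back to back
print(f"to_thread: {asyncio.run(offloaded_to_threads()):.2f}s")   # about 0.1s: calls overlap
```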

Shared State and Async Safety

asyncio's single-threaded cooperative model makes most shared state safe: coroutines can only switch at await points, so there are no preemptive race conditions between coroutines. There are two caveats. First, a read-modify-write that spans an await is not atomic, because other coroutines run while yours is suspended. Second, the guarantee disappears the moment code that touches the state runs in threads (via asyncio.to_thread or a ThreadPoolExecutor). Staff-level candidates understand exactly where the safety boundary is.

```python
import asyncio
import threading

# SAFE: coroutines don't preempt each other
counter = 0


async def increment_safe():
    global counter
    # This is a non-atomic read-modify-write BUT it's safe in pure asyncio:
    # No other coroutine can run between the read and write (no await between them)
    counter += 1


async def safe_main():
    await asyncio.gather(
        increment_safe(),
        increment_safe(),
        increment_safe(),
    )
    print(counter)  # 3 -- correct, no race condition in pure asyncio


# UNSAFE: the same increment running inside worker threads
counter_threaded = 0
lock = threading.Lock()  # Need a real threading.Lock when threads are involved


def _increment_in_thread() -> None:
    global counter_threaded
    counter_threaded += 1  # RACE CONDITION: worker threads can preempt each other mid-update


async def increment_unsafe():
    # asyncio.to_thread runs the SYNC function in the default thread pool
    await asyncio.to_thread(_increment_in_thread)


def _increment_in_thread_locked() -> None:
    global counter_threaded
    with lock:  # threading.Lock, not asyncio.Lock
        counter_threaded += 1


async def increment_with_lock():
    # Take the threading.Lock inside the worker thread, not directly in a coroutine:
    # waiting on a contended threading.Lock would block the whole event loop
    await asyncio.to_thread(_increment_in_thread_locked)


# asyncio.Lock vs threading.Lock:
# asyncio.Lock: protects state shared between coroutines (waiting yields to the loop; not thread-safe)
# threading.Lock: protects state shared between threads (blocks the thread while waiting)
# Use asyncio.Lock when all access happens in coroutines on one loop
# Use threading.Lock for state touched by code running in threads

async_lock = asyncio.Lock()


async def safe_with_async_lock(shared_list: list, value: int) -> None:
    async with async_lock:
        # Only one coroutine can be in this block at a time
        # Other coroutines wait (and yield to the event loop) while waiting
        shared_list.append(value)
        await asyncio.sleep(0)  # Can yield while holding the lock; others still can't enter


# asyncio.Event for coroutine coordination
async def producer_event(event: asyncio.Event) -> None:
    await asyncio.sleep(1)  # Simulate work
    event.set()  # Signal: data is ready


async def consumer_event(event: asyncio.Event) -> None:
    await event.wait()  # Suspend (without blocking the thread) until the event is set
    print("Data is ready -- starting processing")


async def event_main() -> None:
    data_ready = asyncio.Event()
    await asyncio.gather(consumer_event(data_ready), producer_event(data_ready))
```
The Safety Boundary
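Pure asyncio code is safe for shared state between coroutines as long as no await sits between reading and writing that state, because coroutines can only switch at await points: you control exactly where context switches happen. The moment code that touches the state runs in threads (threading.Thread, ThreadPoolExecutor, or asyncio.to_thread), preemptive scheduling enters the picture and all standard race conditions apply. ProcessPoolExecutor is different: workers run in separate processes with separate memory, so they never mutate the parent's objects, and results must be passed back explicitly. The rule: asyncio.Lock for coroutine-to-coroutine coordination; threading.Lock for state touched from any thread pool.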
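The await-point rule cuts both ways: a read-modify-write that spans an await is not atomic even in pure asyncio, because every other ready coroutine runs while this one is suspended. A minimal sketch that loses updates without a lock and keeps all of them with asyncio.Lock:

```python
import asyncio

counter = 0


async def increment_across_await() -> None:
    global counter
    current = counter       # read
    await asyncio.sleep(0)  # yield point: other coroutines read the same stale value here
    counter = current + 1   # write back, overwriting their increments


async def increment_locked(lock: asyncio.Lock) -> None:
    async with lock:  # the whole read-await-write becomes one critical section
        await increment_across_await()


async def run_increments(n: int, use_lock: bool) -> int:
    global counter
    counter = 0
    lock = asyncio.Lock()
    if use_lock:
        await asyncio.gather(*(increment_locked(lock) for _ in range(n)))
    else:
        await asyncio.gather(*(increment_across_await() for _ in range(n)))
    return counter


print(asyncio.run(run_increments(100, use_lock=False)))  # far below 100: lost updates
print(asyncio.run(run_increments(100, use_lock=True)))   # 100
```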
asyncio.Event for Pipeline Stage Signaling
asyncio.Event is the clean way to signal between pipeline stages: a producer sets the event when data is ready, and consumers await it. asyncio.Condition handles wait-notify patterns, asyncio.Semaphore handles resource limits (covered in intermediate), and asyncio.Queue handles producer-consumer with backpressure (covered in intermediate). These async synchronization primitives mirror their threading counterparts but are cooperative: they yield to the event loop while waiting instead of blocking the thread. They are also not thread-safe, so use them only from coroutines running on the same loop.
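A minimal asyncio.Condition sketch of the wait-notify pattern, with two hypothetical downstream stages that each wait for a different amount of buffered data:

```python
import asyncio


async def wait_for_items(cond: asyncio.Condition, buffer: list[int], needed: int,
                         name: str, log: list[str]) -> None:
    async with cond:  # wait_for() requires holding the condition's lock
        # Re-checks the predicate after every notify; returns once it is true
        await cond.wait_for(lambda: len(buffer) >= needed)
        log.append(f"{name} saw {len(buffer)} items")


async def produce(cond: asyncio.Condition, buffer: list[int], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0)  # simulate an I/O step between records
        async with cond:
            buffer.append(i)
            cond.notify_all()  # wake every waiter so each re-checks its own predicate


async def condition_main() -> list[str]:
    cond = asyncio.Condition()
    buffer: list[int] = []
    log: list[str] = []
    await asyncio.gather(
        wait_for_items(cond, buffer, 2, "stage-A", log),
        wait_for_items(cond, buffer, 5, "stage-B", log),
        produce(cond, buffer, 5),
    )
    return log


print(asyncio.run(condition_main()))
```

notify_all() plus a per-waiter predicate lets each stage decide for itself when enough data has arrived, which a single Event cannot express.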
PUTTING IT ALL TOGETHER

No task leaks. No orphaned coroutines. No surprise hangs.

Category: Python
Difficulty: advanced
Duration: 42 minutes
Challenges: 0 hands-on challenges

Topics covered: asyncio.TaskGroup: Structured Concurrency, Event Loop Internals: What Staff Candidates Know, Testing Async Code: pytest-asyncio and AsyncMock, Concurrency Decision Matrix: Async vs Threading vs Multiprocessing vs Spark, Shared State and Async Safety
