Python Interview Practice
15 Python interview questions with worked solutions, ranked by how often they show up in real data engineering loops. Each question names the level it typically lands at, shows the approach the interviewer is looking for, and includes runnable Python where it adds value. Almost nothing looks like LeetCode.
15 Python interview questions
Each links to its full worked answer. The level tag is the seniority at which the question typically lands.
- Q01 · L3 · Dict aggregation
Group orders by customer, returning total spend and order count per customer.
defaultdict with a lambda accumulator. Skips the 'if key not in dict' dance. The standard-library answer to SQL GROUP BY with SUM and COUNT. If you can write this from a blank file in under three minutes, the dictionary section of a Python round is solved.

    from collections import defaultdict

    def group_orders(orders):
        # customer_id -> running total and order count
        agg = defaultdict(lambda: {"total": 0, "count": 0})
        for row in orders:
            cid = row["customer_id"]
            agg[cid]["total"] += row["amount"]
            agg[cid]["count"] += 1
        return dict(agg)

- Q02 · L3 · Inversion
Invert a dict; when values collide, collect the original keys into a list.
defaultdict(list) keyed on the value. Append the original key. O(n). The trap is reaching for a dict comprehension and silently losing collisions when two keys had the same value.

    from collections import defaultdict

    def invert(d):
        out = defaultdict(list)
        for k, v in d.items():
            out[v].append(k)  # colliding values collect all their original keys
        return dict(out)

- Q03 · L4 · Dedup
Dedupe a list of dicts by a composite key, keeping the latest record per key.
Two reasonable answers. A dict keyed on the composite tuple, updating when a newer ts arrives: O(n) time and memory, and it works in a single pass over streamed input. Or sort by (key, ts DESC) and take the first occurrence per key: O(n log n), and the better fit when the input is already sorted or has to be sorted externally because the keys don't fit in memory. The dict approach is the Python version of the SQL ROW_NUMBER dedup pattern.

    def dedup_latest(records, key_fn, ts_fn):
        latest = {}
        for r in records:
            k = key_fn(r)
            # keep the record with the newest timestamp per key
            if k not in latest or ts_fn(r) > ts_fn(latest[k]):
                latest[k] = r
        return list(latest.values())

- Q04 · L4 · File I/O
Compute the average of a numeric column in a 10GB CSV without loading the whole file.
Running sum and count, one row at a time. csv.DictReader iterates lazily; pandas.read_csv without chunksize tries to load the whole file into memory and will usually kill the kernel. The interviewer's follow-up is what changes when the file is gzipped (open via gzip.open in text mode) or distributed across S3 prefixes.

    import csv

    def avg_column(path, col):
        total, count = 0.0, 0
        # newline="" is what the csv module expects for file objects
        with open(path, newline="") as f:
            for row in csv.DictReader(f):
                try:
                    value = float(row[col])
                except (ValueError, TypeError):
                    continue  # skip blank or non-numeric cells
                total += value
                count += 1
        return total / count if count else None

- Q05 · L4 · JSON walking
Flatten a nested JSON into a flat dict with dot-separated keys.
Recursive walk. For each key, build the dotted path so far; if the value is a dict, recurse with that path as the prefix. Otherwise, add it to the result. Decide upfront how to handle lists: explode into multiple rows or serialize. The interviewer always asks; have an answer ready.

    def flatten(d, prefix=""):
        out = {}
        for k, v in d.items():
            # top-level keys have no prefix; nested keys are joined with "."
            key = f"{prefix}.{k}" if prefix else str(k)
            if isinstance(v, dict):
                out.update(flatten(v, key))
            else:
                out[key] = v  # lists are stored as-is here
        return out

- Q06 · L4 · Sessionization
Sessionize an event stream with a 30-minute inactivity gap.
Sort by (user_id, ts). Walk the list. Start a new session whenever the gap exceeds 30 minutes or the user changes. Assign per-user session counters. The mistake to avoid: comparing against the session start instead of the immediately previous event.
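
    def sessionize(events, gap=1800):
        # gap is in seconds; ts is assumed numeric (epoch seconds). Sorts in place.
        events.sort(key=lambda e: (e["user_id"], e["ts"]))
        # sid is one running counter across all users;
        # reset it on user change if per-user numbering is required
        sid, prev_user, prev_ts = 0, None, None
        for e in events:
            # compare against the previous event, not the session start
            if e["user_id"] != prev_user or e["ts"] - prev_ts > gap:
                sid += 1
            e["session_id"] = sid
            prev_user, prev_ts = e["user_id"], e["ts"]
        return events

- Q07 · L4 · Generators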
Generator that reads a large file and yields batches of N lines.
Accumulate lines in a list, yield it every N lines, then reset it. Memory stays bounded by the batch size. Pandas has chunksize for the DataFrame version. The interviewer's follow-up is what happens at the last partial batch: yield the partial or skip it. Yield it.

    def batches(path, n):
        batch = []
        with open(path) as f:
            for line in f:
                batch.append(line.rstrip("\n"))
                if len(batch) == n:
                    yield batch
                    batch = []
        if batch:
            yield batch  # final partial batch

- Q08 · L4 · Retries
Decorator that retries a function up to 3 times with exponential backoff and jitter.
Inner function catches a narrow set of exceptions (requests.RequestException, not bare Exception), sleeps 2**attempt seconds plus a random jitter, re-raises after the last attempt. Jitter prevents the self-inflicted DDoS when every worker retries at the same later time. Mention tenacity as the production library.
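
    import random
    import time
    from functools import wraps

    def retry(max_attempts=3, exceptions=(Exception,)):
        # Pass a narrow tuple in real use, e.g. (requests.RequestException,)
        def deco(fn):
            @wraps(fn)
            def wrapped(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return fn(*args, **kwargs)
                    except exceptions:
                        if attempt == max_attempts - 1:
                            raise  # out of attempts: surface the original error
                        # exponential backoff plus up to 1s of jitter
                        time.sleep(2 ** attempt + random.uniform(0, 1))
            return wrapped
        return deco

- Q09 · L4 · Schema validation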
Validate incoming records against a schema; route invalid records to a dead-letter queue.
Define expected fields and types. Iterate records, collect errors per record. Valid records go to output; invalid go to DLQ list with the error attached. The senior signal is returning both lists and tagging the failure mode (missing field, wrong type, out-of-range value) rather than just rejecting.

    def validate(records, schema):
        valid, dlq = [], []
        for r in records:
            errs = []
            for field, typ in schema.items():
                if field not in r:
                    errs.append(f"missing:{field}")
                elif not isinstance(r[field], typ):
                    errs.append(f"type:{field}")
            if errs:
                dlq.append({"record": r, "errors": errs})
            else:
                valid.append(r)
        return valid, dlq

- Q10 · L5 · Memory
Find the top 10 rows by a numeric column in a 50GB CSV.
heapq.nlargest over a lazy row iterator, or a hand-rolled min-heap capped at size 10. Read line by line. Never call pd.read_csv without chunksize on a 50GB file. The interviewer is filtering for candidates who know that pandas isn't a magic solution and that streaming + small-state is the standard pattern.

    import csv
    import heapq

    def top_n(path, col, n=10):
        heap = []  # min-heap of (value, line_no, row); heap[0] is the smallest kept
        with open(path, newline="") as f:
            for i, row in enumerate(csv.DictReader(f)):
                try:
                    v = float(row[col])
                except (ValueError, TypeError):
                    continue
                # i breaks ties so equal values never fall through to comparing dicts
                if len(heap) < n:
                    heapq.heappush(heap, (v, i, row))
                elif v > heap[0][0]:
                    heapq.heappushpop(heap, (v, i, row))
        return [r for _, _, r in sorted(heap, reverse=True)]

- Q11 · L5 · Concurrency
Concurrent fetch with rate limit using asyncio.
asyncio.Semaphore(N) bounds in-flight requests. The interviewer's follow-up is what happens when one request hangs: asyncio.wait_for with a timeout per call. The senior signal is naming why rate limiting matters for downstream services even when your local quota allows more.
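A minimal sketch of the semaphore-plus-timeout pattern described above. The `fetch_one` coroutine is a hypothetical stand-in for a real HTTP call (it only sleeps), and the names `fetch_all`, `max_in_flight`, and `timeout`, along with the choice to return None on timeout, are illustrative assumptions rather than part of any library.

```python
import asyncio


async def fetch_one(url):
    # Hypothetical stand-in for a real HTTP request; it only sleeps briefly.
    await asyncio.sleep(0.01)
    return f"ok:{url}"


async def fetch_all(urls, max_in_flight=5, timeout=2.0, fetch=fetch_one):
    sem = asyncio.Semaphore(max_in_flight)  # caps concurrent in-flight requests

    async def bounded(url):
        async with sem:
            try:
                # a hung request is cancelled after `timeout` seconds
                return await asyncio.wait_for(fetch(url), timeout)
            except asyncio.TimeoutError:
                return None  # assumption: a timeout is reported as None

    # gather returns results in the same order as the input urls
    return await asyncio.gather(*(bounded(u) for u in urls))
```

Run it with `asyncio.run(fetch_all(urls))`. Note that a semaphore caps concurrency, not requests per second; a strict per-second limit would need a token bucket or a sleep between acquisitions on top of this.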
- Q12 · L5 · Iterator merge
Stream-merge sorted iterators into a single sorted iterator.
heapq.merge yields lazily. O(n log k). Beats materialize-then-sort for k iterators. The classic external-sort merge step. Don't reinvent it; name heapq.merge and explain why.

    import heapq

    def merge_sorted(*iters, key=None):
        # each input must already be sorted by the same key
        return heapq.merge(*iters, key=key)

- Q13 · L5 · pandas
SCD Type 2 merge in pandas: expire current rows where the source differs and insert new rows.
Identify rows where source differs from target. Expire current (set valid_to, is_current=False), insert new with valid_from=now and is_current=True. Use pd.merge with indicator=True to spot the diff. The interview signal is naming why the surrogate key changes and the natural key stays.
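One way the expire-and-insert steps above could look, as a sketch rather than a reference implementation. The column names `valid_from`, `valid_to`, and `is_current` follow the description; the function name `scd2_merge` and its parameters are assumptions, the target is assumed to hold a boolean `is_current` column and a datetime `valid_to` column, and surrogate-key assignment is left out.

```python
import pandas as pd


def scd2_merge(target, source, key, attrs, now):
    """Return a new target frame with changed rows expired and new versions appended."""
    current = target[target["is_current"]]
    # Left-join source onto current rows; _merge shows whether the key already exists
    cmp = source.merge(current[[key] + attrs], on=key, how="left",
                       suffixes=("", "_tgt"), indicator=True)
    is_new = cmp["_merge"] == "left_only"
    # Note: != treats NaN as different from NaN; handle nulls explicitly if they occur
    differs = pd.Series(False, index=cmp.index)
    for a in attrs:
        differs |= cmp[a] != cmp[f"{a}_tgt"]
    changed_keys = cmp.loc[~is_new & differs, key]

    # Expire the current version of every changed natural key
    out = target.copy()
    expire = out["is_current"] & out[key].isin(changed_keys)
    out.loc[expire, "valid_to"] = now
    out.loc[expire, "is_current"] = False

    # Insert a fresh current row for new and changed keys
    inserts = cmp.loc[is_new | differs, [key] + attrs].copy()
    inserts["valid_from"] = now
    inserts["valid_to"] = pd.NaT
    inserts["is_current"] = True
    return pd.concat([out, inserts], ignore_index=True)
```

The natural key (`key`) is carried over unchanged into the new row; in a warehouse the new row would also get a fresh surrogate key, which this sketch does not model.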
- Q14 · L5 · Log parsing
Parse a 2GB Apache access log; output the top 100 URLs by request count, grouped by HTTP status code.
Read line by line. Parse with regex (compile outside the loop). collections.defaultdict(Counter) keyed by status code. After processing, .most_common(100) per status. Mention that on truly large logs the right answer is Spark or DuckDB, not single-machine Python.
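A short sketch of the approach above. The regex is an assumption about the log layout (it targets the quoted request and the three-digit status of a common or combined format line), and the name `top_urls_by_status` is illustrative.

```python
import re
from collections import Counter, defaultdict

# Assumed layout: ... "METHOD /url PROTOCOL" STATUS SIZE ...
# Compiled once, outside the loop.
LINE_RE = re.compile(r'"(?:[A-Z]+) (\S+) [^"]*" (\d{3}) ')


def top_urls_by_status(lines, n=100):
    counts = defaultdict(Counter)  # status code -> Counter of URLs
    for line in lines:
        m = LINE_RE.search(line)
        if m is None:
            continue  # skip malformed lines rather than crash
        url, status = m.groups()
        counts[status][url] += 1
    return {status: c.most_common(n) for status, c in counts.items()}
```

Because the function takes any iterable of lines, an open file object can be passed directly and is read lazily, for example `with open(path, errors="replace") as f: top_urls_by_status(f)`.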
- Q15 · L6 · Property-based testing
Write a property-based test confirming a transformation function never drops rows.
Use the @given decorator from hypothesis with strategies that produce realistic input. Assert len(output) == len(input). The senior signal is volunteering this kind of test for data quality functions where the input space is too large for example-based tests to cover. Most candidates have never used hypothesis; mentioning it lands well.
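A minimal sketch of such a test. The `normalize` function is a hypothetical transformation invented for illustration, and the row shape (an integer `id` and a text `name`) is an assumption; only `given` and the `strategies` module come from hypothesis.

```python
from hypothesis import given, strategies as st


def normalize(rows):
    # Hypothetical transformation under test: trims names, keeps every row
    return [{**r, "name": r["name"].strip()} for r in rows]


# Strategy producing dicts with the assumed row shape
row = st.fixed_dictionaries({"id": st.integers(), "name": st.text()})


@given(st.lists(row))
def test_never_drops_rows(rows):
    # Property: the output has exactly as many rows as the input
    assert len(normalize(rows)) == len(rows)
```

Under pytest the test is collected like any other; hypothesis generates many input lists, including the empty list, and shrinks any failing case to a minimal example.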
How the 15 questions map to seniority
An onsite at your target level pulls from the band that matches, plus one warm-up below and (for L5 and up) one stretch above.
- L3 (Junior) · 2 questions · Q1 to Q2
Dict aggregation, dict inversion. Warm-up shape.
- L4 (Mid) · 7 questions · Q3 to Q9
Dedup, file I/O, JSON walking, sessionization, generators, retries, schema validation.
- L5 (Senior) · 5 questions · Q10 to Q14
Memory-bounded top-N, async with rate limiting, iterator merge, SCD Type 2 in pandas, log parsing at scale.
- L6 (Staff) · 1 question · Q15
Property-based testing for data quality. Rarely asked, lands well when volunteered.
How a DE Python round actually runs
45 minutes. A vague prompt that looks like the work: 'parse this log and pull out the error patterns', 'dedupe this event stream', 'walk this JSON and flatten it'. You ask about input format, expected output, and scale. The interviewer answers the way a real one does, which is to say slightly under-specified. You write Python. The evaluator runs it. Follow-ups push on edge cases, complexity, and whether the code could survive in production.
The shape is almost nothing like a software engineering Python round. SWE leans on data structures and algorithms; the DE version leans on data manipulation. If you've been prepping with LeetCode trees, you're practicing for a different exam.
Standard library before pandas is the default expectation. Most rounds want defaultdict, csv.DictReader, json, itertools, and Counter before a DataFrame. Reaching for pandas on a five-row dedup reads as overkill. If you can't write a GROUP BY with defaultdict in under five minutes, that's the gap to close first.
Know the patterns before the interviewer asks them.
Common questions about Python interview practice
- What Python questions actually come up in DE interviews?
- How is this different from SWE Python interviews?
- Do I need pandas?
- How many Python problems should I solve before a loop?
- Should I prepare for PySpark?
- Should I practice with autocomplete on or off?
Run a mock before the real one
- 01 · Active recall beats re-reading by 50%
Cognitive-science meta-reviews (Dunlosky et al., 2013) rank practice testing as a top-tier study technique, while re-reading and highlighting rank near the bottom.
- 02 · 76% of hiring managers reject on the coding task, not the resume
From HackerRank's 2024 Developer Skills Report. Candidates who look strong on paper still fail the live screen if they haven't done timed, executable practice.
- 03 · Five problem shapes cover 80% of data engineer loops
Dedup, sessionization, top-N-per-group, slowly-changing dimensions, partition tricks. Writing the shapes by hand turns the unfamiliar into pattern recognition.