Schema Evolution and Late Data: Intermediate

A ride-sharing platform processing a hundred million events a day needed to roll out a new pricing model. The event payload would change from a flat fare field to a structured object with a surge multiplier, tax breakdown, and tip allocation. Forty-three downstream consumers read the old fare field, and coordinating a simultaneous deploy across all of them was not viable. The team rolled the change out in four phases over six weeks: ship a new schema version, dual-write both shapes, migrate consumers one at a time, then drop the old shape. This phased approach is called expand-contract, and it is the production-grade alternative when the additive default cannot express a change.

On the same platform, a separate streaming team computed five-minute fare totals for a real-time pricing dashboard. Trips taken in tunnels and dead zones produced events that arrived three to twelve minutes late. The streaming engine had to decide which window each event belonged to and when each window's results could be considered final. The answer is a watermark: the engine's declaration that it expects no more events older than a stated time, so windows ending before that time can be treated as complete. This lesson covers both: how production teams version schemas at scale, and how streaming engines reason about time when events do not arrive in order.

Schema Registries: Where They Live

Recognize what a schema registry stores and how compatibility checks block incompatible producer changes.

The beginner tier treated schemas as an implicit contract. The intermediate tier turns that contract into a system of record. A schema registry is a service that stores schema definitions, assigns each one an immutable version, and runs compatibility checks before accepting a new version. Producers register the schema before publishing data under it. Consumers fetch the schema by its ID when they read. The registry is the single source of truth for what shape data should have at each moment in history.

What a Registry Stores

| Element | Purpose | Example |
|---|---|---|
| Subject | A logical name for a stream of related schemas | checkout_events_v |
| Schema | The Avro, Protobuf, or JSON Schema document | Record with fields user_id, amount, currency |
| Version | Monotonic integer assigned per subject | 1, 2, 3, 4, where 4 is the latest |
| Schema ID | Globally unique identifier embedded in each message | 4-byte integer prefixed to the serialized payload |
| Compatibility mode | Rule the registry enforces on new versions | BACKWARD, FORWARD, FULL, or NONE |

The most widely deployed registry is Confluent Schema Registry, which stores Avro, Protobuf, and JSON Schema definitions for Kafka topics. AWS Glue Schema Registry plays the same role inside the AWS ecosystem. Apicurio is the open-source alternative widely used outside Confluent's stack. The interfaces differ in detail but the model is consistent: register schemas under a subject, validate compatibility, embed the schema ID in each message, fetch the schema on read.

How Compatibility Checks Run

# Producer-side compatibility check before publishing v2
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

client = SchemaRegistryClient({'url': 'http://registry:8081'})

with open('checkout_event_v2.avsc') as f:
    new_schema = Schema(schema_str=f.read(), schema_type='AVRO')

# The registry checks v2 against the latest registered version (v1).
# If the configured mode is BACKWARD, consumers using v2 must be able
# to read data that was written with v1.
is_compatible = client.test_compatibility(
    subject_name='checkout_events_v-value',
    schema=new_schema,
)

if not is_compatible:
    raise RuntimeError('Schema v2 is not backward compatible; rollout blocked.')

# register_schema returns the schema ID that the serializer embeds in each message.
schema_id = client.register_schema(
    subject_name='checkout_events_v-value',
    schema=new_schema,
)
The compatibility check happens at registration time, not at deploy time. A schema that fails the check is rejected by the registry, which means the producer cannot publish under it. This is the key invariant: incompatible schemas cannot enter the system. The check is automated, not a human review, which is why it scales to hundreds of producers and thousands of subjects.
Without a Schema Registry
  • Schema documented in a wiki page that no one reads
  • Compatibility checked by hope and code review
  • Old payloads in flight after a schema change cannot be parsed
  • Consumer breakages discovered in production after the producer ships
With a Schema Registry
  • Schema is a versioned artifact with an immutable ID
  • Compatibility checked at registration; incompatible schemas rejected
  • Each message carries its schema ID; consumers can parse any version
  • Breaking changes are blocked at the producer, not at the consumer

Schema IDs Embedded in Messages

Wire format of a single Kafka record using Confluent Schema Registry:
  Byte 0      Magic byte (always 0)
  Bytes 1-4   Schema ID (32-bit integer, big-endian)
  Bytes 5+    Avro-encoded payload
The consumer reads the magic byte, extracts the schema ID, fetches the matching schema from the registry (cached locally), and uses that schema to deserialize the payload. Old payloads written under schema ID 17 are still readable when schema ID 23 is the latest, because the consumer fetches whichever ID it needs.
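Here is a minimal sketch of the consumer-side framing step, using only Python's `struct` module. It assumes the five-byte header described above and treats the payload as opaque bytes. The registry lookup and the Avro decoding are left out, and both function names are hypothetical.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1 signed byte + 4-byte big-endian unsigned int


def split_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Hypothetical helper: return (schema_id, payload) for a framed record."""
    if len(message) < HEADER.size:
        raise ValueError("record shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[HEADER.size:]


def frame(schema_id: int, payload: bytes) -> bytes:
    """Producer-side inverse, used here to build a test record."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


# A record written under schema ID 17 stays parseable after ID 23 exists:
# the consumer looks up whichever ID the header names.
schema_id, payload = split_confluent_frame(frame(17, b"\x02\x54"))
print(schema_id)  # 17
```

A real consumer would then use `schema_id` as the cache key for the registry lookup. That is why old and new versions can coexist in one topic.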
What a registry buys the team:
  • Producers cannot ship incompatible schemas; the registry rejects them
  • Consumers can parse any version that ever existed in the topic
  • Audit log: every schema change is timestamped and attributable
  • Cross-team contracts: producer and consumer teams share one source of truth
Subject
The named stream of schemas
A logical group, usually one per Kafka topic. The subject's evolution is the audit trail of how that data shape has changed over time.
Compatibility Mode
The rule that gates new versions
BACKWARD requires consumers using the new schema to read data written with the previous schema. FORWARD requires consumers still on the previous schema to read data written with the new schema. FULL requires both. NONE disables the gate.
Schema ID
The pointer embedded in every message
A 4-byte integer prefixed to the payload. Consumers use it to fetch the exact schema the producer wrote with, even if many versions exist.

A registry is to schemas what a version control system is to code. Without it, schemas drift through tribal knowledge. With it, every shape that has ever flowed through a topic can be reconstructed, audited, and reasoned about.

Compatibility modes are not a per-topic decision in isolation. They are part of a wider contract between producer and consumer. A topic configured FORWARD says the consumer team's existing code keeps working when the producer ships a new schema, so the producer's freedom is constrained by what the consumer is currently running. A topic configured BACKWARD makes the opposite promise. Consumers upgrade to the new schema first and can still read data already written with the old one, which suits teams where consumers ship faster than producers. FULL pins both directions. It is the right choice for topics with many producers and many consumers, where neither side can dictate the rollout sequence. Picking the wrong mode is not fatal, but it is expensive. The mode shapes which changes every team can ship, and tightening it later usually requires a careful migration of schemas registered under the looser rule. The default mode at registration time has lasting consequences.
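A deliberately simplified checker makes the direction of each mode concrete. It models a schema as a mapping from field name to "has a default". It then applies only the Avro resolution rule that a reader can decode a writer's data if every reader field without a default exists in the writer's schema. Type changes, aliases, and transitive modes are ignored, and all names are hypothetical. A real registry implements the full resolution rules.

```python
# Hypothetical, simplified model: field name -> whether the field has a default.
SchemaFields = dict[str, bool]


def can_read(reader: SchemaFields, writer: SchemaFields) -> bool:
    """Avro-style rule: each reader field must exist in the writer or have a default."""
    return all(name in writer or has_default for name, has_default in reader.items())


def is_compatible(new: SchemaFields, latest: SchemaFields, mode: str) -> bool:
    if mode == "BACKWARD":  # consumers on `new` read data written with `latest`
        return can_read(reader=new, writer=latest)
    if mode == "FORWARD":   # consumers on `latest` read data written with `new`
        return can_read(reader=latest, writer=new)
    if mode == "FULL":
        return can_read(new, latest) and can_read(latest, new)
    if mode == "NONE":
        return True
    raise ValueError(f"unknown mode {mode}")


v1 = {"user_id": False, "amount": False}
v2_required = {"user_id": False, "amount": False, "currency": False}
v2_defaulted = {"user_id": False, "amount": False, "currency": True}

print(is_compatible(v2_required, v1, "BACKWARD"))  # False
print(is_compatible(v2_required, v1, "FORWARD"))   # True
```

Adding a required field passes FORWARD but fails BACKWARD. Giving the field a default makes the change compatible in both directions. Dropping a required field behaves the other way round.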
Production registries also enforce naming conventions and ownership at the subject level. A subject prefix maps to a team, a domain, and a contract owner. The team named on the subject is paged when consumer-side breakage is reported. This metadata is what lets large companies operate hundreds of subjects without losing track of who owes whom what when something breaks. A registry without ownership metadata is a slightly nicer way to read JSON; a registry with ownership is the operational backbone of the data platform.
TIP
When adopting a registry on an existing system, start with NONE compatibility and inventory the schemas already in flight. Tighten to BACKWARD or FULL only after the inventory is clean. Tightening the mode before the inventory is complete will reject legitimate schema registrations and block the producers behind them.

The Expand-Contract Pattern

Apply the four phases of expand-contract to plan a breaking schema change that does not require a synchronized deploy.

The additive default works for most schema changes. It does not work for the breaking ones. When a column must be renamed, dropped, or restructured, every consumer downstream has to adapt. Doing this in a single deploy is impossible at any reasonable scale. The expand-contract pattern is the production technique for rolling out a breaking change without a flag day. It splits the change into four phases that allow producers and consumers to migrate independently.

The Four Phases

| Phase | Producer state | Consumer state |
|---|---|---|
| 1. Expand | Add new shape; keep old shape | Continue reading old shape |
| 2. Dual-write | Populate both shapes for every event | Begin reading new shape, validate against old |
| 3. Migrate | Continue dual-writing | Cut over to new shape one consumer at a time |
| 4. Contract | Drop old shape | All consumers on new shape; old shape unused |

Each phase is a deploy, and between phases the system is in a stable state. A consumer that is slow to migrate does not block the producer; the system simply stays in phase 2 or 3 longer. The producer cannot reach phase 4 until every consumer has completed its phase 3 cutover, which is the only synchronization the pattern requires. That synchronization happens at the slowest consumer's pace, which is how scheduling works in practice anyway.
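The producer side of the four phases can be sketched as one serialization function whose output depends on the current phase. The `Phase` enum and `build_payload` are illustrative names, not a prescribed API. They follow the `signup_date` to `created_at` rename used in this section. The sketch models only payload shape, so phases 1 to 3 produce the same output. The difference between expand and dual-write is whether the schema marks the new field as required.

```python
from datetime import date
from enum import IntEnum


class Phase(IntEnum):
    EXPAND = 1
    DUAL_WRITE = 2
    MIGRATE = 3
    CONTRACT = 4


def build_payload(user_id: int, signup: date, phase: Phase) -> dict:
    """Hypothetical producer serializer for the signup_date -> created_at rename."""
    payload = {"user_id": user_id}
    if phase < Phase.CONTRACT:
        # Old shape: kept through expand, dual-write, and migrate.
        payload["signup_date"] = signup.isoformat()
    # New shape: written from the expand phase onward.
    payload["created_at"] = signup.isoformat() + "T00:00:00Z"
    return payload


print(build_payload(42, date(2025, 9, 14), Phase.DUAL_WRITE))
# {'user_id': 42, 'signup_date': '2025-09-14', 'created_at': '2025-09-14T00:00:00Z'}
print(build_payload(42, date(2025, 9, 14), Phase.CONTRACT))
# {'user_id': 42, 'created_at': '2025-09-14T00:00:00Z'}
```

Because the phase is an input rather than hard-coded, moving from one phase to the next is a configuration deploy. Rolling back is just as small.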

A Worked Rename

// Phase 1 (Expand): producer adds new field alongside old
{
  "user_id": 42,
  "signup_date": "2025-09-14",          // old
  "created_at": "2025-09-14T00:00:00Z"  // new, additive
}

// Phase 2 (Dual-write): every event has both fields
// Old field is required, new field is required, values are kept consistent

// Phase 3 (Migrate): consumers move to created_at one at a time
// Each consumer's deploy is independent; producer is unchanged

// Phase 4 (Contract): once every consumer is on created_at
{
  "user_id": 42,
  "created_at": "2025-09-14T00:00:00Z"  // old field dropped
}
What looked like a single rename in the source code becomes four production deploys spread over weeks. The cost is real. The benefit is that no consumer team is forced into a deploy on the producer's schedule, which is what makes the pattern viable in any system with more than a handful of teams.

The pattern also provides a clean rollback path. If a problem surfaces in phase 3, the team rolls back consumer migrations one at a time without touching the producer. Phases 1 through 3 can each be reversed by reverting the latest deploy. Phase 4 is the step that closes that option. Once the old field is dropped, restoring it means recreating the column from dual-write history and resuming the migration, which is why the team confirms that every consumer is on the new shape first. The rollback property is what distinguishes expand-contract from a flag-day deploy disguised as four steps.

When Expand-Contract Is Necessary

  • A column rename, where every consumer reads the old name
  • A type change that cannot be widened (e.g., string to integer)
  • A semantic change: same column name, different meaning (e.g., revenue from gross to net)
  • A structural reorganization: nested object becomes top-level fields, or vice versa
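-- Expand: add the new column alongside the old one
ALTER TABLE users ADD COLUMN created_at TIMESTAMP;

-- Backfill history so the new column is complete before consumers migrate
UPDATE users
SET created_at = signup_date::TIMESTAMP
WHERE created_at IS NULL;

-- Migrate: consumers read the new column through a view
CREATE OR REPLACE VIEW v_users AS
SELECT
    user_id,
    created_at
FROM users;

-- Contract: drop the old column only after every consumer has migrated
ALTER TABLE users DROP COLUMN signup_date;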

Why It Beats a Flag Day

Flag-Day Migration
  • All teams deploy at the same instant
  • One slow team blocks everyone else
  • Rollback is all-or-nothing
  • Only viable at small organizations
Expand-Contract
  • Teams deploy independently across weeks
  • Slow teams extend the dual-write phase rather than block the change
  • Rollback is per phase: revert just the latest deploy
  • Standard practice at any organization with more than a few consumer teams
Operational discipline the pattern requires:
  • Phase 2 must validate that both shapes carry equivalent data; drift between them is a bug
  • Phase 3 must track which consumers have migrated; without a registry of consumers, the producer cannot know when phase 4 is safe
  • Each phase must hold long enough that observed traffic confirms it is stable
  • Rollback plan: every phase must be reversible without destroying data
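The second bullet amounts to a gate: the producer may start phase 4 only when every registered consumer has moved to the new shape. Below is a minimal sketch of that gate. It assumes a hypothetical in-memory set of consumers; in practice the list might live in a table, a config file, or registry metadata, and the consumer names are made up.

```python
from dataclasses import dataclass, field


@dataclass
class MigrationTracker:
    """Hypothetical record of which consumers already read the new shape."""
    consumers: set[str]
    migrated: set[str] = field(default_factory=set)

    def mark_migrated(self, consumer: str) -> None:
        if consumer not in self.consumers:
            raise KeyError(f"unknown consumer {consumer!r}")
        self.migrated.add(consumer)

    def mark_rolled_back(self, consumer: str) -> None:
        # Phase 3 rollback: one consumer returns to the old shape on its own.
        self.migrated.discard(consumer)

    def pending(self) -> set[str]:
        return self.consumers - self.migrated

    def contract_is_safe(self) -> bool:
        # Phase 4 may start only when no consumer still reads the old shape.
        return not self.pending()


tracker = MigrationTracker(consumers={"billing", "fraud", "dashboard"})
tracker.mark_migrated("billing")
tracker.mark_migrated("fraud")
print(tracker.pending())           # {'dashboard'}
print(tracker.contract_is_safe())  # False
```

Registering consumers up front is what makes "every consumer" a checkable statement. A consumer the producer does not know about will break silently at contract time.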
/* Validate dual-write consistency during phase 2 */
WITH dual_write AS (
    SELECT 1 AS user_id, DATE '2025-09-14' AS signup_date, TIMESTAMP '2025-09-14 00:00:00' AS created_at
    UNION ALL
    SELECT 2, DATE '2025-09-15', TIMESTAMP '2025-09-15 00:00:00'
    UNION ALL
    SELECT 3, DATE '2025-09-16', TIMESTAMP '2025-09-15 23:00:00' /* drift */
    UNION ALL
    SELECT 4, DATE '2025-09-17', TIMESTAMP '2025-09-17 00:00:00'
)
SELECT
    user_id,
    signup_date,
    created_at,
    CASE
        WHEN signup_date = CAST(created_at AS DATE) THEN 'consistent'
        ELSE 'drift'
    END AS dual_write_state
FROM dual_write
ORDER BY user_id;
Do
  • Treat expand-contract as four deploys, not one
  • Backfill historical rows during the expand phase so the new field has a complete history
  • Add a dual-write consistency check that fails the build if the two shapes drift
Don't
  • Skip the dual-write phase under time pressure; without it, rollback is impossible
  • Drop the old shape before confirming all consumers have migrated
  • Apply expand-contract to a change that did not require it; the cost is not free
TIP
When estimating the timeline for expand-contract, double the slowest consumer's expected migration time. The actual blocker is rarely the producer or the schema. It is the consumer team that has not yet found a sprint to do the work.

Event Time Versus Processing Time

Distinguish event time from processing time and pick the correct domain for a given consumer query.

The beginner tier introduced event time and processing time as a pair. The intermediate tier turns the distinction into the foundation of every streaming aggregation. The choice of which time domain to use is not a stylistic preference. It is the single most consequential decision in a streaming pipeline. It determines what numbers the consumer sees, how the system handles late events, and how much state the engine has to keep around.

The Two Domains, Restated

| Property | Event time | Processing time |
|---|---|---|
| Source | Stamped by the producer or device | Stamped by the engine on receipt |
| Stability | Fixed property of the event forever | Depends on producer, broker, and engine lag |
| Reproducibility | Reruns produce the same buckets | Reruns may produce different buckets if lag changed |
| Correctness for history | Correct: the event happened when it happened | Wrong: late events land in the wrong bucket |
| Implementation cost | Engine must track watermarks and hold state | Engine just reads its own clock |

Most production streaming systems have to compute on event time, even though processing time is cheaper. The consumer of the streaming output usually wants a number that reflects what happened in the world, not what happened to reach the engine on schedule. A revenue chart for yesterday should reflect every transaction that happened yesterday, not only the ones that reached the engine before midnight.

Event time is also what makes a streaming pipeline reproducible. Replaying the same input stream through the same engine produces the same buckets under event time, regardless of when the replay happens or how lag conditions differ from the original run. Under processing time, each replay can produce different buckets, because the engine's clock advances independently of the input. Reproducibility is not a luxury; it is what allows backfills, audits, and incident postmortems to reach the same answer twice.

Why the Two Diverge

Sources of event-time / processing-time divergence, with typical magnitudes:
  • Mobile SDK queueing: seconds to days
  • Producer batch flush (linger.ms): milliseconds to seconds
  • Network latency (geo-dispersed): tens of milliseconds
  • Kafka broker lag: milliseconds to minutes
  • Consumer group rebalance: seconds to minutes
  • Backpressure on the engine: minutes to hours
  • Producer outage: hours to days
The long tail of these gaps is what late data is. The pipeline has to decide how much of the tail to wait for before producing a result.

Worked Example: Same Events, Two Domains

# Three events, with event_time and processing_time recorded
events = [
    {'id': 'a', 'event_time': '09:00:02', 'processing_time': '09:00:05', 'amount': 100},
    {'id': 'b', 'event_time': '09:00:14', 'processing_time': '09:00:18', 'amount': 200},
    {'id': 'c', 'event_time': '08:59:58', 'processing_time': '09:01:30', 'amount': 50},
]

# Aggregating by 1-minute buckets keyed on event_time:
#   08:59 bucket -> {c}    -> 50
#   09:00 bucket -> {a, b} -> 300

# Aggregating by 1-minute buckets keyed on processing_time:
#   09:00 bucket -> {a, b} -> 300
#   09:01 bucket -> {c}    -> 50

# Event 'c' lands in the 08:59 bucket under event time and the 09:01 bucket
# under processing time. The two answers are different.
# Reports of '08:59 minute revenue' that use processing time are wrong by 50.
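The bucket assignments in those comments can be reproduced mechanically. This sketch floors each timestamp to its minute and sums amounts per bucket. It redeclares the three events above so the block runs on its own, and `minute_totals` is an illustrative helper name.

```python
from collections import defaultdict

events = [
    {'id': 'a', 'event_time': '09:00:02', 'processing_time': '09:00:05', 'amount': 100},
    {'id': 'b', 'event_time': '09:00:14', 'processing_time': '09:00:18', 'amount': 200},
    {'id': 'c', 'event_time': '08:59:58', 'processing_time': '09:01:30', 'amount': 50},
]


def minute_totals(events, time_field):
    """Sum amounts into 1-minute buckets keyed on the chosen time domain."""
    totals = defaultdict(int)
    for e in events:
        bucket = e[time_field][:5]  # 'HH:MM:SS' -> 'HH:MM' floors to the minute
        totals[bucket] += e['amount']
    return dict(sorted(totals.items()))


print(minute_totals(events, 'event_time'))       # {'08:59': 50, '09:00': 300}
print(minute_totals(events, 'processing_time'))  # {'09:00': 300, '09:01': 50}
```

The only difference between the two calls is the `time_field` argument. Making the time domain an explicit parameter, rather than an engine default, is exactly the discipline this section argues for.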
When event time wins:
  • Reports keyed on real-world time (revenue per hour, sessions per day)
  • Auditable systems where re-running must produce identical history
  • Any system that compares producer-side metrics to consumer-side metrics
When processing time is acceptable:
  • Operational metrics about the engine itself (events per second processed)
  • Real-time alerting where 'when the engine saw it' is the question
  • Coarse cost monitoring where small lateness is in the noise
/* Compare the two domains side-by-side on the same events */
WITH evts AS (
    SELECT
        'a' AS id,
        TIMESTAMP '2026-04-25 09:00:02' AS event_time,
        TIMESTAMP '2026-04-25 09:00:05' AS proc_time,
        100 AS amount
    UNION ALL
    SELECT 'b', TIMESTAMP '2026-04-25 09:00:14', TIMESTAMP '2026-04-25 09:00:18', 200
    UNION ALL
    SELECT 'c', TIMESTAMP '2026-04-25 08:59:58', TIMESTAMP '2026-04-25 09:01:30', 50
    UNION ALL
    SELECT 'd', TIMESTAMP '2026-04-25 09:00:45', TIMESTAMP '2026-04-25 09:00:48', 75
)
SELECT
    DATE_TRUNC('minute', event_time) AS bucket_event_time,
    DATE_TRUNC('minute', proc_time) AS bucket_proc_time,
    SUM(amount) AS total
FROM evts
GROUP BY 1, 2
ORDER BY 1, 2;
The 50-amount event 'c' produces different totals depending on the time domain. Under event time, it joins the 08:59 bucket. Under processing time, it joins the 09:01 bucket. A consumer that asks for revenue between 08:59 and 09:00 sees 50 under event time and 0 under processing time. The two answers are not interchangeable. Anyone reading the streaming output as if processing time were event time is reading a different metric than the one labeled. The mismatch is invisible in the dashboard, which displays 'revenue per minute' without naming which time domain it used. The team that owns the streaming pipeline must enforce the labeling discipline; the consumer team will not catch the difference until the numbers diverge from a batch source of truth months later.
Confusion between the two domains causes a category of bug that is hard to catch in code review. Aggregations look correct in test, where events arrive in order. They run correctly in early production, where lag is small. They drift wrong as the system scales, lag grows, and the gap between event time and processing time widens. Every senior streaming engineer has watched a junior engineer add a GROUP BY clock_minute to an aggregation and ship it. Months later, the on-call team realizes that the dashboard is keyed on processing time and that the historical buckets reshuffle every time the stream is replayed. The fix is to make the time domain explicit in every aggregation, not implicit in the engine's defaults.

Apache Beam, Flink, and Spark Structured Streaming all expose event-time semantics as a first-class concept. The Beam programming model in particular attaches an event timestamp to every element and treats every aggregation as running under an explicit WindowFn. This is not academic. It is the only way an engine can answer the question of which window an out-of-order event belongs to without inspecting every event individually.

TIP
When designing a streaming aggregation, write the consumer's expected query first. If the query says 'between 9 and 10 yesterday morning,' the answer is event time. If the query says 'how many events did the engine see in the last minute,' the answer is processing time. The phrasing of the query reveals the domain.

Watermarks: The Engine's Promise

Recognize a watermark as the engine's promise about no-earlier events and reason about how it closes a windowed aggregation.

If a streaming engine waits forever for late events, no window ever closes and no result is ever produced. If it does not wait at all, every late event is dropped and every aggregation is wrong. The watermark is the compromise. A watermark is a timestamp that the engine emits periodically. It declares that the engine does not expect any more events with event_time earlier than the watermark, and any such event that does arrive is treated as late. The watermark is the engine's commitment to a closing rule.

What a Watermark Actually Is

| Property | Description |
|---|---|
| Timestamp | An event-time value that advances over time |
| Promise | Events with event_time < watermark are treated as late; they join a window only under an explicit lateness policy |
| Effect | Windows whose end is at or below the watermark are eligible to close and emit |
| Source | Computed from the input stream; falls behind when sources are slow |
| Per-source vs global | Each input has its own watermark; the engine takes the minimum |

A watermark is not a wall clock. It advances based on observed events. If the input stream stalls, the watermark stalls. If the input stream is heavily out of order, the watermark trails the wall clock by roughly as far as the slowest events run behind. The engine propagates the watermark alongside the records it processes, and downstream operators use it to decide which windows have closed.

The decoupling from the wall clock is what makes the watermark useful. A wall-clock-driven window closes at midnight regardless of whether late events have arrived, which is what produces the silent-drop behavior described in the beginner tier. A watermark-driven window closes when the engine has reason to believe no earlier events remain, which is a stronger and more useful guarantee. The cost is that the engine must hold window state for as long as the watermark trails the wall clock. That holding is what allows late events to land in the right bucket.
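The sketch below shows how a bounded-out-of-orderness watermark advances from observed events, and why it stalls. Each source tracks the maximum event time it has seen, minus a fixed lag. The combined watermark is the minimum across sources, so one slow source holds everything back. Times are plain seconds, and the class and method names are hypothetical rather than any engine's API.

```python
class WatermarkTracker:
    """Hypothetical per-source watermark tracking with a global minimum."""

    def __init__(self, sources, allowed_lag):
        self.allowed_lag = allowed_lag
        # No events seen yet: every source starts at -infinity.
        self.max_seen = {s: float("-inf") for s in sources}

    def observe(self, source, event_time):
        # Out-of-order events never move a source's watermark backward.
        self.max_seen[source] = max(self.max_seen[source], event_time)

    def source_watermark(self, source):
        return self.max_seen[source] - self.allowed_lag

    def watermark(self):
        # The engine can only promise what its slowest input promises.
        return min(self.source_watermark(s) for s in self.max_seen)


wm = WatermarkTracker(sources=["phones", "kiosks"], allowed_lag=60)
wm.observe("phones", 1_000)
wm.observe("kiosks", 900)
wm.observe("phones", 1_500)  # phones race ahead
print(wm.watermark())        # 840: held back by kiosks (900 - 60)
wm.observe("phones", 1_200)  # a late phone event does not lower the watermark
print(wm.watermark())        # 840
```

A source that goes silent would pin this minimum forever. Real engines therefore offer an idleness setting (Flink's `WatermarkStrategy.withIdleness`, for example) that excludes idle inputs from the minimum.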

How the Watermark Closes a Window

Consider a 1-minute tumbling window aligned on event time. Window [09:00, 09:01) closes when the watermark advances to 09:01. In this example the watermark simply tracks the maximum event time seen so far.

Timeline (wall clock):
  09:00:00  Window opens
  09:00:30  Event A (event_time = 09:00:25) arrives, joins window
  09:00:50  Event B (event_time = 09:00:48) arrives, joins window
  09:01:00  Wall clock crosses 09:01, but the watermark is still at 09:00:48
  09:01:15  Event C (event_time = 09:00:55) arrives after the wall clock passed 09:01, joins window; watermark advances to 09:00:55
  09:01:30  Event D (event_time = 09:01:10) arrives; watermark advances to 09:01:10

At 09:01:30, the watermark has crossed 09:01. Window [09:00, 09:01) closes and emits its result containing A, B, and C. Event D belongs to the next window.
The example shows two important properties. First, the watermark lags behind the wall clock. Second, the window does not close on the wall clock; it closes on the watermark. Had the window closed when the wall clock crossed 09:01, event C would have arrived 15 seconds too late and missed the window. With the watermark, the window stayed open until the watermark passed 09:01, by which point C was already inside it.
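The timeline can be replayed in a few lines. This sketch uses the same rule as the example, where the watermark equals the maximum event time seen so far. It closes the window [09:00, 09:01) as soon as the watermark reaches the window's end. Times are seconds after 09:00:00, and the variable names are illustrative.

```python
WINDOW_START, WINDOW_END = 0, 60  # [09:00:00, 09:01:00) in seconds after 09:00

# (arrival wall clock, event id, event time), all in seconds after 09:00:00
arrivals = [
    (30, "A", 25),
    (50, "B", 48),
    (75, "C", 55),  # arrives after the wall clock passed 09:01
    (90, "D", 70),
]

watermark = float("-inf")
window_contents = []
emitted = None

for wall_clock, event_id, event_time in arrivals:
    # While the window is still open, any event inside its range joins it.
    if emitted is None and WINDOW_START <= event_time < WINDOW_END:
        window_contents.append(event_id)
    watermark = max(watermark, event_time)  # ascending-timestamp rule
    # Once the watermark reaches the window end, close it and emit once.
    if emitted is None and watermark >= WINDOW_END:
        emitted = list(window_contents)
        print(f"closed at wall clock +{wall_clock}s with {emitted}")
# closed at wall clock +90s with ['A', 'B', 'C']
```

A wall-clock rule would have closed the window at +60s, before C arrived. The watermark rule waits until D shows that event time has moved past 09:01.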

Watermark Strategies

| Strategy | How it computes | When to use |
|---|---|---|
| Bounded out-of-orderness | watermark = max_event_time_seen - allowed_lag | Most cases; assumes lateness has a known upper bound |
| Ascending timestamps | watermark = max_event_time_seen | Events arrive in strict order; rare but cheap when true |
| Punctuated | Watermark embedded in special marker events from the producer | Producer can signal end-of-stream segments explicitly |
| Custom | Application-specific function | When the source has unusual lateness characteristics |

# Flink-style bounded out-of-orderness watermark in PyFlink
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner


class MyTimestampAssigner(TimestampAssigner):
    # Hypothetical assigner: assumes each record is a dict that carries its
    # event time as epoch milliseconds under the key 'event_time_ms'.
    def extract_timestamp(self, value, record_timestamp) -> int:
        return value['event_time_ms']


strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(60))
    .with_timestamp_assigner(MyTimestampAssigner())
)

# This says: 'I expect events to be at most 60 seconds out of order.'
# Watermark advances to (max_event_time_seen - 60s).
# Windows whose end is older than this watermark close and emit.
# Events arriving with event_time < (current_watermark) are late.
What the watermark guarantees and does not guarantee:
  • Guarantees: events older than the watermark have already been routed to their windows
  • Does NOT guarantee: every event that will ever arrive has been seen
  • Does NOT guarantee: the watermark advances steadily (it never moves backward, but it can stall on slow or idle sources)
  • Implication: events arriving after the watermark are 'late' and require an explicit policy
Bounded Out-of-Orderness
Default for most workloads
Watermark = max event time observed minus a fixed lag. Tunable, predictable, and handles the bulk of real-world lateness profiles.
Ascending Timestamps
When events are strictly ordered
Watermark = max event time observed. Cheapest to compute. Brittle: any out-of-order event is dropped without recourse.
Punctuated
Producer-driven boundaries
Watermark advances on explicit marker events emitted by the producer. Useful for systems with known segment boundaries (trading sessions, ad auctions).
No Watermark
  • Windows close on processing time, dropping late events silently
  • Reruns produce different answers as lag conditions change
  • Cannot reason about correctness of historical results
  • Engine state grows unbounded if windows never close
Watermark-Driven Windows
  • Windows close when the engine knows no earlier events will arrive
  • Reruns are reproducible; the watermark is a function of the input data
  • Late-arriving events are surfaced as a separate signal, not silently dropped
  • Engine state bounded: closed windows release their state

Watermarks turn the late-data problem from a quality bug into an explicit policy parameter. The team chooses how long to wait, the engine enforces the wait, and any event arriving after the watermark is handled by a named policy rather than a silent drop.

The watermark is a contract between the engine and the consumer about how much waiting is acceptable. A consumer that wants every minute bar emitted within thirty seconds of the minute closing is asking for an aggressive watermark and accepting the cost: late events past thirty seconds will be either dropped or routed to allowed-lateness handling. A consumer that is willing to wait five minutes for higher accuracy is asking for a relaxed watermark. The engine cannot guess; the consumer team must say. Most production incidents that present as 'streaming output is wrong' resolve to a watermark setting that did not match the consumer's expectation, set by a different team months earlier and never revisited.
TIP
When debugging a streaming aggregation that produces wrong numbers, look at the watermark trace before looking at the data. Most apparent late-data bugs are watermark-progression bugs: the engine is closing windows too early because the watermark is advancing too aggressively for the input.

1-Hour Allowed Lateness

Configure a streaming aggregation with allowed lateness and reason about what happens to on-time, slightly-late, and very-late events.

The pieces from the prior sections combine into a single concrete configuration. A streaming aggregation tumbles in 5-minute event-time windows, with a watermark using bounded out-of-orderness of 60 seconds, plus an allowed lateness of 60 minutes. This configuration shows up in production at scale; the numbers are tuned per workload. The walkthrough below traces what happens to events that arrive on time, slightly late, and very late.

The Configuration

# Apache Beam-style streaming aggregation with allowed lateness
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)
from apache_beam.utils.timestamp import Duration

# `events` is an upstream PCollection; each element is assumed to expose
# user_id, amount, and event_time (Unix seconds).
(
    events
    | 'ParseTimestamp' >> beam.Map(
        lambda e: beam.window.TimestampedValue((e.user_id, e.amount), e.event_time))
    | 'Window' >> beam.WindowInto(
        beam.window.FixedWindows(5 * 60),                 # 5-minute event-time windows
        trigger=AfterWatermark(
            early=AfterProcessingTime(60),                # speculative results every minute
            late=AfterCount(1),                           # update on each late event
        ),
        allowed_lateness=Duration(seconds=60 * 60),       # accept events up to 1 hour late
        accumulation_mode=AccumulationMode.ACCUMULATING,  # each firing carries the full total
    )
    | 'Aggregate' >> beam.CombinePerKey(sum)
    | 'Sink' >> beam.io.WriteToBigQuery(...)
)
The configuration says four things:

  • Windows are 5 minutes wide, aligned on event time.
  • The watermark trails the maximum observed event time by 60 seconds, so a window first emits once the engine has seen events at least 60 seconds past the window's end. In Beam the watermark is estimated by the source, so this bound belongs to the input connector rather than to the snippet above.
  • Any event that arrives while the watermark is less than 60 minutes past the window's end is still admitted, and the engine emits an updated result.
  • Events arriving later than that are dropped by the window and, in this design, routed to a separate dead-letter stream.

Each number comes from a measurement, not from intuition. The 5-minute window matches the consumer's expected granularity for the dashboard. The 60-second watermark lag matches the 99th percentile of measured event-to-arrival lag for this source. The 60-minute allowed lateness covers the 99.9th percentile of mobile-tail lateness. The dead-letter stream exists because the team accepts that events past the 99.9th percentile will not be caught by streaming and must be handled by the next-day reconciliation pass. Without these four anchors, the configuration is guesswork and the output is unreliable.
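The decision the engine makes for each event can be written as a small function of the window end, the current watermark, and the allowed lateness. This sketch follows the Beam and Flink rule in spirit: an element is dropped once the watermark has passed its window's end plus the allowed lateness. It is a hypothetical illustration with times in seconds, not either engine's code. The watermark values in the usage lines assume the stream's maximum event time tracks the wall clock, so the watermark sits about 60 seconds behind each arrival.

```python
WINDOW = 5 * 60             # 5-minute tumbling windows
ALLOWED_LATENESS = 60 * 60  # 1 hour


def window_end(event_time: int) -> int:
    """End (exclusive) of the tumbling window containing event_time."""
    return (event_time // WINDOW + 1) * WINDOW


def classify(event_time: int, watermark: int) -> str:
    end = window_end(event_time)
    if watermark < end:
        return "on_time"        # window still open; joins before the first emission
    if watermark < end + ALLOWED_LATENESS:
        return "late_admitted"  # window already fired; re-fire with the updated total
    return "dropped"            # past allowed lateness; route to the dead-letter stream


def hms(h: int, m: int, s: int = 0) -> int:
    return h * 3600 + m * 60 + s


# Event time 09:02:14 in all three cases; watermark = arrival time - 60 s (assumed).
print(classify(hms(9, 2, 14), watermark=hms(9, 1, 18)))   # on_time
print(classify(hms(9, 2, 14), watermark=hms(9, 33, 0)))   # late_admitted
print(classify(hms(9, 2, 14), watermark=hms(11, 14, 0)))  # dropped
```

Lateness is judged against the watermark, not against the event's own arrival delay. A burst of fresh events can therefore make an old event "late" sooner than its wall-clock delay suggests.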

The Three Cases

| Case | Event time | Arrival time | Lateness | What happens |
|---|---|---|---|---|
| On time | 09:02:14 | 09:02:18 | 4 seconds | Joins window [09:00, 09:05); the window first emits once the watermark passes 09:05, around 09:06 wall clock given the 60-second lag |
| Slightly late | 09:02:14 | 09:34:00 | About 32 minutes | Window already closed; allowed lateness admits it; aggregation re-fires with updated total |
| Very late | 09:02:14 | 11:15:00 | About 2 hours 13 minutes | Past allowed lateness; event is dropped from the streaming output and routed to a dead-letter stream |

Walking the Timeline

Window: [09:00:00, 09:05:00) (5-minute event-time window)
  T = 09:02:18  Event E1 (event_time = 09:02:14) arrives. Window state: 1 event.
  T = 09:04:30  Event E2 (event_time = 09:03:55) arrives. Window state: 2 events.
  T = 09:05:00  Wall clock crosses 09:05. The window stays open.
  T = 09:06:02  Watermark advances past 09:05. Window [09:00, 09:05) closes. First emission: total = E1 + E2.
  T = 09:34:00  Event E3 (event_time = 09:02:14) arrives, about 32 minutes after its event time. Within allowed lateness (60 min). Re-fires. Updated emission: total = E1 + E2 + E3.
  T = 11:15:00  Event E4 (event_time = 09:02:14) arrives, about 2 h 13 m after its event time. Past allowed lateness. Dropped from the window; routed to the DLQ. Window emission unchanged.
State retention: the window's state was held in the engine for the full 60-minute allowed-lateness duration. After that, state is released.
The window emits multiple times. The first emission happens when the watermark closes the window, with whatever events were on time. Subsequent emissions happen each time a late event arrives within the allowed lateness. The downstream consumer must be able to handle updates to a previously emitted result. This is the key constraint of windows that re-fire: the consumer's storage must support upserts keyed on the window identifier.

Accumulation mode is a separate decision:

  • ACCUMULATING (the mode in the example above) makes each re-fire carry the new total for the window, so a downstream sink that overwrites by window key gets the latest answer.
  • DISCARDING makes each re-fire carry only the events since the previous emission, so the sink must add these deltas to the running total.
  • ACCUMULATING_AND_RETRACTING, described in the Beam model, emits a retraction of the prior total before the new total. It suits consumers that cannot accept overwrites and must process additions and removals separately. Support for it varies by SDK and runner.

Picking the wrong mode produces silently wrong totals, in the same family of bugs as picking the wrong time domain.
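The difference between the two commonly available modes is easiest to see on one window's sequence of firings. This plain-Python model of pane contents is not Beam code. It replays the worked timeline: E1 and E2 arrive on time, then E3 arrives late. The amounts 100, 200, and 50 come from the sample arrivals used in this section. The sketch then shows what an overwrite-by-key sink and an additive sink end up with under each mode.

```python
panes = [[100, 200], [50]]  # elements seen before each firing: on-time pane, then late pane


def firings(panes, mode):
    """Values emitted at each firing of a sum aggregation."""
    emitted, running = [], 0
    for pane in panes:
        running += sum(pane)
        if mode == "ACCUMULATING":
            emitted.append(running)    # each firing carries the full window total
        elif mode == "DISCARDING":
            emitted.append(sum(pane))  # each firing carries only the new elements
        else:
            raise ValueError(mode)
    return emitted


def overwrite_sink(values):
    """Upsert-by-window-key sink: the last firing wins."""
    return values[-1]


def additive_sink(values):
    """Sink that adds every firing into the stored total."""
    return sum(values)


acc = firings(panes, "ACCUMULATING")  # [300, 350]
dis = firings(panes, "DISCARDING")    # [300, 50]
print(overwrite_sink(acc), additive_sink(dis))  # 350 350: matched pairs agree
print(overwrite_sink(dis), additive_sink(acc))  # 50 650: mismatched pairs are wrong
```

The correct total appears only when the sink's semantics match the accumulation mode. Neither mismatched pair raises an error.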
/* Demonstrate the window classification logic on sample arrivals.
   Simplification: lateness is measured as arrived_at - event_time rather
   than against the watermark. The 1-minute threshold stands in for the
   60-second bounded out-of-orderness, the 60-minute threshold for the
   allowed lateness. PostgreSQL-style syntax. */
WITH arrivals AS (
  SELECT
    'E1' AS id,
    TIMESTAMP '2026-04-25 09:02:14' AS event_time,
    TIMESTAMP '2026-04-25 09:02:18' AS arrived_at,
    100 AS amount

  UNION ALL

  SELECT
    'E2',
    TIMESTAMP '2026-04-25 09:03:55',
    TIMESTAMP '2026-04-25 09:04:30',
    200

  UNION ALL

  SELECT
    'E3',
    TIMESTAMP '2026-04-25 09:02:14',
    TIMESTAMP '2026-04-25 09:34:00',
    50

  UNION ALL

  SELECT
    'E4',
    TIMESTAMP '2026-04-25 09:02:14',
    TIMESTAMP '2026-04-25 11:15:00',
    75
),
classified AS (
  SELECT
    id,
    event_time,
    arrived_at,
    -- Seconds between when the event happened and when it reached the engine
    EXTRACT(EPOCH FROM (
      arrived_at - event_time
    )) AS lateness_seconds,
    CASE
      WHEN arrived_at <= event_time + INTERVAL '1 minute' THEN 'on_time'
      WHEN arrived_at <= event_time + INTERVAL '60 minutes' THEN 'slightly_late_admitted'
      ELSE 'past_allowed_lateness_dropped'
    END AS classification,
    amount
  FROM arrivals
)

SELECT
  *
FROM classified
ORDER BY arrived_at;

The Tradeoff Costs

  • State cost: holding window state for 60 minutes after the window closes multiplies engine memory use roughly 12-fold over a no-lateness baseline (60 minutes of lateness divided by 5-minute windows).
  • Update cost: every late event causes a re-emission. Downstream sinks must be idempotent on the window key.
  • Correctness gain: late events within the allowed lateness are accounted for, including the 1 to 5 percent mobile-retry tail.
  • Boundary policy: events past 60 minutes are dropped from the streaming output. Without a separate fix, those events are missing from the answer.
Choosing the allowed-lateness number:
  • Measure the lateness distribution from production: 95th, 99th, and 99.9th percentile
  • Set allowed lateness above the 99th percentile to capture the bulk of late events
  • Expect state cost roughly proportional to allowed lateness divided by window size
  • Plan for the truly-late tail: a separate batch reconciliation job, covered in the advanced tier
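The checklist can be turned into a small helper. This sketch assumes lateness samples in seconds pulled from production logs (the sample below is synthetic, 1 to 1000 seconds), uses nearest-rank percentiles, and applies a hypothetical rounding rule: the smallest multiple of 5 minutes strictly above p99. The state estimate is the allowed-lateness-to-window ratio from the checklist, which yields the roughly 12-fold figure for 60-minute lateness over 5-minute windows.

```python
import math
from datetime import timedelta


def nearest_rank_percentile(samples, p):
    """Nearest-rank percentile of a non-empty list, with p in (0, 100]."""
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def suggest_allowed_lateness(lateness_seconds, step=timedelta(minutes=5)):
    """Hypothetical rule: smallest multiple of `step` strictly above p99."""
    p99 = nearest_rank_percentile(lateness_seconds, 99)
    step_s = step.total_seconds()
    return timedelta(seconds=(p99 // step_s + 1) * step_s)


def state_multiplier(allowed_lateness, window):
    """Approximate closed windows held per key: allowed lateness / window size."""
    return allowed_lateness / window


# Synthetic lateness measurements in seconds; replace with production data.
samples = list(range(1, 1001))
for p in (95, 99, 99.9):
    print(p, nearest_rank_percentile(samples, p))
print(suggest_allowed_lateness(samples))  # 0:20:00
print(state_multiplier(timedelta(minutes=60), timedelta(minutes=5)))  # 12.0
```

Printing p99.9 alongside p99 shows how far the truly-late tail extends beyond the chosen setting, which sizes the batch reconciliation that has to cover it.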
The allowed-lateness configuration also dictates downstream sink design. A sink that supports only INSERT does not work, because each late re-fire would land as an additional row for the same window. The sink must support upsert by window key, or the consumer must read through a deduplicating view. Iceberg and Delta tables support this pattern natively through MERGE; on BigQuery a common idiom is to stream into a staging table and MERGE into the target; Snowflake handles it through MERGE statements run by a task over a stream. Each platform has a different idiom, but the requirement is the same: late re-fires from the streaming engine must not create duplicates downstream. A pipeline that adds allowed lateness without a corresponding upsert sink double-counts every window that re-fires during its lateness period.
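To see why INSERT-only sinks fail, here is a sketch using Python's built-in sqlite3 module as a stand-in for a warehouse table. SQLite's INSERT ... ON CONFLICT DO UPDATE (available from SQLite 3.24) plays the role MERGE plays on the platforms above. The two emissions are the 300 and 350 totals a single window produces when a late event re-fires it; the table names are hypothetical.

```python
import sqlite3

# One window, two accumulating emissions: the first close, then a late re-fire.
emissions = [("2026-04-25 09:00", 300), ("2026-04-25 09:00", 350)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE insert_only (window_start TEXT, total INTEGER)")
conn.execute("CREATE TABLE upserted (window_start TEXT PRIMARY KEY, total INTEGER)")

for window_start, total in emissions:
    # INSERT-only sink: every re-fire adds another row for the same window.
    conn.execute("INSERT INTO insert_only VALUES (?, ?)", (window_start, total))
    # Upsert sink: keyed on the window, a re-fire overwrites the earlier total.
    conn.execute(
        "INSERT INTO upserted VALUES (?, ?) "
        "ON CONFLICT(window_start) DO UPDATE SET total = excluded.total",
        (window_start, total),
    )

insert_only_sum = conn.execute("SELECT SUM(total) FROM insert_only").fetchone()[0]
upserted_sum = conn.execute("SELECT SUM(total) FROM upserted").fetchone()[0]
print(insert_only_sum, upserted_sum)  # 650 350
```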
Do
  • Tune allowed lateness against measured lateness percentiles, not guesses
  • Route past-lateness events to a dead-letter stream so they are not silently lost
  • Make downstream sinks idempotent on the window key to absorb late re-emissions
Don't
  • Set allowed lateness to infinity; engine state grows without bound
  • Set allowed lateness to zero; the long tail of mobile retries is silently dropped
  • Forget that the watermark can stall when an upstream partition goes idle
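The idle-partition stall can be sketched as follows. Engines commonly combine per-partition watermarks by taking the minimum, so one partition that stops receiving events pins the overall watermark. Flink, for example, offers an idleness timeout on its watermark strategy (WatermarkStrategy.withIdleness) to exclude such partitions; the function below is a simplified, hypothetical model of that idea, not Flink's code, and the partition names and times are illustrative.

```python
from datetime import datetime, timedelta


def combined_watermark(partition_watermarks, last_event_at, now, idle_timeout=None):
    """Overall watermark: the minimum over partitions that are not idle.

    A partition counts as idle when it has not produced an event for
    `idle_timeout`; with no timeout, every partition participates.
    """
    active = [
        wm
        for partition, wm in partition_watermarks.items()
        if idle_timeout is None or now - last_event_at[partition] < idle_timeout
    ]
    return min(active) if active else None


def t(hms):
    return datetime.fromisoformat(f"2026-04-25 {hms}")


watermarks = {"p0": t("09:30:00"), "p1": t("09:31:00"), "p2": t("09:04:00")}
last_event_at = {"p0": t("09:31:00"), "p1": t("09:32:00"), "p2": t("09:05:00")}
now = t("09:32:00")

stalled = combined_watermark(watermarks, last_event_at, now)
print(stalled)  # 2026-04-25 09:04:00, so [09:00, 09:05) never closes
recovered = combined_watermark(watermarks, last_event_at, now, timedelta(minutes=5))
print(recovered)  # 2026-04-25 09:30:00
```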
TIP
When a streaming output is consistently 1 to 2 percent below the batch reconciliation, the gap is almost always the allowed-lateness tail. Either widen the lateness window, accept the gap as the streaming-vs-batch reconciliation delta, or build the reconciliation pass that fixes it in batch.
PUTTING IT ALL TOGETHER

> A growth-stage analytics platform runs a streaming pipeline that ingests checkout events from Kafka, aggregates revenue per minute, and writes to a Snowflake table powering a real-time pricing dashboard. The product team requests two changes the same week. First, the checkout event payload must change: the legacy 'fare_cents' field is replaced with a structured 'pricing' object containing base, surge, and tax. Second, analysts complain that the streaming dashboard consistently reads 1.6 percent below the next-day batch reconciliation. The new tech lead is asked to design a single rollout that handles both.

The pricing change is a breaking schema change: a flat field is replaced by a nested object. The expand-contract pattern applies. Phase 1 adds the pricing object alongside fare_cents. Phase 2 dual-writes both. Phase 3 migrates the streaming aggregation and the dashboard to read pricing.base. Phase 4 drops fare_cents.
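The four phases can be sketched from the producer's and the consumer's side. This assumes the payload is a plain dict, that phase 1 is schema-only (the registry knows the optional pricing object but producers do not populate it yet), and that the legacy fare_cents value corresponds to pricing.base, as the phase-3 migration to pricing.base implies. The function names and amounts are hypothetical.

```python
def produce(base_cents, surge_cents, tax_cents, phase):
    """Build a checkout payload for a rollout phase (hypothetical shape).

    Phase 1 is schema-only, so producers still send just fare_cents.
    """
    event = {}
    if phase < 4:   # the legacy field survives until the contract phase
        event["fare_cents"] = base_cents
    if phase >= 2:  # dual-write starts in phase 2
        event["pricing"] = {"base": base_cents, "surge": surge_cents, "tax": tax_cents}
    return event


def legacy_reader(event):
    """A consumer that has not migrated yet."""
    return event["fare_cents"]


def migrated_reader(event):
    """A phase-3 consumer: prefer pricing.base, fall back for older events."""
    pricing = event.get("pricing")
    if pricing is not None:
        return pricing["base"]
    return event["fare_cents"]


for phase in (1, 2, 3, 4):
    event = produce(1200, 300, 96, phase)
    try:
        legacy = legacy_reader(event)
    except KeyError:
        legacy = "breaks"
    print(phase, legacy, migrated_reader(event))
```

The legacy reader works through phase 3 and breaks only in phase 4, which is why the contract step waits until every consumer behaves like migrated_reader.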
The schema registry from this lesson enforces the additive nature of phase 1. Without the registry, a producer could ship phase 4 by accident, breaking every consumer that had not yet migrated. The registry is the safety net that makes expand-contract operationally feasible.
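A simplified compatibility check shows how a registry can accept phase 1 and reject a premature phase 4. This is a hypothetical rule loosely modeled on checking both directions (old readers on new data, new readers on old data), not any registry's actual algorithm; schemas are plain dicts mapping field name to (type, required).

```python
def compatibility_errors(old, new):
    """List reasons a new schema version breaks readers on either side.

    Old readers need every required old field to survive with the same type;
    new readers of old data need every required new field to already exist.
    """
    errors = []
    for name, (ftype, required) in old.items():
        if name not in new:
            if required:
                errors.append(f"removes required field {name!r}")
        elif new[name][0] != ftype:
            errors.append(f"changes type of {name!r}")
    for name, (_ftype, required) in new.items():
        if name not in old and required:
            errors.append(f"adds required field {name!r}")
    return errors


v1 = {"order_id": ("string", True), "fare_cents": ("long", True)}
v2 = {**v1, "pricing": ("record", False)}                          # phase 1: additive
v3 = {"order_id": ("string", True), "pricing": ("record", False)}  # fare_cents dropped
v2b = {**v2, "fare_cents": ("long", False)}                        # fare_cents made optional

print(compatibility_errors(v1, v2))   # []
print(compatibility_errors(v2, v3))   # ["removes required field 'fare_cents'"]
print(compatibility_errors(v2b, v3))  # []
```

Under this simplified rule, dropping fare_cents only passes after a prior version has made it optional.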
The 1.6 percent gap is the streaming pipeline's allowed-lateness tail. Events arriving more than 60 minutes after their event time are dropped from the streaming output but picked up the next day by the batch job that scans 7 days of raw data. The fix is either to widen the streaming allowed-lateness window (more state) or to accept the gap and rely on the batch reconciliation, which is the topic of the advanced tier.
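The mechanism behind the gap can be sketched by comparing a streaming total, which drops events past allowed lateness, with a batch total that counts everything. Lateness is approximated as arrived_at - event_time, as in the SQL classification example, and the events are its four samples. The gap on four events is far larger than the scenario's 1.6 percent only because one of four events is very late; the point is the mechanism, not the number.

```python
from datetime import datetime, timedelta

ALLOWED_LATENESS = timedelta(minutes=60)


def reconcile(events):
    """Compare a streaming total (late tail dropped) with a batch total.

    events: (event_time, arrived_at, amount) tuples.
    """
    batch_total = sum(amount for _, _, amount in events)
    streaming_total = sum(
        amount
        for event_time, arrived_at, amount in events
        if arrived_at - event_time <= ALLOWED_LATENESS
    )
    gap_pct = 100 * (batch_total - streaming_total) / batch_total
    return streaming_total, batch_total, gap_pct


def t(hms):
    return datetime.fromisoformat(f"2026-04-25 {hms}")


sample = [
    (t("09:02:14"), t("09:02:18"), 100),  # E1
    (t("09:03:55"), t("09:04:30"), 200),  # E2
    (t("09:02:14"), t("09:34:00"), 50),   # E3, admitted late
    (t("09:02:14"), t("11:15:00"), 75),   # E4, past allowed lateness
]
streaming, batch, gap = reconcile(sample)
print(streaming, batch, round(gap, 1))  # 350 425 17.6
```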
Both fixes share the same operational discipline: introduce the change, validate it while old and new behavior run side by side, and contract once consumers are ready. The same idea applies to schemas (expand-contract), to time domains (watermarks plus allowed lateness), to the rerun window from the beginner tier, and to the storage-level schema evolution covered in the storage-layer lesson.
KEY TAKEAWAYS
Schema registries make compatibility a system property, not a wiki note: the registry rejects incompatible producer changes before they reach the topic.
Expand-contract is the standard rollout for breaking changes: expand the schema, dual-write, migrate consumers, contract. Each phase is independently rollback-able.
Event time and processing time are not interchangeable: consumer queries that reason about real-world history require event time, even when processing time is cheaper.
Watermarks are the engine's promise that no earlier events will be processed against a closed window: they let windows close deterministically and bound the state the engine has to hold.
Allowed lateness extends the window to absorb late tails: the cost is engine state proportional to lateness duration; the gain is that the slightly-late tail is no longer silently dropped.

Schema registries and watermarks turn ad-hoc tolerance into engineered guarantees

Category: Pipeline Architecture
Difficulty: intermediate
Duration: 32 minutes
Challenges: 0 hands-on challenges

Topics covered: Schema Registries: Where They Live, The Expand-Contract Pattern, Event Time Versus Processing Time, Watermarks: The Engine's Promise, 1-Hour Allowed Lateness

Lesson Sections

  1. Schema Registries: Where They Live
  2. The Expand-Contract Pattern
  3. Event Time Versus Processing Time
  4. Watermarks: The Engine's Promise
  5. 1-Hour Allowed Lateness