Backfilling Streams In Kappa Architecture At Netflix
Background - Event Streaming
Netflix's DEs / DPEs build data systems to power data analytics and ML algorithms, such as their Real-time Merchant Impression (RMI) Flink app.
RMI Pipeline
Joins Impression events with Playback events in real time to attribute plays to impressions and compute Take Rate.
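As a rough illustration only (the real RMI schemas, attribution window, and Take Rate computation are not public), a Flink event-time interval join of this shape might look like the following sketch, with hypothetical event fields, a 30-minute attribution window, and the final Take Rate aggregation left to a downstream step:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical event shapes, not Netflix's actual schemas.
case class ImpressionEvent(profileId: String, titleId: String, ts: Long)
case class PlaybackEvent(profileId: String, titleId: String, ts: Long)
case class ImpressionPlaySummary(profileId: String, titleId: String, played: Boolean)

object RmiJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // In production these would be Kafka sources; fromElements keeps the sketch self-contained.
    val impressions = env
      .fromElements(ImpressionEvent("p1", "t1", 1000L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ImpressionEvent](Duration.ofMinutes(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[ImpressionEvent] {
            override def extractTimestamp(e: ImpressionEvent, recordTs: Long): Long = e.ts
          }))

    val playbacks = env
      .fromElements(PlaybackEvent("p1", "t1", 60000L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[PlaybackEvent](Duration.ofMinutes(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[PlaybackEvent] {
            override def extractTimestamp(e: PlaybackEvent, recordTs: Long): Long = e.ts
          }))

    // Attribute a play to an impression if it happens within 30 minutes of the impression.
    impressions
      .keyBy(e => e.profileId + ":" + e.titleId)
      .intervalJoin(playbacks.keyBy(e => e.profileId + ":" + e.titleId))
      .between(Time.minutes(0), Time.minutes(30))
      .process(new ProcessJoinFunction[ImpressionEvent, PlaybackEvent, ImpressionPlaySummary] {
        override def processElement(
            imp: ImpressionEvent,
            play: PlaybackEvent,
            ctx: ProcessJoinFunction[ImpressionEvent, PlaybackEvent, ImpressionPlaySummary]#Context,
            out: Collector[ImpressionPlaySummary]): Unit =
          out.collect(ImpressionPlaySummary(imp.profileId, imp.titleId, played = true))
      })
      .print() // Take Rate = attributed plays / impressions, aggregated downstream

    env.execute("rmi-join-sketch")
  }
}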

Why we need Backfills
Event streaming operations and apps can fail due to various reasons:
Source / Sink failures
Dependent service failures
Upstream data changes
Thus we need to backfill to:
Correct wrong data
Backfill missing data
Bootstrap state for app to continue consuming live data
How Should we Backfill?
1. Replaying Source Events
Easiest option since we are simply re-running the streaming job to reprocess events from a problematic period. However, we have to deal with:
Troubleshooting - Can take hours or days, and the source data can expire in the meantime
Increasing message queue retention - Very expensive:
Row-based formats (e.g. Avro) have a lower compression rate vs. column-based formats (Parquet / ORC)
Low-latency storage solutions (e.g. EBS gp2) are more costly than S3
It would cost Netflix $93M/year to retain 30 days of data generated by all apps. So can we store it elsewhere? Answer: Data Lake
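The $93M/year figure comes from the talk; the driver is simply that retained bytes scale with ingest rate, retention window, and replication factor, multiplied by a comparatively high per-GB price for low-latency storage. A back-of-envelope sketch with assumed inputs (these happen to land in the same ballpark as the quoted figure, but they are not Netflix's actual traffic or pricing):

// Back-of-envelope retention cost model. All inputs are hypothetical
// illustrations, not Netflix's actual figures.
object RetentionCostSketch {
  def main(args: Array[String]): Unit = {
    val ingestGbPerSec    = 10.0   // assumed aggregate ingest across all apps
    val retentionDays     = 30.0
    val replicationFactor = 3.0    // typical Kafka replication
    val usdPerGbMonth     = 0.10   // assumed low-latency block storage price (EBS gp2-like)

    val storedGb  = ingestGbPerSec * 86400 * retentionDays * replicationFactor
    val yearlyUsd = storedGb * usdPerGbMonth * 12
    println(f"~${storedGb / 1e6}%.1f PB resident, ~$$${yearlyUsd / 1e6}%.1f M per year")
  }
}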
2. Lambda Architecture
Building on the idea of longer message retention, we can make it viable by using a Data Lake for storage, combined with the Lambda Architecture.
Data Lake for Longer Message Retention
Data Lakes are a central location for storing large amounts of data in its native raw format using a flat architecture and object storage
Why data lake?
Cost Effective - Compressed columnar formats like Parquet
File Pruning
Schema Evolution
Engine Agnostic
Etc.

We can build and maintain a batch application, equivalent to the streaming application, that reads from our Data Lake Iceberg tables, and use it for backfilling given its high throughput (sketched after the list of downsides below).

However the downside is:
Initial development of batch jobs can take days or weeks, especially with data validation between the Batch and Streaming applications
Continuous engineering effort to ensure that the Batch app is up to date / in sync with the Streaming app.
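To make that duplication concrete, the parallel batch job for the RMI use case might look roughly like the following Spark-over-Iceberg sketch. The table names, columns, and 30-minute attribution window are assumptions for illustration; the point is that this join logic re-implements the streaming app's logic and must be kept in sync by hand.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Minimal Lambda-style batch equivalent of the streaming join, reading
// Iceberg tables from the data lake. Names and columns are hypothetical.
object RmiBatchBackfillSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rmi-batch-backfill").getOrCreate()

    val impressions = spark.table("prod.rmi.impressions")
      .where(col("event_date").between("2022-05-01", "2022-05-02"))
    val playbacks = spark.table("prod.rmi.playbacks")
      .where(col("event_date").between("2022-05-01", "2022-05-02"))

    // Same attribution logic as the streaming job, re-implemented in batch:
    // exactly the duplication that has to be maintained and validated.
    val summary = impressions.alias("i")
      .join(playbacks.alias("p"),
        col("i.profile_id") === col("p.profile_id") &&
        col("i.title_id") === col("p.title_id") &&
        col("p.ts").between(col("i.ts"), col("i.ts") + expr("INTERVAL 30 MINUTES")))
      .groupBy(col("i.title_id"))
      .agg(countDistinct(col("p.ts")).as("attributed_plays"))

    summary.writeTo("prod.rmi.impression_play_summary_backfill").append()
  }
}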
3. Unified Batch and Streaming Framework
Use frameworks that support both batch and streaming modes, such as Flink or Beam.
However, Flink still requires significant code changes to run in batch mode, and Beam only has partial support for state, timers, and watermarks.
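For example, Flink's DataStream API can be switched to batch execution mode with a one-line setting, but every source must then be bounded and timer / state / watermark behaviour changes, so a real streaming job still needs non-trivial rework. A minimal sketch:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

// The same DataStream program can run in batch execution mode, but only if
// every source is bounded (e.g. a file or Iceberg scan rather than an
// unbounded Kafka topic), so the switch is rarely "free".
object BatchModeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    env.fromElements(1, 2, 3) // bounded source stands in for a backfill scan
      .map(_ * 2)
      .print()

    env.execute("batch-mode-sketch")
  }
}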
Summary of Backfilling Options

Backfilling in Kappa with Data Lake
Goals
Obtain a generic solution that works for all classes of applications (Not tied to specific business logic)
Minimal code changes to add support
Scale horizontally to backfill quickly
Overview

Ingesting Streaming Data into a Data Lake
We batch events into larger files instead of storing them individually, as sketched below, in order to:
Be compatible with efficient columnar compression formats - Parquet / ORC
Avoid the small-file problem - Makes reading more efficient
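The ingestion into the lake is a separate pipeline at Netflix; as a generic illustration of the batching idea, a Flink FileSink with a bulk Parquet format rolls files on checkpoints, so the checkpoint interval controls how large (and how few) the files are. The bucket path and event type below are hypothetical:

import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.scala._

// Hypothetical event type; any POJO / case class works with the reflect writer.
case class ImpressionEvent(profileId: String, titleId: String, ts: Long)

// Batches streaming events into columnar Parquet files instead of writing
// one object per event; bulk-format files roll on each checkpoint.
object LakeIngestSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10 * 60 * 1000) // 10 min => fewer, larger files

    val sink: FileSink[ImpressionEvent] = FileSink
      .forBulkFormat(
        new Path("s3://some-bucket/impressions/"), // hypothetical location
        ParquetAvroWriters.forReflectRecord(classOf[ImpressionEvent]))
      .build()

    env.fromElements(ImpressionEvent("p1", "t1", 1000L)).sinkTo(sink)
    env.execute("lake-ingest-sketch")
  }
}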

But how do we backfill?
Challenge 1: Applications assume Ordering

Strawman A: Read Events from Files filtered by Backfill Dates
Positive - Scales horizontally to backfill quickly
Negative - Does not work for all types of applications
Strawman B: Order all files and read them in order
Positive - Guarantees ordering semantics similar to Live Traffic
Negative - Does not scale horizontally (Bottlenecked by parallelism)
Netflix Solution: Use the lateness tolerated by the App (read files while maintaining lateness constraints; a toy sketch follows below)
Positives:
Guarantees ordering that works for the application (uses event-time semantics rather than Kafka's strict ordering)
Scales horizontally to finish backfill quickly
Alignment across sources to avoid state size explosion (See below)
(Figure: tolerate-lateness)
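Netflix's implementation is internal, but the idea can be illustrated as planning the Iceberg data files into "waves": order files by their minimum event timestamp (available in file metadata), and let any files whose start falls within the tolerated lateness be read fully in parallel. A toy version of such a planner, not Netflix's code:

// Illustrative sketch of lateness-bounded file scheduling.
// Each data file carries min/max event-time bounds in its metadata.
final case class DataFile(path: String, minTs: Long, maxTs: Long)

object LatenessBoundedPlanner {

  // Group files into "waves" whose event-time spread stays within the lateness
  // the app tolerates. Files in a wave can be read in any order by any number
  // of readers, which restores horizontal scalability.
  def planWaves(files: Seq[DataFile], toleratedLatenessMs: Long): Seq[Seq[DataFile]] = {
    val sorted = files.sortBy(_.minTs)
    val waves  = scala.collection.mutable.ListBuffer.empty[Seq[DataFile]]
    var remaining = sorted
    while (remaining.nonEmpty) {
      val start = remaining.head.minTs
      val (wave, rest) = remaining.span(_.minTs <= start + toleratedLatenessMs)
      waves += wave
      remaining = rest
    }
    waves.toList
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      DataFile("f1", 0L, 4000L), DataFile("f2", 1000L, 6000L),
      DataFile("f3", 9000L, 12000L), DataFile("f4", 10000L, 15000L))
    planWaves(files, toleratedLatenessMs = 5000L).foreach(println)
  }
}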
Challenge 2: Reading from Multiple Sources
When we read from multiple sources, like a playback source and an impression source, one source can have significantly more data than the other ('Data Skew'). During a backfill this can lead to Watermark Skew and a state size explosion, which in turn causes slow checkpoints or checkpoint timeouts.
Netflix solves this by Coordinating watermarks and using a Global Watermark

So despite Source 3 having an IW of 10, it can only read up to a lateness of 5 minutes, since it is bounded by the earliest watermark across all sources, thus avoiding the aforementioned skew issues.
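Netflix built this global-watermark coordination themselves; as an analogy (not their implementation), newer Flink releases (1.15+) ship a built-in mechanism called watermark alignment that pauses any source whose local watermark drifts too far ahead of the slowest source in the same alignment group. A sketch of that API, with a hypothetical event type and drift bound:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Sources sharing the alignment group pause reading once their local watermark
// drifts more than the allowed amount ahead of the group's minimum watermark,
// bounding watermark skew (and hence join state) between sources.
object AlignmentSketch {
  case class PlaybackEvent(profileId: String, ts: Long) // hypothetical event type

  val alignedStrategy: WatermarkStrategy[PlaybackEvent] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[PlaybackEvent](Duration.ofMinutes(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[PlaybackEvent] {
        override def extractTimestamp(e: PlaybackEvent, recordTs: Long): Long = e.ts
      })
      .withWatermarkAlignment("backfill-sources", Duration.ofMinutes(5))
  // Apply the same strategy (same group name) to every source of the job.
}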
Adopting Kappa Backfill
Overview of Production Application

This enables minimal code changes as shown:
@SpringBootApplication
class PersonalizationStreamingApp {

  @Bean
  def flinkJob(
      @Source("impression-source") impressionSource: SourceBuilder[Record[ImpressionEvent]],
      @Source("playback-source") playbackSource: SourceBuilder[Record[PlaybackEvent]],
      @Sink("summary-sink") summarySink: SinkBuilder[ImpressionPlaySummary]) {...}

  // Live (Kafka) and backfill (Iceberg) sources are registered as separate beans;
  // switching between them requires no change to the processing logic in flinkJob.
  @Bean
  def liveImpressionSourceConfigurer(): KafkaSourceConfigurer[Record[ImpressionEvent]] =
    new KafkaSourceConfigurer("live-impression-source", KafkaCirceDeserializer[ImpressionEvent])

  @Bean
  def backfillImpressionSourceConfigurer(): IcebergSourceConfigurer[Record[ImpressionEvent]] =
    new IcebergSourceConfigurer(
      "backfill-impression-source",
      Avro.deserializerFactory(ImpressionEvent)
    )
}
Note: The in-memory representation of the Iceberg source is consistent with the Kafka source, so no change in processing logic is necessary, and we get cost savings with Apache Iceberg.
Results of Kappa Backfill
High throughput - Backfilling 24 hours of data takes only ~5 hours
Consistent data quality - Backfill output matches 99.9% of production output
Lessons Learnt
Backfilling window and configs depend on application logic - e.g. stateful jobs require enough data to build up sufficient state
Backfilling job needs separate tuning from the Production job - During backfills, data flows through at a higher rate, so we can allocate more resources
Backfill can produce more correct results than production - Due to how Flink watermarks work, etc.
Reference
Netflix @ Data+AI Summit 2022 by Databricks: https://www.youtube.com/watch?v=aCIWI5k7deM