Apache Iceberg 101
What is Iceberg?
Apache Iceberg is an open table format designed for PB-scale tables. It helps us manage, organise and track all of the files (e.g. ORC or Parquet) that comprise a table, with the cloud (i.e. object storage like S3) in mind.
Table vs File Formats
File formats help us modify and skip data within a single file. Table formats do the same (e.g. Hive), just applied at the table level (multiple files).
Why Iceberg?
Data engineers at Netflix did not want to spend time to:
Rewrite SQL queries
Reorder joins by hand to improve performance
Tune parquet files based on block size / dictionary page size suitable for their respective datasets
in order to optimize their pipeline performance at PB scale; they wanted to focus on delivering value to their clients, so performance tuning and the like should be more automatic, without heavy intervention by DEs. However, there are a few stumbling blocks:
Unsafe operations everywhere
Writing to multiple partitions
Renaming a column (e.g. we cannot rename a Parquet column; if we do, we need to backfill)
Interactions with object stores (like S3) cause issues such as:
Consistency problems
Performance problems
Endless scale challenges
These challenges and bottlenecks can be addressed by a table format (i.e. Iceberg)
Current State of Table Formats - Hive & Hive Metastore
In Hive tables, a given table's files are organised as such:
Since the Hive Metastore (a database) tracks data at the directory (partition) level, it needs to perform file list operations when working with data in that table.
This leads to a couple of problems:
State is being kept in both the metastore and in a file system
Changes are not atomic without locking since we need to make a thousand files go live at the exact same moment
Consequently, this requires too many directory listing calls: O(n) listing calls, where n = number of matching partitions, thus performance problems
Eventual consistency breaks correctness: data may also appear missing during such operations on eventually consistent object stores like S3.
Iceberg's Goals
Serializable Isolation - Reads will be isolated from concurrent writes and will always use a committed snapshot of a table's data + Writes will support removing and adding files in a single operation and are never partially visible. Readers will not acquire locks
Speed - Ops will use O(1) remote calls to plan the files for scanning and not O(N)
Scale - Job planning will be handled primarily by clients and not bottleneck on a central metadata store + Metadata will include information needed for cost-based optimisation
Evolution - Tables support full schema and partition spec evolution
Dependable Types - Tables will provide well-defined and dependable support for a core set of types
Storage Separation - Partitioning will be a table configuration; reads will be planned using predicates on data values, not partition values
Formats - Underlying data file formats will support identical schema evolution rules and types
Iceberg's Design
Key Ideas
Track all files in a table over time using a persistent tree structure
A snapshot is a complete set of files in a table
Each write produces and commits a new snapshot
Thus expensive list operations are no longer needed when querying (i.e. O(N) file listing becomes an O(1) RPC to read a snapshot, so the Hive Metastore is no longer a bottleneck + pruning is also available to speed up query planning)
Gain atomicity from tracking snapshots
Readers will use the current snapshot or historical ones when time travelling
Writers optimistically create new snapshots then commit
Thus isolated Reads and Writes without locking the table
NOTE: The actual implementation is more complicated; doing exactly the above would be incredibly slow on writes, since we would be re-writing the entire table state every single time. TL;DR: there is some reuse of metadata across snapshots.
Iceberg Format Overview
Snapshot: State of a table at some point in time, including the set of all data files that make up the contents of the table. Data files are stored across multiple manifest files, and the manifests for a snapshot are listed in a single manifest list file. Every write / delete produces a new snapshot that reuses as much of the previous snapshot's metadata tree as possible to avoid high write volumes.
Snapshot Metadata File: Metadata about the table (e.g. schema, partition specification, path to the manifest list).
Manifest List: Metadata file that contains an entry for each manifest file associated with the snapshot. Each entry also includes a path to the manifest file and metadata about it (i.e. partition value ranges and data file counts), allowing us to avoid reading manifests that are not required for a given operation. One per snapshot.
Manifest File: Contains a list of paths to related data or delete files, with each data file entry including some metadata about the file (i.e. column statistics) such as per-column upper and lower bounds, useful for pruning files during scan planning. A subset of a snapshot.
Data File: Physical data files (e.g. file01.parquet, file02.orc, file03.avro). Contains rows of a table.
Delete File: Encodes rows of a table that are deleted by position or by data values.
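A rough sketch of walking this metadata tree with the Iceberg Java API (assuming an already-loaded Table named table that has at least one snapshot):

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;

// The current snapshot points at a single manifest list file...
Snapshot current = table.currentSnapshot();
System.out.println("manifest list: " + current.manifestListLocation());

// ...and the manifest list has an entry per manifest file, each of which
// tracks a group of data files plus stats used for pruning.
for (ManifestFile manifest : current.allManifests(table.io())) {
  System.out.println(manifest.path() + " (" + manifest.addedFilesCount() + " added files)");
}
```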
Key Features
Summary of the most essential bits that Iceberg brings to the table
1. Evolution
In-place table evolution is supported: a table's schema can evolve (even in nested structures) and partition layouts can be changed as data volume changes, without any costly migration operations. (E.g. in Hive we would need to rewrite to a new table if we changed the table's name or changed the granularity from a daily to an hourly partition layout.)
Schema Evolution
Iceberg supports the following schema evolution changes by updating metadata only (data files are not updated and thus do not need to be rewritten):
Add
Drop
Rename
Update
Reorder
Correctness: schema evolution changes are guaranteed to be independent and free of side effects, without file rewrites:
Added columns never read existing values from another column
Dropping a column or field does not change the values in any other column
Updating a column or field does not change values in any other column
Changing the order of columns or fields in a struct does not change the values associated with a column or field name
Each column in a table is tracked using a unique id so that it never violates #1 and #2 above.
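As a rough sketch of what this looks like with the Iceberg Java API (assuming a loaded Table named table; the column names are illustrative), each change below is a metadata-only commit:

```java
import org.apache.iceberg.types.Types;

// Add, rename and drop columns without rewriting any data files; the rename is
// safe because columns are tracked by id, not by name.
table.updateSchema()
    .addColumn("session_id", Types.StringType.get())
    .renameColumn("level", "severity")
    .deleteColumn("call_stack")
    .commit();
```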
Partition Evolution
Partitioning is crucial to making queries faster by grouping similar rows together when writing. For example, take a query for logs between 10 AM and 12 PM (a sketch is shown below): if the logs table is partitioned by the date of event_time, the log events are grouped into files with the same event date, which allows Iceberg to skip files with other dates that do not contain useful data.
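A rough sketch of such a query using Spark's Java API (the catalog and table name prod.db.logs, and the SparkSession setup, are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Only the event_time predicate is needed; Iceberg prunes files whose
// event dates fall outside the requested range.
SparkSession spark = SparkSession.builder().appName("logs-query").getOrCreate();
Dataset<Row> rows = spark.sql(
    "SELECT level, message FROM prod.db.logs "
        + "WHERE event_time BETWEEN '2018-12-01 10:00:00' AND '2018-12-01 12:00:00'");
rows.show();
```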
Partitioning can be updated in an existing table because queries do not reference partition values directly. When a partition spec evolves, the old data written with an earlier spec remains unchanged, and new data is written using the new spec in a new layout. Metadata for each partition version is kept separately, which gives us split planning when querying: each partition layout plans files separately using the filter it derives for that specific partition layout, allowing partition layouts to coexist in the same table as such:
Note: Each data file is stored with a partition tuple (a tuple or struct of partition data) where all values in the partition tuple are the same for all rows stored in the data file. These are produced by transforming values from row data using a partition spec and are stored unmodified, unlike in Hive.
Hidden Partitioning in Iceberg vs Hive Partitions
In Hive, we often have to explicitly specify a partition (which shows up as a column), for instance when writing data:
Consequently, our queries on the logs table must use an event_date filter on top of the event_time filter. If event_date were not included, Hive would scan through every file belonging to the logs table.
Thus, this leads to a number of problems:
Hive cannot validate partition values
User has to write the queries correctly (E.g. specify the correct date format)
Working queries are tied to the table's partitioning scheme and cannot be changed without breaking production code
In Iceberg, we use hidden partitioning, where partition values are produced by taking a column value and optionally transforming it. That is, Iceberg is responsible for converting event_time into event_date and keeping track of the relationship.
Thus, there is no need to specify user-maintained partition columns: we just query the data we need and Iceberg automatically prunes out files that do not contain matching data, as in the sketch below.
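A minimal sketch (assuming a Schema named schema for the logs table): the day transform is what derives the hidden date partition values from event_time, so queries only ever filter on event_time.

```java
import org.apache.iceberg.PartitionSpec;

// Hidden partitioning: partition values are day(event_time); no event_date
// column needs to exist in the schema or in user queries.
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .day("event_time")
    .build();
```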
Sort Order Evolution
Works the same way as partition spec evolution: data that was already written keeps its old sort order.
2. Time Travel & Version Rollback
Iceberg keeps a log of previous snapshots of a table, as shown in the Design section above. This enables us to query past snapshots or roll back tables.
3. Performance
Iceberg enables high performance: multi-PB tables can be read from a single node, without the need for a distributed SQL engine to sift through table metadata.
Scan Planning
Fast scan planning in Iceberg fits on a single node since its metadata can be used to prune metadata files that do not contain matching data. This enables 1) lower-latency queries (eliminating a distributed scan just to plan a distributed scan) and 2) access from any client (standalone processes can read data directly from Iceberg tables). This is achieved using:
Metadata Filtering
This occurs using 1) Manifest files (which store the list of data files + partition data + column-level stats) and 2) Manifest lists (which store a snapshot's list of manifests + the range of values for each partition field). Fast scan planning first filters manifests using the partition value ranges in the manifest list, then reads each remaining manifest to obtain the data files, with the manifest list acting as an index over manifests so that not all manifests need to be read.
Data Filtering
Manifest files include a tuple of partition data + column-level stats for each data file
During planning, query predicates are converted to predicates on the partition data and applied to filter the data files
Column-level stats are used to eliminate files that cannot match the query predicate
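A small sketch of how this looks with the Iceberg Java API (assuming a loaded Table named table; the level = 'ERROR' predicate is just an example): planFiles returns only the data files whose partition data and column stats can match the filter.

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

// Plan a scan: the filter is applied to partition data and column-level stats,
// so files that cannot match are pruned before any data is read.
CloseableIterable<FileScanTask> tasks = table.newScan()
    .filter(Expressions.equal("level", "ERROR"))
    .planFiles();
for (FileScanTask task : tasks) {
  System.out.println(task.file().path());
}
```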
4. Reliability
Hive tables lack atomicity because they track data files using both a central metastore (for partitions) and a file system (for individual files), making atomic changes impossible; in addition, S3 may return incorrect results due to the use of file listing to reconstruct the state of the table (O(number of partitions)). Iceberg is designed to overcome these reliability issues.
Reliability guarantees in Iceberg:
Serializable Isolation: All changes to the table occur in a linear history of atomic table updates
Reliable Reads: Always read using a consistent snapshot of the table without holding a lock
Version History and Rollback: Snapshots are kept so we can roll back if the latest job produces bad data
Safe File-level Operations: With atomic changes we can safely compact small files + append late data to tables
Performance benefits:
O(1) RPCs to Plan: No need to list O(n) directories in a table to plan jobs, since the latest snapshot can be read in O(1) time
Distributed Planning: File pruning and predicate push-down are distributed to jobs, removing the metastore as a bottleneck
File Granularity Partitioning: Distributed planning + O(1) RPC calls remove current barriers to finer-grained partitioning
Concurrent Writes
Iceberg uses optimistic concurrency: each writer assumes no other writers are operating, writes out new table metadata, and commits by swapping the new table metadata file for the existing metadata file. If the swap fails, a retry occurs against the new current table state. (Iceberg reduces the cost of retries by maximising the work that can be reused across them, and after a conflict it validates that its assumptions are still met by the current table state, so that it is safe to re-apply the commit and its actions.)
Java API Basics
Creating a Schema
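A minimal sketch following the Iceberg Java API docs (the logs columns are the running example used throughout these notes):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Column ids (1-5) are assigned explicitly when building a schema by hand.
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    Types.NestedField.optional(4, "call_stack",
        Types.ListType.ofRequired(5, Types.StringType.get())));
```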
Creating an Iceberg Table
Using the Hive Catalog
Use the Hive catalog to connect to a Hive Metastore to keep track of Iceberg tables.
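A sketch of this (the metastore URI, warehouse path and table identifier are placeholders; schema is the one built above):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

// Point the catalog at a Hive Metastore, then create the table through it.
Map<String, String> properties = new HashMap<>();
properties.put("uri", "thrift://metastore-host:9083");
properties.put("warehouse", "s3://my-bucket/warehouse");

HiveCatalog catalog = new HiveCatalog();
catalog.initialize("hive", properties);

TableIdentifier name = TableIdentifier.of("logging", "logs");
Table table = catalog.createTable(name, schema);
```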
Using the Hadoop Catalog
Does not need to connect to the Hive MetaStore but can only be used with HDFS or file systems which support atomic renaming. Concurrent writes are not safe with a local FS or S3
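A sketch (the HDFS warehouse path and table identifier are placeholders; the file system must support atomic rename):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// A Hadoop catalog keeps all table metadata under a warehouse directory,
// with no Hive Metastore involved.
HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://namenode:8020/warehouse");
TableIdentifier name = TableIdentifier.of("logging", "logs");
Table table = catalog.createTable(name, schema);
```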
Using Hadoop Tables
Supports tables that are stored in a directory in HDFS, but once again concurrent writes with a Hadoop table are not safe when stored on the local FS or S3.
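A sketch (the table location is a placeholder; schema is the one built above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Hadoop tables are addressed directly by their directory path.
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.create(schema, "hdfs://namenode:8020/tables/logs");
```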
Partition Spec & Evolution
Partition specs describe how Iceberg should group records into their respective data files (i.e. how partition values are derived from data fields) and can be created for a table's schema using a builder. A partition spec has a list of fields, each of which consists of:
Source Column Id: from the table's schema; must be a primitive type (no maps or lists)
Partition Field Id: used to identify a partition field; unique within a partition spec (or across ALL partition specs for v2 table metadata)
Transform: applied to the source column to produce a partition value
Partition Name
Example for a logs table partitioned by the hour of the event's timestamp and by log level:
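A sketch reusing the schema from above:

```java
import org.apache.iceberg.PartitionSpec;

// Hourly partitions derived from event_time, plus an identity partition on level.
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
```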
To update a partition spec we can do the following with the Table API, or use the equivalent ALTER TABLE ... ADD/DROP/REPLACE PARTITION FIELD statements in Spark SQL.
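A rough sketch with the Java API (assuming a loaded Table named table; the hypothetical change moves the logs table from daily to hourly partitioning):

```java
import static org.apache.iceberg.expressions.Expressions.day;
import static org.apache.iceberg.expressions.Expressions.hour;

// Existing data keeps the old daily layout; only new data uses the hourly spec.
table.updateSpec()
    .removeField(day("event_time"))
    .addField(hour("event_time"))
    .commit();
```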
Sort Order Spec & Evolution
A sort field comprises:
Source Column Id
Transform (same as partition transforms)
Sort Direction - Asc or Desc only
Null Order - Describes the order of null values when sorted. Can only be nulls-first or nulls-last
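A rough sketch of replacing a table's sort order (assuming a loaded Table named table; the chosen columns are illustrative):

```java
import org.apache.iceberg.NullOrder;

// New data is written sorted by event_time (nulls last), then by level descending;
// data already written keeps its old sort order.
table.replaceSortOrder()
    .asc("event_time", NullOrder.NULLS_LAST)
    .desc("level")
    .commit();
```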
Time Travel
To view all snapshots with Spark (or, in SQL, by selecting from the table's snapshots metadata table):
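A sketch with Spark's Java API (the table name db.table, the SparkSession setup and the snapshot id are placeholders); the snapshot-id read option performs the actual time travel:

```java
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// List every snapshot of the table via its snapshots metadata table.
spark.read().format("iceberg").load("db.table.snapshots").show();

// Time travel: read the table as of a specific (placeholder) snapshot id.
spark.read()
    .option("snapshot-id", 10963874102873L)
    .format("iceberg")
    .load("db.table")
    .show();
```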
Rollbacks
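Rollbacks go through the snapshot-management API; a rough sketch (assuming a loaded Table named table; the snapshot id is a placeholder). Spark also exposes a rollback_to_snapshot procedure for doing this in SQL.

```java
// Roll the table's current state back to a known-good snapshot.
table.manageSnapshots()
    .rollbackTo(10963874102873L)
    .commit();
```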
Miscellaneous
Maintenance Best Practices
1. Expiring Snapshots
Each write to an Iceberg table generates a new snapshot of the table for time-travelling and rollbacks. These snapshots accumulate until they are expired to remove data files that are no longer needed + keep the size of table metadata small. For example:
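A rough sketch (assuming a loaded Table named table; the one-day cutoff and the retainLast count are arbitrary):

```java
import java.util.concurrent.TimeUnit;

// Expire snapshots older than one day, but always keep the 10 most recent.
long tsToExpire = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
table.expireSnapshots()
    .expireOlderThan(tsToExpire)
    .retainLast(10)
    .commit();
```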
2. Removing Old Metadata Files
Iceberg tracks table metadata using JSON files, which are produced upon each change to the table to ensure atomicity. However, these can pile up when there are frequent commits, as in streaming jobs. We can automatically clean them up by configuring write.metadata.delete-after-commit.enabled=true, which retains metadata only up to write.metadata.previous-versions-max.
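These are ordinary table properties, so one way to set them is via the Java API (a sketch, assuming a loaded Table named table; the value of 50 is arbitrary):

```java
// Keep only the most recent metadata files after each commit.
table.updateProperties()
    .set("write.metadata.delete-after-commit.enabled", "true")
    .set("write.metadata.previous-versions-max", "50")
    .commit();
```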
3. Deletion of Orphaned Files
In distributed processing, task and job failures may leave behind files that are not referenced by table metadata. We can clean these up based on a table's location as such:
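A rough sketch using the Spark actions API (assuming a loaded Table named table and an active Spark session; the three-day cutoff is arbitrary but should exceed the longest expected write):

```java
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.spark.actions.SparkActions;

// Delete files under the table location that no snapshot references,
// but only if they are older than the retention cutoff.
long olderThan = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
SparkActions.get()
    .deleteOrphanFiles(table)
    .olderThan(olderThan)
    .execute();
```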
Note: Do not remove orphan files with a retention interval < time expected for any write to complete.
4. Compaction of Data Files
Similar to the small-files issue we have in Hive, streaming queries may produce small data files that can and should be compacted into larger files. Some tables can also benefit from rewriting manifest files to make locating data much faster. Compaction can be done as follows:
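A rough sketch using the Spark actions API (assuming a loaded Table named table; the target file size is a placeholder):

```java
import org.apache.iceberg.spark.actions.SparkActions;

// Bin-pack small data files into larger ones of roughly the target size.
SparkActions.get()
    .rewriteDataFiles(table)
    .option("target-file-size-bytes", Long.toString(512L * 1024 * 1024))
    .execute();
```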
5. Rewriting Manifests
Manifest list metadata is used to speed up query planning, and manifests are automatically compacted in the order in which they are added, which speeds up queries when the write pattern aligns with the read filters. However, this may not always be the case. To rewrite manifests for a speed-up we can do the following, which rewrites small manifests and groups data files by the first partition field:
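A rough sketch using the Spark actions API (assuming a loaded Table named table; the 10 MB threshold is a placeholder):

```java
import org.apache.iceberg.spark.actions.SparkActions;

// Rewrite only small manifests; the rewritten manifests group data files by
// the table's first partition field.
SparkActions.get()
    .rewriteManifests(table)
    .rewriteIf(manifest -> manifest.length() < 10 * 1024 * 1024)
    .execute();
```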
Type Conversion
Hive to Iceberg
| Hive | Iceberg | Notes |
| --- | --- | --- |
| boolean | boolean | |
| short | integer | auto-conversion |
| byte | integer | auto-conversion |
| integer | integer | |
| long | long | |
| float | float | |
| double | double | |
| date | date | |
| timestamp | timestamp without timezone | |
| timestamplocaltz | timestamp with timezone | Hive 3 only |
| interval_year_month | not supported | |
| interval_day_time | not supported | |
| char | string | auto-conversion |
| varchar | string | auto-conversion |
| string | string | |
| binary | binary | |
| decimal | decimal | |
| struct | struct | |
| list | list | |
| map | map | |
| union | not supported | |