Performance#

Info

This is advanced material, and you will need it only if you have arrays with more than a million chunks. Icechunk aims to provide an excellent experience out of the box.

Scalability#

Icechunk is designed to be cloud native, making it able to take advantage of the horizontal scaling of cloud providers. To learn more, check out this blog post which explores just how well Icechunk can perform when matched with AWS S3.

Cold buckets and repos#

Modern object stores usually reshard their buckets on the fly, based on perceived load. The strategies they use are not published and are very hard to discover. The details matter less than the main takeaway: on new buckets, and even on new repositories, the object store may not scale well from the start. You are expected to ramp up load slowly as you write data to the repository.

Once you have applied consistently high write/read load to a repository for a few minutes, the object store will usually reshard your bucket allowing for more load. While this resharding happens, different object stores can respond in different ways. For example, S3 returns 5xx errors with a "SlowDown" indication. GCS returns 429 responses.

Icechunk helps this process by retrying failed requests with an exponential backoff. In our experience, the default configuration is enough to ingest into a fresh bucket using around 100 machines. But if this is not the case for you, you can tune the retry configuration using StorageRetriesSettings.
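As a rough illustration of what exponential backoff means, the sketch below computes the wait before each retry. It is a generic model, not Icechunk's implementation; the function and its parameter names (`max_tries`, `initial_backoff_ms`, `max_backoff_ms`) are hypothetical and chosen only for this example.

```python
def backoff_delays_ms(max_tries, initial_backoff_ms, max_backoff_ms):
    """Return the wait (in milliseconds) before each retry of a failed request.

    Hypothetical helper: the delay doubles after every failure and is
    capped at max_backoff_ms. The first try has no preceding wait.
    """
    delays = []
    for attempt in range(max_tries - 1):
        delays.append(min(initial_backoff_ms * 2**attempt, max_backoff_ms))
    return delays


print(backoff_delays_ms(max_tries=6, initial_backoff_ms=100, max_backoff_ms=1000))
# [100, 200, 400, 800, 1000]
```

Raising the number of tries or the cap gives the object store more time to reshard before a request finally fails.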

To learn more about how Icechunk manages object store prefixes, read our blog post on Icechunk scalability.

Warning

Currently, Icechunk's implementation of retry logic during resharding does not work properly on GCS. We have a pull request open to one of Icechunk's dependencies that will solve this. In the meantime, if you get 429 errors from your Google bucket, please lower concurrency and try again. Increase concurrency slowly until errors disappear.
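If you drive the writes from your own script, one possible way to automate "lower, then slowly raise" is an additive-increase/multiplicative-decrease rule. The sketch below is only an assumption about how you might structure this in your own code; `next_concurrency` and its parameters are hypothetical and are not part of Icechunk.

```python
def next_concurrency(current, saw_throttling, step=1, floor=1, ceiling=64):
    """Pick the worker count for the next batch of writes.

    Hypothetical rule: halve the concurrency after a batch that hit
    throttling errors (e.g. 429), otherwise add `step` workers.
    """
    if saw_throttling:
        return max(floor, current // 2)
    return min(ceiling, current + step)


level = 16
history = []
# Two throttled batches followed by three clean ones.
for throttled in [True, True, False, False, False]:
    level = next_concurrency(level, throttled)
    history.append(level)

print(history)
# [8, 4, 5, 6, 7]
```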

Preloading manifests#

Coming Soon.

Splitting manifests#

Icechunk stores chunk references in a chunk manifest file stored in manifests/. By default, Icechunk stores all chunk references in a single manifest file per array. For very large arrays (millions of chunks), these files can get quite large. Requesting even a single chunk will require downloading the entire manifest. In some cases, this can result in a slow time-to-first-byte or large memory usage. Similarly, appending a small amount of data to a large array requires downloading and rewriting the entire manifest.
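To judge whether an array is in the "millions of chunks" regime, you can count its chunks from the array shape and chunk shape. The sketch below uses a made-up hourly dataset; the shapes are assumptions chosen for illustration only.

```python
import math


def chunk_grid_shape(array_shape, chunk_shape):
    """Number of chunks along each axis (partial edge chunks count as one)."""
    return tuple(math.ceil(n / c) for n, c in zip(array_shape, chunk_shape))


def num_chunks(array_shape, chunk_shape):
    """Total number of chunks, i.e. the maximum number of chunk references."""
    return math.prod(chunk_grid_shape(array_shape, chunk_shape))


# Hypothetical array: 40 years of hourly data on a 721 x 1440 grid.
shape = (40 * 365 * 24, 721, 1440)
chunks = (12, 103, 180)

print(chunk_grid_shape(shape, chunks))
# (29200, 7, 8)
print(num_chunks(shape, chunks))
# 1635200
```

With a single manifest per array, all of these references would live in one file.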

Note

Note that the chunk sizes in the following examples are tiny for demonstration purposes.

Configuring splitting#

To solve this issue, Icechunk lets you split the manifest files by specifying a ManifestSplittingConfig.

import icechunk as ic
from icechunk import ManifestSplitCondition, ManifestSplittingConfig, ManifestSplitDimCondition

split_config = ManifestSplittingConfig.from_dict(
    {
        ManifestSplitCondition.AnyArray(): {
            ManifestSplitDimCondition.DimensionName("time"): 365 * 24
        }
    }
)
repo_config = ic.RepositoryConfig(
    manifest=ic.ManifestConfig(splitting=split_config),
)

Then pass the config to Repository.open or Repository.create:

repo = ic.Repository.open(..., config=repo_config)

Important

Once you find a splitting configuration you like, remember to persist it on-disk using repo.save_config.

This particular example splits manifests so that each manifest contains 365 * 24 chunks along the time dimension, and every chunk along every other dimension in a single file.
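The arithmetic behind such a configuration can be sketched in plain Python. The helper below is a hypothetical model, not an Icechunk API: it takes the number of chunks along each axis and the split size per axis (`None` meaning "do not split this axis") and returns how many manifests result and the largest number of references any one of them can hold. The chunk grid is an assumed example.

```python
import math


def manifest_layout(grid_shape, split_sizes):
    """Return (number of manifests, max chunk refs per manifest).

    grid_shape:  chunks along each axis.
    split_sizes: max chunks per manifest along each axis, or None for no split.
    """
    per_axis = [n if s is None else min(s, n) for n, s in zip(grid_shape, split_sizes)]
    n_manifests = math.prod(math.ceil(n / p) for n, p in zip(grid_shape, per_axis))
    return n_manifests, math.prod(per_axis)


# Hypothetical chunk grid: 29200 chunks along time, 7 x 8 along the other axes.
grid = (29200, 7, 8)

print(manifest_layout(grid, (365 * 24, None, None)))
# (4, 490560)
```

Reading a single chunk then requires one manifest of at most 490,560 references instead of one holding all 1,635,200.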

Besides ManifestSplitCondition.AnyArray, which the example above uses to match every array, the options for specifying the arrays whose manifests you want to split are:

  1. ManifestSplitCondition.name_matches takes a regular expression used to match an array's name;
  2. ManifestSplitCondition.path_matches takes a regular expression used to match an array's path;
  3. ManifestSplitCondition.and_conditions to combine (1), (2), and (4) together; and
  4. ManifestSplitCondition.or_conditions to combine (1), (2), and (3) together.

The and_conditions and or_conditions constructors may be used to combine multiple path and/or name matches. For example,

array_condition = ManifestSplitCondition.or_conditions(
    [
        ManifestSplitCondition.name_matches("temperature"),
        ManifestSplitCondition.name_matches("salinity"),
    ]
)
sconfig = ManifestSplittingConfig.from_dict(
    {array_condition: {ManifestSplitDimCondition.DimensionName("longitude"): 3}}
)

Options for specifying how to split along a specific axis or dimension are:

  1. ManifestSplitDimCondition.Axis takes an integer axis;
  2. ManifestSplitDimCondition.DimensionName takes a regular expression used to match the dimension names of the array;
  3. ManifestSplitDimCondition.Any matches any remaining dimension name or axis.

For example, for an array with dimensions time, latitude, longitude, the following config

from icechunk import ManifestSplitDimCondition

{
    ManifestSplitDimCondition.DimensionName("longitude"): 3,
    ManifestSplitDimCondition.Axis(1): 2,
    ManifestSplitDimCondition.Any(): 1,
}

will result in splitting manifests so that each manifest contains (3 longitude chunks x 2 latitude chunks x 1 time chunk) = 6 chunks per manifest file.

Note

Python dictionaries preserve insertion order, so the first condition encountered takes priority.
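The first-match rule can be modelled in a few lines of plain Python. This is a sketch of the priority logic described above, not Icechunk's code: conditions are represented as hypothetical `(kind, value)` tuples, and the use of `re.fullmatch` for dimension names is an assumption about the matching semantics.

```python
import math
import re


def resolve_split_sizes(dim_names, conditions):
    """Return the split size chosen for each axis, or None if nothing matches.

    conditions: dict mapping (kind, value) tuples to split sizes, in priority
    order. kind is one of "dimension_name", "axis", or "any".
    """
    sizes = []
    for axis, name in enumerate(dim_names):
        chosen = None
        for (kind, value), size in conditions.items():
            if (
                kind == "any"
                or (kind == "axis" and value == axis)
                or (kind == "dimension_name" and re.fullmatch(value, name))
            ):
                chosen = size  # first matching condition wins
                break
        sizes.append(chosen)
    return sizes


conditions = {
    ("dimension_name", "longitude"): 3,
    ("axis", 1): 2,
    ("any", None): 1,
}
sizes = resolve_split_sizes(("time", "latitude", "longitude"), conditions)

print(sizes)
# [1, 2, 3]
print(math.prod(sizes))
# 6
```

Moving the `("any", None)` entry to the front of the dictionary would make it match every axis first, giving a split size of 1 everywhere.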

Splitting behaviour#

By default, Icechunk minimizes the number of chunk refs that are written in a single commit.

Consider this simple example: a 1D array with split size 1 along axis 0.

import random

import icechunk as ic
from icechunk import (
    ManifestSplitCondition,
    ManifestSplitDimCondition,
    ManifestSplittingConfig,
)

split_config = ManifestSplittingConfig.from_dict(
    {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 1}}
)
repo_config = ic.RepositoryConfig(manifest=ic.ManifestConfig(splitting=split_config))

storage = ic.local_filesystem_storage(
    f"/tmp/splitting-test/{random.randint(100, 20000)}"
)
# Note any config passed to Repository.create is persisted to disk.
repo = ic.Repository.create(storage, config=repo_config)

Create an array

import zarr

session = repo.writable_session("main")
root = zarr.group(session.store)
name = "array"
array = root.create_array(name=name, shape=(10,), dtype=int, chunks=(1,))

Now let's write 5 chunk references:

import numpy as np

array[:5] = np.arange(10, 15)
print(session.status())

Groups created: /

Arrays created: /array

Chunks updated: /array: [0] [1] [2] [3] [4]

And commit

snap = session.commit("Add 5 chunks")

Use repo.lookup_snapshot to examine the manifests associated with a Snapshot:

print(repo.lookup_snapshot(snap).manifests)

[ManifestFileInfo(id="1SMFQTDTMSQ051WT91M0", size_bytes=141, num_chunk_refs=1), ManifestFileInfo(id="KFAHEPFC57FWKBXXVZP0", size_bytes=141, num_chunk_refs=1), ManifestFileInfo(id="M95GM9GJ49PCFWT5QQP0", size_bytes=142, num_chunk_refs=1), ManifestFileInfo(id="VEM4Y1T6729701N4TRCG", size_bytes=144, num_chunk_refs=1), ManifestFileInfo(id="WWEGZ19YRGSPACY1WAM0", size_bytes=142, num_chunk_refs=1)]

Let's open the Repository again with a different splitting config, one where 5 chunk references are stored in a single manifest.

split_config = ManifestSplittingConfig.from_dict(
    {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 5}}
)
repo_config = ic.RepositoryConfig(manifest=ic.ManifestConfig(splitting=split_config))
new_repo = ic.Repository.open(storage, config=repo_config)
print(new_repo.config.manifest)

ManifestConfig(preload=None, splitting=ManifestSplittingConfig((AnyArray, [(Any, 5)])))

Now let's append data.

session = new_repo.writable_session("main")
array = zarr.open_array(session.store, path=name, mode="a")
array[6:9] = [1, 2, 3]
print(session.status())

Chunks updated: /array: [6] [7] [8]

snap2 = session.commit("appended data")
print(repo.lookup_snapshot(snap2).manifests)

Look carefully: only one new manifest, holding the 3 new chunk refs, has been written.

Why?

Icechunk minimizes how many chunk references are rewritten at each commit (to save time and memory). Manifests written under the previous splitting configuration (split size of 1) are compatible with the current configuration (split size of 5), because the bounding box of every existing manifest (slice(0, 1), slice(1, 2), etc.) is fully contained in one of the bounding boxes implied by the new configuration: [slice(0, 5), slice(5, 10)].
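The containment rule can be checked by hand with a small model. The helpers below are hypothetical and only mirror the reasoning in the paragraph above for a 1D chunk grid; they are not how Icechunk is implemented.

```python
def split_boxes(length, split_size):
    """Half-open (start, stop) chunk-index ranges implied by a split size."""
    return [
        (start, min(start + split_size, length))
        for start in range(0, length, split_size)
    ]


def is_compatible(existing, target):
    """True if every existing box lies entirely inside some target box."""
    return all(
        any(t0 <= e0 and e1 <= t1 for t0, t1 in target)
        for e0, e1 in existing
    )


old = split_boxes(10, 1)[:5]  # manifests written with split size 1: (0, 1) ... (4, 5)
new = split_boxes(10, 5)

print(new)
# [(0, 5), (5, 10)]
print(is_compatible(old, new))
# True
print(is_compatible(split_boxes(10, 3), new))
# False, because (3, 6) straddles the break at 5
```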

Now for a more complex example: let's rewrite the references in slice(3, 7), i.e. a range that spans the break between manifests.

session = new_repo.writable_session("main")
array = zarr.open_array(session.store, path=name, mode="a")
array[3:7] = [1, 2, 3, 4]
print(session.status())

Chunks updated: /array: [3] [4] [5] [6]

snap3 = session.commit("rewrite [3,7)")
print(repo.lookup_snapshot(snap3).manifests)

[ManifestFileInfo(id="2PJ63R49NHKVG3WCKTH0", size_bytes=190, num_chunk_refs=5), ManifestFileInfo(id="C4QMVT57B5RJ528A7SF0", size_bytes=180, num_chunk_refs=4)]

This ends up rewriting all refs to two new manifests.
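The reference counts in that output can be reproduced with simple bookkeeping. The sketch below is a model under the assumption that, once every affected manifest is rewritten, each written chunk lands in the manifest whose bounding box contains its index; `refs_per_manifest` is a hypothetical helper.

```python
from collections import Counter


def refs_per_manifest(written_chunks, split_size):
    """Map each (start, stop) manifest box to the number of chunk refs it holds."""
    counts = Counter(index // split_size for index in written_chunks)
    return {
        (k * split_size, (k + 1) * split_size): counts[k]
        for k in sorted(counts)
    }


# Chunks written by the three commits so far: [0, 5), [6, 9), and [3, 7).
written = set(range(0, 5)) | set(range(6, 9)) | set(range(3, 7))

print(sorted(written))
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
print(refs_per_manifest(written, 5))
# {(0, 5): 5, (5, 10): 4}
```

The write to slice(3, 7) touches both boxes, which is consistent with both manifests being rewritten at the new split size.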

Rewriting manifests#

Remember, by default Icechunk only writes one manifest per array regardless of size. For large enough arrays, you might see a relative performance hit while committing a new update (e.g. an append), or when reading from a Repository object that was just created. At that point, you will want to experiment with different manifest split configurations.

To force Icechunk to rewrite all chunk refs to the current splitting configuration, use rewrite_manifests.

To illustrate, we will use a split size of 3. For the current example, which has 9 chunk refs, this will consolidate to three manifests.

split_config = ManifestSplittingConfig.from_dict(
    {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 3}}
)
repo_config = ic.RepositoryConfig(
    manifest=ic.ManifestConfig(splitting=split_config),
)
new_repo = ic.Repository.open(storage, config=repo_config)

snap4 = new_repo.rewrite_manifests(
    "rewrite_manifests with new config", branch="main"
)

rewrite_manifests will create a new commit on branch with the provided message.

print(repo.lookup_snapshot(snap4).manifests)

[ManifestFileInfo(id="SMGZM1E32JYNZQH58S20", size_bytes=170, num_chunk_refs=3), ManifestFileInfo(id="VE9S4KBFRPWEFGTQPHZG", size_bytes=170, num_chunk_refs=3), ManifestFileInfo(id="XEF65F8V69FSHEPTRXE0", size_bytes=172, num_chunk_refs=3)]

The splitting configuration is saved in the snapshot metadata.

print(repo.lookup_snapshot(snap4).metadata)

{'splitting_config': {'split_sizes': [['any_array', [{'condition': 'Any', 'num_chunks': 3}]]]}}

Important

Once you find a splitting configuration you like, remember to persist it on-disk using repo.save_config.

Example workflow#

Here is an example workflow for experimenting with splitting:

# first define a new config
split_config = ManifestSplittingConfig.from_dict(
    {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 5}}
)
repo_config = ic.RepositoryConfig(
    manifest=ic.ManifestConfig(splitting=split_config),
)
# open the repo with the new config.
repo = ic.Repository.open(storage, config=repo_config)

We will rewrite the manifests on a different branch:

repo.create_branch("split-experiment-1", repo.lookup_branch("main"))
snap = repo.rewrite_manifests(
    "rewrite_manifests with new config", branch="split-experiment-1"
)
print(repo.lookup_snapshot(snap).manifests)

[ManifestFileInfo(id="PFYVVAWB0PZE6FND2WCG", size_bytes=189, num_chunk_refs=5), ManifestFileInfo(id="ZXCDPWMP231A2WXEC6VG", size_bytes=176, num_chunk_refs=4)]

Now benchmark reads on main vs split-experiment-1:

store = repo.readonly_session("main").store
store_split = repo.readonly_session("split-experiment-1").store
# ...

Assume we decided the configuration on split-experiment-1 was good. First we persist that configuration to disk:

repo.save_config()

Now point the main branch to the commit with rewritten manifests:

repo.reset_branch("main", repo.lookup_branch("split-experiment-1"))

Notice that the persisted config is restored when opening a Repository:

print(ic.Repository.open(storage).config.manifest)

ManifestConfig(preload=None, splitting=ManifestSplittingConfig((AnyArray, [(Any, 5)])))