
Distributed Writes with Dask

You can use Icechunk in conjunction with Xarray and Dask to perform large-scale distributed writes from a multi-node cluster. However, it's not possible to use the existing dask.array.to_zarr or xarray.Dataset.to_zarr functions with either the Dask multiprocessing or distributed schedulers: writes executed in other processes are recorded in those processes' copies of the Icechunk Session, not in the local Session you commit from. (The multithreaded scheduler works fine, because all threads share the same Session.)

Instead, Icechunk provides its own specialized functions to make distributed writes with Dask and Xarray. This page explains how to use these specialized functions.

Start with an Icechunk store:

```python
import tempfile

import icechunk

# initialize the Icechunk store at a fresh temporary path
storage = icechunk.local_filesystem_storage(tempfile.TemporaryDirectory().name)
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")
```

Icechunk + Dask

Use icechunk.dask.store_dask to write a Dask array to an Icechunk store. The API follows that of dask.array.store, except that the compute kwarg is not supported.
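Conceptually, dask.array.store writes each block of a source into the matching region of its target (roughly `target[region] = block`). The stdlib-only sketch below computes those regions from a Dask-style chunks specification, the same nested-tuple format returned by a Dask array's `.chunks` attribute. The helper name `block_regions` is hypothetical, and this is an illustration of the idea, not Dask's or Icechunk's implementation.

```python
from itertools import accumulate, product


def block_regions(chunks):
    """Yield (block_index, region) pairs for a Dask-style chunks spec.

    `chunks` holds one tuple of block lengths per axis, e.g.
    ((20, 20, 20, 20, 20), (20, 20, 20, 20, 20)) for a 100x100 array
    split into 20x20 blocks.
    """
    # Per axis, turn block lengths into numbered (start, stop) offsets.
    per_axis = []
    for lengths in chunks:
        stops = list(accumulate(lengths))
        starts = [0] + stops[:-1]
        per_axis.append(list(enumerate(zip(starts, stops))))
    # Every combination of per-axis blocks is one N-dimensional block.
    for combo in product(*per_axis):
        index = tuple(i for i, _ in combo)
        region = tuple(slice(start, stop) for _, (start, stop) in combo)
        yield index, region


# the same layout as da.random.random((100, 100), chunks=(20, 20)).chunks
chunks = ((20,) * 5, (20,) * 5)
regions = dict(block_regions(chunks))
print(len(regions))     # 25 blocks
print(regions[(4, 4)])  # region written by the bottom-right block
```

Each task in a store-style write handles exactly one of these regions, which is why the alignment between these regions and the target's own chunks matters.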

First, create a Dask array to write:

```python
import dask.array as da

shape = (100, 100)
dask_chunks = (20, 20)
dask_array = da.random.random(shape, chunks=dask_chunks)
```

Now create the Zarr array you will write to.

```python
import zarr

zarr_chunks = (10, 10)
group = zarr.group(store=icechunk_session.store, overwrite=True)

zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)
```

Note that each Dask chunk (20, 20) is an exact multiple of the Zarr chunk shape (10, 10), so every Dask block covers whole Zarr chunks and no two write tasks touch the same Zarr chunk. The write tasks are therefore independent and cannot conflict. It is your responsibility to choose chunks that avoid such conflicts.
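One way to check this condition before writing is to verify that every Dask block boundary falls on a Zarr chunk boundary along each axis. The sketch below is a hypothetical helper (`chunks_aligned` is not part of Icechunk or Dask); it takes a Dask-style chunks tuple, like `dask_array.chunks`, plus the Zarr chunk shape, and it assumes a regular Zarr chunk grid.

```python
from itertools import accumulate


def chunks_aligned(dask_chunks, zarr_chunks):
    """Return True if every Dask block starts and ends on a Zarr chunk boundary.

    dask_chunks: one tuple of block lengths per axis (Dask's `.chunks` format).
    zarr_chunks: one chunk length per axis (regular Zarr grid assumed).
    """
    for lengths, zarr_len in zip(dask_chunks, zarr_chunks):
        axis_len = sum(lengths)
        # Each block starts where the previous one stopped, so checking the
        # stop offsets is enough to cover every boundary.
        for stop in accumulate(lengths):
            # The final boundary is the array edge; a partial Zarr chunk there
            # is touched only by the last block, so it is allowed.
            if stop != axis_len and stop % zarr_len != 0:
                return False
    return True


print(chunks_aligned(((20,) * 5, (20,) * 5), (10, 10)))           # aligned
print(chunks_aligned(((15,) * 6 + (10,), (20,) * 5), (10, 10)))  # 15 splits a Zarr chunk
```

In the second call, the boundary at offset 15 falls inside the Zarr chunk covering [10, 20), so two Dask tasks would write to that same chunk.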

Now write the array:

```python
import icechunk.dask

icechunk.dask.store_dask(
    icechunk_session,
    sources=[dask_array],
    targets=[zarray],
)
```

Finally, commit your changes!

```python
print(icechunk_session.commit("wrote a dask array!"))
```

5S5ZXVDAM04B9GEAB0YG

Distributed

In distributed contexts, where Session and Zarr Array objects are sent across the network, you must explicitly opt in to pickling a writable store. This is the case whenever you have started a Dask cluster, for example by creating a distributed.Client. icechunk.dask.store_dask takes care of the hard part, merging the Sessions used by the workers, but you must opt in to pickling before creating the target Zarr array objects.

Here is an example:

```python
from distributed import Client

import icechunk.dask

client = Client()

# Start a new session; the old session is read-only after committing.
icechunk_session = icechunk_repo.writable_session("main")
zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(
        store=icechunk_session.store,
        overwrite=True,
    )

    zarray = group.create_array(
        "array",
        shape=shape,
        chunks=zarr_chunks,
        dtype="f8",
        fill_value=float("nan"),
    )

    icechunk.dask.store_dask(
        icechunk_session,
        sources=[dask_array],
        targets=[zarray],
    )
print(icechunk_session.commit("wrote a dask array!"))
```
7GJMG4Q8FPYJ5G118NR0
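The opt-in requirement described above can be illustrated with a toy model: a writable session refuses to be pickled unless the caller has entered a context manager that temporarily permits it. `GuardedSession` and its `allow_pickling` method are hypothetical stand-ins written for this sketch; they mimic the shape of the pattern and are not Icechunk's implementation.

```python
import pickle
from contextlib import contextmanager


class GuardedSession:
    """Hypothetical stand-in for a writable session (not Icechunk's class)."""

    def __init__(self):
        self.changes = {}
        self._pickling_allowed = False

    @contextmanager
    def allow_pickling(self):
        # Permit pickling only inside the `with` block, then restore the old state.
        previous = self._pickling_allowed
        self._pickling_allowed = True
        try:
            yield self
        finally:
            self._pickling_allowed = previous

    def __getstate__(self):
        # Refuse to serialize unless the caller explicitly opted in.
        if not self._pickling_allowed:
            raise ValueError("pickling a writable session requires allow_pickling()")
        state = self.__dict__.copy()
        state["_pickling_allowed"] = False  # copies start out locked again
        return state


session = GuardedSession()
try:
    pickle.dumps(session)
except ValueError as err:
    print("refused:", err)

with session.allow_pickling():
    payload = pickle.dumps(session)  # allowed inside the block

restored = pickle.loads(payload)
print(restored._pickling_allowed)
```

Making pickling an explicit, scoped opt-in means a writable session cannot be shipped to workers by accident, only inside a block where the caller has committed to collecting those copies back.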

Icechunk + Dask + Xarray

icechunk.xarray.to_icechunk is functionally identical to Xarray's Dataset.to_zarr and accepts many of the same keyword arguments. Notably, the compute kwarg is not supported.

Warning

When using Xarray and Icechunk in a Dask distributed context, you must use to_icechunk so that the Session has a record of the writes executed remotely. Using to_zarr in such cases will leave the local Session with no record of the remote writes, making the commit meaningless.
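The failure mode in this warning can be illustrated with a toy model in plain Python: each worker receives a pickled copy of the session, so its writes land in that copy, and the local session only learns about them if the copies are merged back. `ToySession`, `write_on_worker`, and `merge` are hypothetical names for this sketch; Icechunk's actual merge logic is not shown here.

```python
import pickle


class ToySession:
    """Hypothetical session that records which chunks were written."""

    def __init__(self):
        self.changes = {}

    def write(self, key, value):
        self.changes[key] = value

    def merge(self, other):
        # Fold another session's recorded writes into this one.
        self.changes.update(other.changes)


def write_on_worker(payload, key, value):
    # Simulate a remote task: unpickle a private copy, write to it, ship it back.
    remote = pickle.loads(payload)
    remote.write(key, value)
    return pickle.dumps(remote)


local = ToySession()
payload = pickle.dumps(local)
results = [write_on_worker(payload, f"c/{i}", i) for i in range(3)]

# to_zarr-style: the returned copies are ignored, so the local session saw nothing.
print(local.changes)

# to_icechunk-style: merge every remote copy back before committing.
for result in results:
    local.merge(pickle.loads(result))
print(sorted(local.changes))
```

The first print shows an empty record, which is what a commit from the local session would describe in the to_zarr case; after merging, the local session knows about all three remote writes.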

Now round-trip an Xarray dataset:

```python
import icechunk.xarray
import xarray as xr

# start a new session; the previous one was committed
icechunk_session = icechunk_repo.writable_session("main")
dataset = xr.tutorial.open_dataset(
    "rasm",
    chunks={"time": 1},
).isel(time=slice(24))

# `to_icechunk` takes care of "allow_pickling" for you
icechunk.xarray.to_icechunk(dataset, icechunk_session, mode="w")

with icechunk_session.allow_pickling():
    roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
    print(dataset.identical(roundtripped))
```
True

Finally, commit your changes!

```python
print(icechunk_session.commit("wrote an Xarray dataset!"))
```
M7RB2JYGE9M7XQ9SC94G