
# Distributed Writes with Dask

You can use Icechunk in conjunction with Xarray and Dask to perform large-scale distributed writes from a multi-node cluster. However, because of how Icechunk works, it's not possible to use the existing `dask.array.to_zarr` or `xarray.Dataset.to_zarr` functions with either the Dask multiprocessing or distributed schedulers. (They work fine with the multithreaded scheduler.)

Instead, Icechunk provides its own specialized functions for distributed writes with Dask and Xarray. This page explains how to use them.
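To illustrate the scheduler constraint, here is a minimal sketch, using a plain in-memory Zarr store rather than Icechunk purely for illustration: the standard `dask.array.to_zarr` path is safe under the threaded scheduler, but with the multiprocessing or distributed schedulers you should use the Icechunk-specific functions described below.

```python
import dask
import dask.array as da
import zarr

# Plain in-memory Zarr store, used here only to illustrate the point.
store = zarr.storage.MemoryStore()
target = zarr.create_array(store=store, shape=(10, 10), chunks=(5, 5), dtype="f8")

# Fine: the threaded scheduler runs all write tasks in one process.
with dask.config.set(scheduler="threads"):
    da.to_zarr(da.ones((10, 10), chunks=(5, 5)), target)

# Not fine with Icechunk: scheduler="processes" or a distributed.Client
# sends the store across process boundaries; use icechunk.dask.store_dask
# instead, as shown below.
```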

Start with an icechunk store and dask arrays.

```python
import icechunk
import tempfile

# initialize the icechunk store
storage = icechunk.local_filesystem_storage(tempfile.TemporaryDirectory().name)
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")
```

## Icechunk + Dask

Use `icechunk.dask.store_dask` to write a Dask array to an Icechunk store. The API follows that of `dask.array.store`, but without support for the `compute` kwarg.

First create a dask array to write:

```python
import dask.array as da

shape = (100, 100)
dask_chunks = (20, 20)
dask_array = da.random.random(shape, chunks=dask_chunks)
```

Now create the Zarr array you will write to.

```python
import zarr

zarr_chunks = (10, 10)
group = zarr.group(store=icechunk_session.store, overwrite=True)

zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)
```
Note that the Zarr chunks in the store evenly divide the Dask chunks, so each Dask chunk maps onto a whole number of Zarr chunks. This means each individual write task touches a distinct set of store chunks and will not conflict with the others. It is your responsibility to ensure that such conflicts are avoided when choosing chunk sizes.
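As a quick sanity check, you can assert this alignment before writing (an illustrative check using the `dask_chunks` and `zarr_chunks` defined above):

```python
# Each dask chunk must span a whole number of zarr chunks along every axis,
# so that no two dask write tasks touch the same zarr chunk.
assert all(dc % zc == 0 for dc, zc in zip(dask_chunks, zarr_chunks))
```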

Now write:

```python
import icechunk.dask

icechunk.dask.store_dask(
    icechunk_session,
    sources=[dask_array],
    targets=[zarray],
)
```

Finally commit your changes!

```python
print(icechunk_session.commit("wrote a dask array!"))
```

```
CKXX9AB0X0E9AV14QWSG
```
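As an optional sanity check (a sketch, not part of the walkthrough above), you can re-open the committed data through a readonly session and compare it against the in-memory result; this assumes Icechunk's `Repository.readonly_session` for read-only access to a branch:

```python
import numpy as np

# Open the committed snapshot on `main` read-only and compare it against
# the dask array we just wrote.
read_session = icechunk_repo.readonly_session(branch="main")
read_group = zarr.open_group(read_session.store, mode="r")
np.testing.assert_array_equal(read_group["array"][:], dask_array.compute())
```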

## Distributed

In distributed contexts, where the Session and Zarr Array objects are sent across the network, you must opt in to pickling of a writable store. This is the case, for example, whenever you have initialized a `distributed.Client`. `icechunk.dask.store_dask` takes care of the hard part of merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.

Here is an example:

```python
from distributed import Client

client = Client()

import icechunk.dask

# start a new session. Old session is readonly after committing
icechunk_session = icechunk_repo.writable_session("main")
zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(store=icechunk_session.store, overwrite=True)

    zarray = group.create_array(
        "array",
        shape=shape,
        chunks=zarr_chunks,
        dtype="f8",
        fill_value=float("nan"),
    )

    icechunk.dask.store_dask(
        icechunk_session,
        sources=[dask_array],
        targets=[zarray],
    )

print(icechunk_session.commit("wrote a dask array!"))
```
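Once the distributed write and commit are done, it's good practice to shut the cluster down (a small follow-up sketch using the `client` created above):

```python
# Release the workers and scheduler started by Client() above.
client.close()
```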

## Icechunk + Dask + Xarray

The `icechunk.xarray.to_icechunk` function is functionally identical to Xarray's `Dataset.to_zarr`, including many of the same keyword arguments. Notably, the `compute` kwarg is not supported.

Warning

When using Xarray and Icechunk in a Dask Distributed context, you must use `to_icechunk` so that the Session has a record of the writes that are executed remotely. Using `to_zarr` in such cases will result in the local Session having no record of the remote writes, and a meaningless commit.

Now roundtrip an Xarray dataset:

```python
import icechunk.xarray
import xarray as xr

icechunk_session = icechunk_repo.writable_session("main")
dataset = xr.tutorial.open_dataset(
    "rasm",
    chunks={"time": 1},
).isel(time=slice(24))

# `to_icechunk` takes care of "allow_pickling" for you
icechunk.xarray.to_icechunk(dataset, icechunk_session, mode="w")

with icechunk_session.allow_pickling():
    roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
    print(dataset.identical(roundtripped))
```

```
True
```

Finally commit your changes!

```python
print(icechunk_session.commit("wrote an Xarray dataset!"))
```

```
52PQE1PPW8M0SH8KDKZ0
```
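To confirm that both commits landed on `main`, you can walk the branch history (a sketch assuming Icechunk's `Repository.ancestry` API, which yields snapshot metadata from newest to oldest):

```python
# Print each snapshot's id and commit message, newest first.
for snapshot in icechunk_repo.ancestry(branch="main"):
    print(snapshot.id, snapshot.message)
```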