
Distributed Writes with dask

You can use Icechunk in conjunction with Xarray and Dask to perform large-scale distributed writes from a multi-node cluster. However, because Icechunk records every write in a Session that must be merged back into the caller's Session, it's not possible to use the existing dask.array.to_zarr or xarray.Dataset.to_zarr functions with either the Dask multiprocessing or distributed schedulers. (They work fine with the default multithreaded scheduler.)

Instead, Icechunk provides its own specialized functions to make distributed writes with Dask and Xarray. This page explains how to use these specialized functions.
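
At a glance, these are the two entry points this page covers (shown here only as a preview; both are demonstrated in full below):

# distributed-safe write paths provided by Icechunk
import icechunk.dask    # icechunk.dask.store_dask for plain Dask arrays
import icechunk.xarray  # icechunk.xarray.to_icechunk for Xarray datasets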

Note

Using Xarray, Dask, and Icechunk requires icechunk>=0.1.0a5, dask>=2024.11.0, and xarray>=2024.11.0.
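
You can check the versions installed in your environment, for example with the standard library (a small illustrative snippet):

# check installed versions against the minimums listed above
from importlib.metadata import version

print(version("icechunk"), version("dask"), version("xarray"))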

First let's start a distributed Client and create an Icechunk Repository with a writable Session.

# initialize a distributed Client
from distributed import Client

client = Client()

# initialize the icechunk repository and session
import icechunk

storage = icechunk.local_filesystem_storage("./icechunk-xarray")
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")

Icechunk + Dask

Use icechunk.dask.store_dask to write a Dask array to an Icechunk store. The API follows that of dask.array.store without support for the compute kwarg.

First create a dask array to write:

import dask.array

shape = (100, 100)
dask_chunks = (20, 20)
dask_array = dask.array.random.random(shape, chunks=dask_chunks)

Now create the Zarr array you will write to.

import zarr

zarr_chunks = (10, 10)
group = zarr.group(store=icechunk_session.store, overwrite=True)

zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)

Note that the Zarr chunks in the store evenly divide the Dask chunks, so every Dask chunk maps onto a whole number of Zarr chunks. This means each individual write task is independent and will not conflict with any other write task. It is your responsibility to ensure that such conflicts are avoided.
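
If you want to guard against accidental misalignment, a quick check along these lines can help (an illustrative sketch, not part of the Icechunk API):

# each Zarr chunk size should evenly divide the corresponding Dask chunk size,
# so that no two Dask write tasks touch the same Zarr chunk
assert all(
    dask_chunk % zarr_chunk == 0
    for dask_chunk, zarr_chunk in zip(dask_chunks, zarr_chunks)
), "Dask chunks must align with Zarr chunk boundaries"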

Now write:

import icechunk.dask

icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])

Finally commit your changes!

icechunk_session.commit("wrote a dask array!")

Distributed

In distributed contexts where the Session and Zarr Array objects are sent across the network, you must explicitly opt in to pickling the writable store.

icechunk.dask.store_dask takes care of the hard part of merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.

Here is an example:

import icechunk.dask

zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(store=icechunk_session.store, overwrite=True)

    zarray = group.create_array(
        "array",
        shape=shape,
        chunks=zarr_chunks,
        dtype="f8",
        fill_value=float("nan"),
    )
    icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])
icechunk_session.commit("wrote a dask array!")

Icechunk + Dask + Xarray

icechunk.xarray.to_icechunk is functionally identical to Xarray's Dataset.to_zarr and supports many of the same keyword arguments. Notably, the compute kwarg is not supported.

Warning

When using Xarray and Icechunk in a Dask Distributed context, you must use to_icechunk so that the local Session has a record of the writes that are executed remotely. Using to_zarr in such cases will leave the local Session with no record of the remote writes, resulting in a meaningless commit.

Now roundtrip an Xarray dataset:

import icechunk.xarray
import xarray as xr

# Assuming you have a valid writable Session named icechunk_session
dataset = xr.tutorial.open_dataset("rasm", chunks={"time": 1}).isel(time=slice(24))

icechunk.xarray.to_icechunk(dataset, icechunk_session)

roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
dataset.identical(roundtripped)

Finally commit your changes!

icechunk_session.commit("wrote an Xarray dataset!")