# Distributed Writes with dask
You can use Icechunk in conjunction with Xarray and Dask to perform large-scale distributed writes from a multi-node cluster. However, because of how Icechunk works, it's not possible to use the existing `dask.array.to_zarr` or `xarray.Dataset.to_zarr` functions with either the Dask multiprocessing or distributed schedulers. (It is fine to use them with the multithreaded scheduler.)
Instead, Icechunk provides its own specialized functions for distributed writes with Dask and Xarray. This page explains how to use them.
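For reference, if you do want to stick with the plain `to_zarr` functions, you would have to force the multithreaded scheduler. A minimal sketch using standard Dask configuration:

```python
import dask

# The multithreaded scheduler is the only stock Dask scheduler that works
# with plain to_zarr writes to Icechunk (see above); select it globally.
dask.config.set(scheduler="threads")
```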
> **Note:** Using Xarray, Dask, and Icechunk together requires `icechunk>=0.1.0a5`, `dask>=2024.11.0`, and `xarray>=2024.11.0`.
First, let's start a distributed `Client` and create an Icechunk store:
```python
# initialize a distributed Client
from distributed import Client

client = Client()

# initialize the icechunk store
import icechunk

storage = icechunk.local_filesystem_storage("./icechunk-xarray")
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")
```
## Icechunk + Dask
Use `icechunk.dask.store_dask` to write a Dask array to an Icechunk store. The API follows that of `dask.array.store`, without support for the `compute` kwarg.
First, create a Dask array to write:

```python
import dask.array

shape = (100, 100)
dask_chunks = (20, 20)
dask_array = dask.array.random.random(shape, chunks=dask_chunks)
```
Now create the Zarr array you will write to:

```python
import zarr

zarr_chunks = (10, 10)
group = zarr.group(store=icechunk_session.store, overwrite=True)

zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)
```
Now write:

```python
import icechunk.dask

icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])
```
Finally, commit your changes!
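For example, using the session's `commit` method (the commit message is arbitrary):

```python
icechunk_session.commit("wrote a dask array!")
```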
### Distributed
In distributed contexts where the Session and Zarr Array objects are sent across the network, you must opt in to successful pickling of a writable store. `icechunk.dask.store_dask` takes care of the hard bit of merging Sessions, but you must opt in to pickling *before* creating the target Zarr array objects.
Here is an example:
```python
import icechunk.dask

zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(store=icechunk_session.store, overwrite=True)

    zarray = group.create_array(
        "array",
        shape=shape,
        chunks=zarr_chunks,
        dtype="f8",
        fill_value=float("nan"),
    )

    icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])
icechunk_session.commit("wrote a dask array!")
```
## Icechunk + Dask + Xarray
The `icechunk.xarray.to_icechunk` function is functionally identical to Xarray's `Dataset.to_zarr`, including many of the same keyword arguments. Notably, the `compute` kwarg is not supported.
> **Warning:** When using Xarray, Icechunk, and Dask in a distributed context, you must use `to_icechunk` so that the Session has a record of the writes that are executed remotely. Using `to_zarr` in such cases will result in the local Session having no record of the remote writes, and a meaningless commit.
Now, roundtrip an Xarray dataset:

```python
import icechunk.xarray
import xarray as xr

# Assuming you have a valid writable Session named icechunk_session
dataset = xr.tutorial.open_dataset("rasm", chunks={"time": 1}).isel(time=slice(24))

icechunk.xarray.to_icechunk(dataset, icechunk_session)

roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
dataset.identical(roundtripped)
```
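In a fully distributed setting, the same write would run inside the pickling context. A minimal sketch, assuming the `allow_pickling` opt-in from the `store_dask` example above is also needed when `to_icechunk` executes writes on remote workers:

```python
# Sketch: opt in to pickling so the writable Session can travel to workers
with icechunk_session.allow_pickling():
    icechunk.xarray.to_icechunk(dataset, icechunk_session)
```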
Finally, commit your changes!
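For example (again, the commit message is arbitrary):

```python
icechunk_session.commit("wrote an xarray dataset!")
```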