
icechunk.dask

Dask integration for parallel array writes.


Functions:

| Name | Description |
| --- | --- |
| computing_meta | A decorator to handle the dask-specific computing_meta flag. |
| store_dask | A version of dask.array.store for Icechunk stores. |

computing_meta

computing_meta(func)

A decorator to handle the dask-specific computing_meta flag.

If the keyword arguments include computing_meta=True, the decorated function skips the wrapped function and returns a placeholder meta object, np.array([object()], dtype=object). Otherwise, it calls the original function with the arguments unchanged.

Source code in icechunk-python/python/icechunk/dask.py
```python
# Requires Python 3.12+ (PEP 695 type-parameter syntax).
import functools
from collections.abc import Callable
from typing import Any

import numpy as np


def computing_meta[**P, R](func: Callable[P, R]) -> Callable[P, Any]:
    """
    A decorator to handle the dask-specific `computing_meta` flag.

    If `computing_meta` is True in the keyword arguments, the decorated
    function will return a placeholder meta object (np.array([object()], dtype=object)).
    Otherwise, it will execute the original function.
    """

    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
        # Short-circuit dask's meta inference with an object-dtype placeholder.
        if kwargs.get("computing_meta", False):
            return np.array([object()], dtype=object)
        return func(*args, **kwargs)

    return wrapper
```

store_dask

store_dask(
    *,
    sources,
    targets,
    regions=None,
    split_every=None,
    **store_kwargs,
) -> ForkSession

A version of dask.array.store for Icechunk stores.

This function eagerly executes the writes to the Icechunk store and merges the changesets produced by each write task. The fully merged changeset is returned as a ForkSession, which can then be merged into the parent session.
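The merge step can be pictured as a tree reduction over per-task changesets, with split_every bounding how many changesets are combined at once. The sketch below is a pure-Python toy model under that assumption: changesets are plain dicts mapping a chunk key to a payload, and merge_changesets and tree_merge are hypothetical helpers. It is not icechunk's merge logic, which, judging by the source, is performed by session_merge_reduction on real sessions.

```python
from functools import reduce


def merge_changesets(a: dict, b: dict) -> dict:
    # Hypothetical merge: union of the chunk keys written by each task.
    merged = dict(a)
    merged.update(b)
    return merged


def tree_merge(changesets: list[dict], split_every: int = 2) -> tuple[dict, int]:
    """Merge changesets `split_every` at a time; return the result and the round count."""
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    rounds = 0
    level = list(changesets)
    while len(level) > 1:
        # Each round combines consecutive groups of at most `split_every` changesets.
        level = [
            reduce(merge_changesets, level[i : i + split_every])
            for i in range(0, len(level), split_every)
        ]
        rounds += 1
    return (level[0] if level else {}), rounds


# One changeset per write task, each touching a different chunk.
tasks = [{f"x/c/{i}": f"chunk-{i}"} for i in range(8)]

merged_by_2, rounds_by_2 = tree_merge(tasks, split_every=2)
merged_by_4, rounds_by_4 = tree_merge(tasks, split_every=4)
print(len(merged_by_2), rounds_by_2)  # 8 3
print(len(merged_by_4), rounds_by_4)  # 8 2
```

A larger split_every means fewer merge rounds but more changesets held by each merge task at once.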

For distributed or multi-processing writes, use Session.fork() to create a ForkSession that can be pickled and distributed to remote workers.

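Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sources | list[dask.array.Array] | List of dask arrays to write. | required |
| targets | list[zarr.Array] | Corresponding list of Zarr array objects to write to. | required |
| regions | list[tuple[slice, ...]] \| None | Region of each target to write into, one tuple of slices per target. If None, each source is written to the whole of its target. | None |
| split_every | int \| None | Maximum number of changesets merged together at each step of the merge. If None, the reduction's default is used. | None |
| **store_kwargs | Any | Additional keyword arguments forwarded to dask.array.store and to the changeset merge. compute, return_stored, load_stored, and lock are unsupported, because store_dask sets them itself (compute=False, return_stored=True, load_stored=False, lock=False). | {} |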
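The four unsupported keywords are not filtered out: store_dask passes them to dask.array.store explicitly, so supplying them again through **store_kwargs collides at the Python call level. The sketch below reproduces that mechanism with a hypothetical fake_store function standing in for dask.array.store and a hypothetical extra keyword; it imports neither dask nor icechunk.

```python
from typing import Any


def fake_store(**kwargs: Any) -> dict[str, Any]:
    # Hypothetical stand-in for dask.array.store: just echoes its keywords.
    return kwargs


def store_like(**store_kwargs: Any) -> dict[str, Any]:
    # Mirrors how store_dask fixes these four keywords and forwards the rest.
    return fake_store(
        compute=False,
        return_stored=True,
        load_stored=False,
        lock=False,
        **store_kwargs,
    )


# A keyword that is not fixed is forwarded unchanged.
forwarded = store_like(extra="value")
print(forwarded["extra"])  # value

# Re-supplying a fixed keyword raises TypeError before fake_store runs.
try:
    store_like(compute=True)
    rejected = False
except TypeError:
    rejected = True
print(rejected)  # True
```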
Source code in icechunk-python/python/icechunk/dask.py
```python
def store_dask(
    *,
    sources: list[Array],
    targets: "list[zarr.Array[ArrayV3Metadata]]",
    regions: list[tuple[slice, ...]] | None = None,
    split_every: int | None = None,
    **store_kwargs: Any,
) -> ForkSession:
    """
    A version of ``dask.array.store`` for Icechunk stores.

    This function eagerly executes the writes to the Icechunk store and
    merges the changesets produced by each write task. The fully merged
    changeset is returned as a `ForkSession`.

    For distributed or multi-processing writes, use `Session.fork()` to create a
    `ForkSession` that can be pickled and distributed to remote workers.

    Parameters
    ----------
    sources : list of `dask.array.Array`
        List of dask arrays to write.
    targets : list of `zarr.Array`
        Corresponding list of Zarr array objects to write to.
    regions : list of tuple of slice, optional
        Corresponding region for each of `targets` to write to.
    split_every : int, optional
        Number of changesets to merge at a given time.
    **store_kwargs
        Arbitrary keyword arguments passed to `dask.array.store`. Notably `compute`,
        `return_stored`, `load_stored`, and `lock` are unsupported.

    Returns
    -------
    ForkSession
        Session holding the merged changesets of all write tasks.
    """
    _assert_correct_dask_version()
    # Build the write graph lazily; the four keywords below are fixed here,
    # so they must not also appear in store_kwargs.
    stored_arrays = dask.array.store(
        sources=sources,
        targets=targets,  # type: ignore[arg-type]
        regions=regions,
        compute=False,
        return_stored=True,
        load_stored=False,
        lock=False,
        **store_kwargs,
    )
    # Execute the writes and merge the per-task changesets, split_every at a time.
    return session_merge_reduction(stored_arrays, split_every=split_every, **store_kwargs)
```