API Reference


BasicConflictSolver

Bases: ConflictSolver

A basic conflict solver that allows for simple configuration of resolution behavior

This conflict solver allows for simple configuration of resolution behavior for conflicts that may occur during a rebase operation. It will attempt to resolve a limited set of conflicts based on the configuration options provided.

  • When a user attribute conflict is encountered, the behavior is determined by the on_user_attributes_conflict option
  • When a chunk conflict is encountered, the behavior is determined by the on_chunk_conflict option
  • When an array is deleted that has been updated, fail_on_delete_of_updated_array will determine whether to fail the rebase operation
  • When a group is deleted that has been updated, fail_on_delete_of_updated_group will determine whether to fail the rebase operation
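
A minimal usage sketch, assuming a writable session whose commit just failed because another writer committed first; the `session` object and its `rebase`/`commit` methods are not defined here and follow the Session API referenced elsewhere on this page, and the top-level imports assume these names are re-exported from the `icechunk` package:

from icechunk import BasicConflictSolver, VersionSelection

solver = BasicConflictSolver(
    on_chunk_conflict=VersionSelection.UseOurs,  # keep our chunk writes on conflict
    fail_on_delete_of_updated_array=True,  # but refuse to silently drop updated arrays
)
session.rebase(solver)  # re-apply this session's changes on top of the new branch tip
session.commit("write after rebase")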
Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class BasicConflictSolver(ConflictSolver):
    """A basic conflict solver that allows for simple configuration of resolution behavior

    This conflict solver allows for simple configuration of resolution behavior for conflicts that may occur during a rebase operation.
    It will attempt to resolve a limited set of conflicts based on the configuration options provided.

    - When a user attribute conflict is encountered, the behavior is determined by the `on_user_attributes_conflict` option
    - When a chunk conflict is encountered, the behavior is determined by the `on_chunk_conflict` option
    - When an array is deleted that has been updated, `fail_on_delete_of_updated_array` will determine whether to fail the rebase operation
    - When a group is deleted that has been updated, `fail_on_delete_of_updated_group` will determine whether to fail the rebase operation
    """

    def __init__(
        self,
        *,
        on_user_attributes_conflict: VersionSelection = VersionSelection.UseOurs,
        on_chunk_conflict: VersionSelection = VersionSelection.UseOurs,
        fail_on_delete_of_updated_array: bool = False,
        fail_on_delete_of_updated_group: bool = False,
    ) -> None:
        """Create a BasicConflictSolver object with the given configuration options
        Parameters:
        on_user_attributes_conflict: VersionSelection
            The behavior to use when a user attribute conflict is encountered, by default VersionSelection.use_ours()
        on_chunk_conflict: VersionSelection
            The behavior to use when a chunk conflict is encountered, by default VersionSelection.use_theirs()
        fail_on_delete_of_updated_array: bool
            Whether to fail when a chunk is deleted that has been updated, by default False
        fail_on_delete_of_updated_group: bool
            Whether to fail when a group is deleted that has been updated, by default False
        """
        ...

__init__(*, on_user_attributes_conflict=VersionSelection.UseOurs, on_chunk_conflict=VersionSelection.UseOurs, fail_on_delete_of_updated_array=False, fail_on_delete_of_updated_group=False)

Create a BasicConflictSolver object with the given configuration options

Parameters:

on_user_attributes_conflict: VersionSelection
    The behavior to use when a user attribute conflict is encountered, by default VersionSelection.UseOurs
on_chunk_conflict: VersionSelection
    The behavior to use when a chunk conflict is encountered, by default VersionSelection.UseOurs
fail_on_delete_of_updated_array: bool
    Whether to fail when an array is deleted that has been updated, by default False
fail_on_delete_of_updated_group: bool
    Whether to fail when a group is deleted that has been updated, by default False

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
def __init__(
    self,
    *,
    on_user_attributes_conflict: VersionSelection = VersionSelection.UseOurs,
    on_chunk_conflict: VersionSelection = VersionSelection.UseOurs,
    fail_on_delete_of_updated_array: bool = False,
    fail_on_delete_of_updated_group: bool = False,
) -> None:
    """Create a BasicConflictSolver object with the given configuration options
    Parameters:
    on_user_attributes_conflict: VersionSelection
        The behavior to use when a user attribute conflict is encountered, by default VersionSelection.use_ours()
    on_chunk_conflict: VersionSelection
        The behavior to use when a chunk conflict is encountered, by default VersionSelection.use_theirs()
    fail_on_delete_of_updated_array: bool
        Whether to fail when a chunk is deleted that has been updated, by default False
    fail_on_delete_of_updated_group: bool
        Whether to fail when a group is deleted that has been updated, by default False
    """
    ...

CachingConfig

Configuration for how Icechunk caches its metadata files

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class CachingConfig:
    """Configuration for how Icechunk caches its metadata files"""

    def __init__(
        self,
        num_snapshot_nodes: int | None = None,
        num_chunk_refs: int | None = None,
        num_transaction_changes: int | None = None,
        num_bytes_attributes: int | None = None,
        num_bytes_chunks: int | None = None,
    ) -> None: ...
    @property
    def num_snapshot_nodes(self) -> int | None: ...
    @num_snapshot_nodes.setter
    def num_snapshot_nodes(self, value: int | None) -> None: ...
    @property
    def num_chunk_refs(self) -> int | None: ...
    @num_chunk_refs.setter
    def num_chunk_refs(self, value: int | None) -> None: ...
    @property
    def num_transaction_changes(self) -> int | None: ...
    @num_transaction_changes.setter
    def num_transaction_changes(self, value: int | None) -> None: ...
    @property
    def num_bytes_attributes(self) -> int | None: ...
    @num_bytes_attributes.setter
    def num_bytes_attributes(self, value: int | None) -> None: ...
    @property
    def num_bytes_chunks(self) -> int | None: ...
    @num_bytes_chunks.setter
    def num_bytes_chunks(self, value: int | None) -> None: ...
    @staticmethod
    def default() -> CachingConfig: ...
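
A short configuration sketch; the cache sizes are illustrative, and attaching the result to RepositoryConfig via a `caching` attribute is an assumption mirroring the other config classes on this page:

from icechunk import CachingConfig

caching = CachingConfig.default()
caching.num_chunk_refs = 1_000_000  # cache up to one million chunk references
caching.num_bytes_chunks = 512 * 2**20  # and roughly 512 MiB of chunk data

# Assumption: RepositoryConfig exposes this as `config.caching = caching`.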

CompressionAlgorithm

Bases: Enum

Enum for selecting the compression algorithm used by Icechunk to write its metadata files

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class CompressionAlgorithm(Enum):
    """Enum for selecting the compression algorithm used by Icechunk to write its metadata files"""

    Zstd = 0

    def __init__(self) -> None: ...
    @staticmethod
    def default() -> CompressionAlgorithm: ...

CompressionConfig

Configuration for how Icechunk compresses its metadata files

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class CompressionConfig:
    """Configuration for how Icechunk compresses its metadata files"""

    def __init__(
        self, algorithm: CompressionAlgorithm | None = None, level: int | None = None
    ) -> None: ...
    @property
    def algorithm(self) -> CompressionAlgorithm | None: ...
    @algorithm.setter
    def algorithm(self, value: CompressionAlgorithm | None) -> None: ...
    @property
    def level(self) -> int | None: ...
    @level.setter
    def level(self, value: int | None) -> None: ...
    @staticmethod
    def default() -> CompressionConfig: ...
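
A configuration sketch; Zstd is the only algorithm listed above, the level is illustrative, and the `compression` attribute on RepositoryConfig is an assumption:

from icechunk import CompressionAlgorithm, CompressionConfig

# Trade CPU for smaller metadata files by raising the compression level.
compression = CompressionConfig(algorithm=CompressionAlgorithm.Zstd, level=9)

# Assumption: RepositoryConfig exposes this as `config.compression = compression`.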

Conflict

A conflict detected between snapshots

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class Conflict:
    """A conflict detected between snapshots"""

    @property
    def conflict_type(self) -> ConflictType:
        """The type of conflict detected"""
        ...

    @property
    def path(self) -> str:
        """The path of the node that caused the conflict"""
        ...

    @property
    def conflicted_chunks(self) -> list[list[int]] | None:
        """If the conflict is a chunk conflict, this will return the list of chunk indices that are in conflict"""
        ...

conflict_type: ConflictType property

The type of conflict detected

conflicted_chunks: list[list[int]] | None property

If the conflict is a chunk conflict, this will return the list of chunk indices that are in conflict

path: str property

The path of the node that caused the conflict

ConflictDetector

Bases: ConflictSolver

A conflict solver that can be used to detect conflicts between two stores, but does not resolve them

Where the BasicConflictSolver will attempt to resolve conflicts, the ConflictDetector will only detect them. This means that during a rebase operation the ConflictDetector will raise a RebaseFailedError if any conflicts are detected, and allow the rebase operation to be retried with a different conflict resolution strategy. Otherwise, if no conflicts are detected the rebase operation will succeed.
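
A sketch of detect-only rebasing, assuming a writable `session` with a `rebase` method; RebaseFailedError, Conflict, and ConflictType are documented later on this page:

from icechunk import ConflictDetector, ConflictType
from icechunk.session import RebaseFailedError

try:
    session.rebase(ConflictDetector())  # detect conflicts, never resolve them
except RebaseFailedError as e:
    print(f"rebase onto {e.snapshot_id} failed")
    for conflict in e.conflicts:
        print(f"{conflict.conflict_type} at {conflict.path}")
        if conflict.conflict_type == ConflictType.ChunkDoubleUpdate:
            print(f"conflicting chunk indices: {conflict.conflicted_chunks}")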

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ConflictDetector(ConflictSolver):
    """A conflict solver that can be used to detect conflicts between two stores, but does not resolve them

    Where the `BasicConflictSolver` will attempt to resolve conflicts, the `ConflictDetector` will only detect them. This means
    that during a rebase operation the `ConflictDetector` will raise a `RebaseFailedError` if any conflicts are detected, and
    allow the rebase operation to be retried with a different conflict resolution strategy. Otherwise, if no conflicts are detected
    the rebase operation will succeed.
    """

    def __init__(self) -> None: ...

ConflictError

Bases: Exception

Error raised when a commit operation fails due to a conflict.
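
A sketch of handling this error, assuming a writable `session` with a `commit` method:

from icechunk.session import ConflictError

try:
    session.commit("append new timesteps")
except ConflictError as e:
    # Another session committed to the branch after this session was created.
    print(f"expected parent {e.expected_parent}, branch tip is {e.actual_parent}")
    # Typical recovery: rebase with a ConflictSolver and retry the commit.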

Source code in icechunk-python/python/icechunk/session.py
class ConflictError(Exception):
    """Error raised when a commit operation fails due to a conflict."""

    _error: ConflictErrorData

    def __init__(self, error: PyConflictError) -> None:
        self._error = error.args[0]

    def __str__(self) -> str:
        return str(self._error)

    @property
    def expected_parent(self) -> str:
        """
        The expected parent snapshot ID.

        Returns
        -------
        str
            The snapshot ID that the session was based on when the commit operation was called.
        """
        return self._error.expected_parent

    @property
    def actual_parent(self) -> str:
        """
        The actual parent snapshot ID of the branch that the session attempted to commit to.

        Returns
        -------
        str
            The snapshot ID of the branch tip. If this error is raised, it means the branch was modified and committed by another session after the session was created.
        """
        return self._error.actual_parent

actual_parent: str property

The actual parent snapshot ID of the branch that the session attempted to commit to.

Returns:

Type Description
str

The snapshot ID of the branch tip. If this error is raised, it means the branch was modified and committed by another session after the session was created.

expected_parent: str property

The expected parent snapshot ID.

Returns:

Type Description
str

The snapshot ID that the session was based on when the commit operation was called.

ConflictErrorData

Data class for conflict errors. This describes the snapshot conflict detected when committing a session

If this error is raised, it means the branch was modified and committed by another session after the session was created.

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ConflictErrorData:
    """Data class for conflict errors. This describes the snapshot conflict detected when committing a session

    If this error is raised, it means the branch was modified and committed by another session after the session was created.
    """
    @property
    def expected_parent(self) -> str:
        """The expected parent snapshot ID.

        This is the snapshot ID that the session was based on when the
        commit operation was called.
        """
        ...
    @property
    def actual_parent(self) -> str:
        """
        The actual parent snapshot ID of the branch that the session attempted to commit to.

        When the session is based on a branch, this is the snapshot ID of the branch tip. If this
        error is raised, it means the branch was modified and committed by another session after
        the session was created.
        """
        ...

actual_parent: str property

The actual parent snapshot ID of the branch that the session attempted to commit to.

When the session is based on a branch, this is the snapshot ID of the branch tip. If this error is raised, it means the branch was modified and committed by another session after the session was created.

expected_parent: str property

The expected parent snapshot ID.

This is the snapshot ID that the session was based on when the commit operation was called.

ConflictSolver

An abstract conflict solver that can be used to detect or resolve conflicts between two stores

This should never be used directly, but should be subclassed to provide specific conflict resolution behavior

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ConflictSolver:
    """An abstract conflict solver that can be used to detect or resolve conflicts between two stores

    This should never be used directly, but should be subclassed to provide specific conflict resolution behavior
    """

    ...

ConflictType

Bases: Enum

Type of conflict detected

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ConflictType(Enum):
    """Type of conflict detected"""

    NewNodeConflictsWithExistingNode = 1
    NewNodeInInvalidGroup = 2
    ZarrMetadataDoubleUpdate = 3
    ZarrMetadataUpdateOfDeletedArray = 4
    UserAttributesDoubleUpdate = 5
    UserAttributesUpdateOfDeletedNode = 6
    ChunkDoubleUpdate = 7
    ChunksUpdatedInDeletedArray = 8
    ChunksUpdatedInUpdatedArray = 9
    DeleteOfUpdatedArray = 10
    DeleteOfUpdatedGroup = 11

IcechunkError

Bases: Exception

Base class for all Icechunk errors

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class IcechunkError(Exception):
    """Base class for all Icechunk errors"""

    ...

IcechunkStore

Bases: Store, SyncMixin

Source code in icechunk-python/python/icechunk/store.py
class IcechunkStore(Store, SyncMixin):
    _store: PyStore
    _allow_pickling: bool

    def __init__(
        self,
        store: PyStore,
        allow_pickling: bool,
        *args: Any,
        **kwargs: Any,
    ):
        """Create a new IcechunkStore.

        This should not be called directly, instead use the `create`, `open_existing` or `open_or_create` class methods.
        """
        super().__init__(read_only=store.read_only)
        if store is None:
            raise ValueError(
                "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
            )
        self._store = store
        self._is_open = True
        self._allow_pickling = allow_pickling

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, IcechunkStore):
            return False
        return self._store == value._store

    def __getstate__(self) -> object:
        # we serialize the Rust store as bytes
        if not self._allow_pickling and not self._store.read_only:
            raise ValueError(
                "You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager"
            )
        d = self.__dict__.copy()
        d["_store"] = self._store.as_bytes()
        return d

    def __setstate__(self, state: Any) -> None:
        # we have to deserialize the bytes of the Rust store
        store_repr = state["_store"]
        state["_store"] = PyStore.from_bytes(store_repr)
        state["_read_only"] = state["_store"].read_only
        self.__dict__ = state

    @property
    def session(self) -> "Session":
        from icechunk import Session

        return Session(self._store.session, self._allow_pickling)

    async def clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return await self._store.clear()

    def sync_clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return self._store.sync_clear()

    async def is_empty(self, prefix: str) -> bool:
        """
        Check if the directory is empty.

        Parameters
        ----------
        prefix : str
            Prefix of keys to check.

        Returns
        -------
        bool
            True if the store is empty, False otherwise.
        """
        return await self._store.is_empty(prefix)

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Retrieve the value associated with a given key.

        Parameters
        ----------
        key : str
        byte_range : ByteRequest, optional

            ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

            - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
            - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
            - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

        Returns
        -------
        Buffer
        """

        try:
            result = await self._store.get(key, _byte_request_to_tuple(byte_range))
        except KeyError as _e:
            # Zarr python expects None to be returned if the key does not exist
            # but an IcechunkStore returns an error if the key does not exist
            return None

        return prototype.buffer.from_bytes(result)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        """Retrieve possibly partial values from given key_ranges.

        Parameters
        ----------
        key_ranges : Iterable[tuple[str, ByteRequest | None]]
            Ordered set of (key, range) pairs; a key may occur multiple times with different ranges

        Returns
        -------
        list of values, in the order of the key_ranges; may contain None for missing keys
        """
        # NOTE: pyo3 has no implicit conversion from an Iterable to a Rust iterable, so we convert
        # to a list here first. Possible opportunity for optimization.
        ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
        result = await self._store.get_partial_values(list(ranges))
        return [prototype.buffer.from_bytes(r) for r in result]

    async def exists(self, key: str) -> bool:
        """Check if a key exists in the store.

        Parameters
        ----------
        key : str

        Returns
        -------
        bool
        """
        return await self._store.exists(key)

    @property
    def supports_writes(self) -> bool:
        """Does the store support writes?"""
        return self._store.supports_writes

    async def set(self, key: str, value: Buffer) -> None:
        """Store a (key, value) pair.

        Parameters
        ----------
        key : str
        value : Buffer
        """
        if not isinstance(value, Buffer):
            raise TypeError(
                f"IcechunkStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )
        return await self._store.set(key, value.to_bytes())

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        """
        Store a key to ``value`` if the key is not already present.

        Parameters
        -----------
        key : str
        value : Buffer
        """
        return await self._store.set_if_not_exists(key, value.to_bytes())

    def set_virtual_ref(
        self,
        key: str,
        location: str,
        *,
        offset: int,
        length: int,
        checksum: str | datetime | None = None,
        validate_container: bool = False,
    ) -> None:
        """Store a virtual reference to a chunk.

        Parameters
        ----------
        key : str
            The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
        location : str
            The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
        offset : int
            The offset in bytes within the file at `location` where the chunk starts
        length : int
            The length of the chunk in bytes, measured from the given offset
        checksum : str | datetime | None
            The etag or last_modified_at field of the object
        validate_container: bool
            If set to true, fail for locations that don't match any existing virtual chunk container
        """
        return self._store.set_virtual_ref(
            key, location, offset, length, checksum, validate_container
        )

    async def delete(self, key: str) -> None:
        """Remove a key from the store

        Parameters
        ----------
        key : str
        """
        return await self._store.delete(key)

    async def delete_dir(self, prefix: str) -> None:
        """Delete a prefix

        Parameters
        ----------
        prefix : str
        """
        return await self._store.delete_dir(prefix)

    @property
    def supports_partial_writes(self) -> bool:
        """Does the store support partial writes?"""
        return self._store.supports_partial_writes

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """Store values at a given key, starting at byte range_start.

        Parameters
        ----------
        key_start_values : list[tuple[str, int, BytesLike]]
            set of (key, range_start, value) triples; a key may occur multiple times with different
            range_starts, but range_starts (considering the length of the respective values) must not
            specify overlapping ranges for the same key
        """
        # NOTE: pyo3 does not implicitly convert an Iterable to a Rust iterable, so we convert
        # to a list here first. Possible opportunity for optimization.
        return await self._store.set_partial_values(list(key_start_values))

    @property
    def supports_listing(self) -> bool:
        """Does the store support listing?"""
        return self._store.supports_listing

    @property
    def supports_deletes(self) -> bool:
        return self._store.supports_deletes

    def list(self) -> AsyncIterator[str]:
        """Retrieve all keys in the store.

        Returns
        -------
        AsyncIterator[str]
        """
        # This method should be async, like overridden methods in child classes.
        # However, that's not straightforward:
        # https://stackoverflow.com/questions/68905848

        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list()

    def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
        to the root of the store.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_prefix(prefix)

    def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        Retrieve all keys and prefixes with a given prefix and which do not contain the character
        “/” after the given prefix.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_dir(prefix)

    async def getsize(self, key: str) -> int:
        return await self._store.getsize(key)

    async def getsize_prefix(self, prefix: str) -> int:
        return await self._store.getsize_prefix(prefix)

supports_listing: bool property

Does the store support listing?

supports_partial_writes: bool property

Does the store support partial writes?

supports_writes: bool property

Does the store support writes?

__init__(store, allow_pickling, *args, **kwargs)

Create a new IcechunkStore.

This should not be called directly, instead use the create, open_existing or open_or_create class methods.
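
A sketch of the usual way to obtain a store, assuming `repo` is a Repository and that Session exposes its store as a `store` attribute; the `writable_session` name is an assumption and is not documented in this section:

import zarr

session = repo.writable_session("main")
store = session.store  # an IcechunkStore bound to this session
root = zarr.group(store=store)  # hand the store to zarr as usual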

Source code in icechunk-python/python/icechunk/store.py
def __init__(
    self,
    store: PyStore,
    allow_pickling: bool,
    *args: Any,
    **kwargs: Any,
):
    """Create a new IcechunkStore.

    This should not be called directly, instead use the `create`, `open_existing` or `open_or_create` class methods.
    """
    super().__init__(read_only=store.read_only)
    if store is None:
        raise ValueError(
            "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
        )
    self._store = store
    self._is_open = True
    self._allow_pickling = allow_pickling

clear() async

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk-python/python/icechunk/store.py
async def clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return await self._store.clear()

delete(key) async

Remove a key from the store

Parameters:

Name Type Description Default
key str
required
Source code in icechunk-python/python/icechunk/store.py
async def delete(self, key: str) -> None:
    """Remove a key from the store

    Parameters
    ----------
    key : str
    """
    return await self._store.delete(key)

delete_dir(prefix) async

Delete a prefix

Parameters:

Name Type Description Default
prefix str
required
Source code in icechunk-python/python/icechunk/store.py
async def delete_dir(self, prefix: str) -> None:
    """Delete a prefix

    Parameters
    ----------
    prefix : str
    """
    return await self._store.delete_dir(prefix)

exists(key) async

Check if a key exists in the store.

Parameters:

Name Type Description Default
key str
required

Returns:

Type Description
bool
Source code in icechunk-python/python/icechunk/store.py
async def exists(self, key: str) -> bool:
    """Check if a key exists in the store.

    Parameters
    ----------
    key : str

    Returns
    -------
    bool
    """
    return await self._store.exists(key)

get(key, prototype, byte_range=None) async

Retrieve the value associated with a given key.

Parameters:

Name Type Description Default
key str
required
byte_range ByteRequest

ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

  • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
  • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
  • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
None

Returns:

Type Description
Buffer
Source code in icechunk-python/python/icechunk/store.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Retrieve the value associated with a given key.

    Parameters
    ----------
    key : str
    byte_range : ByteRequest, optional

        ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

        - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
        - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
        - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

    Returns
    -------
    Buffer
    """

    try:
        result = await self._store.get(key, _byte_request_to_tuple(byte_range))
    except KeyError as _e:
        # Zarr python expects None to be returned if the key does not exist
        # but an IcechunkStore returns an error if the key does not exist
        return None

    return prototype.buffer.from_bytes(result)
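
A sketch of ranged reads, to be run inside an async function with a `store` in scope; the import locations of RangeByteRequest and default_buffer_prototype are assumptions based on zarr-python 3, and the chunk keys are illustrative:

from zarr.abc.store import RangeByteRequest
from zarr.core.buffer import default_buffer_prototype

proto = default_buffer_prototype()

# Fetch the first 100 bytes of a chunk; the end bound is exclusive.
head = await store.get("array/c/0/0/0", proto, RangeByteRequest(0, 100))

# A missing key returns None rather than raising KeyError.
assert await store.get("array/c/does/not/exist", proto) is None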

get_partial_values(prototype, key_ranges) async

Retrieve possibly partial values from given key_ranges.

Parameters:

Name Type Description Default
key_ranges Iterable[tuple[str, ByteRequest | None]]

Ordered set of (key, range) pairs; a key may occur multiple times with different ranges

required

Returns:

Type Description
list of values, in the order of the key_ranges; may contain None for missing keys
Source code in icechunk-python/python/icechunk/store.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    """Retrieve possibly partial values from given key_ranges.

    Parameters
    ----------
    key_ranges : Iterable[tuple[str, ByteRequest | None]]
        Ordered set of (key, range) pairs; a key may occur multiple times with different ranges

    Returns
    -------
    list of values, in the order of the key_ranges; may contain None for missing keys
    """
    # NOTE: pyo3 has no implicit conversion from an Iterable to a Rust iterable, so we convert
    # to a list here first. Possible opportunity for optimization.
    ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
    result = await self._store.get_partial_values(list(ranges))
    return [prototype.buffer.from_bytes(r) for r in result]

is_empty(prefix) async

Check if the directory is empty.

Parameters:

Name Type Description Default
prefix str

Prefix of keys to check.

required

Returns:

Type Description
bool

True if the store is empty, False otherwise.

Source code in icechunk-python/python/icechunk/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    return await self._store.is_empty(prefix)

list()

Retrieve all keys in the store.

Returns:

Type Description
AsyncIterator[str]
Source code in icechunk-python/python/icechunk/store.py
def list(self) -> AsyncIterator[str]:
    """Retrieve all keys in the store.

    Returns
    -------
    AsyncIterator[str]
    """
    # This method should be async, like overridden methods in child classes.
    # However, that's not straightforward:
    # https://stackoverflow.com/questions/68905848

    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list()

list_dir(prefix)

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

Name Type Description Default
prefix str
required

Returns:

Type Description
AsyncIterator[str]
Source code in icechunk-python/python/icechunk/store.py
def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """
    Retrieve all keys and prefixes with a given prefix and which do not contain the character
    “/” after the given prefix.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_dir(prefix)

list_prefix(prefix)

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

Name Type Description Default
prefix str
required

Returns:

Type Description
AsyncIterator[str]
Source code in icechunk-python/python/icechunk/store.py
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
    to the root of the store.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_prefix(prefix)

set(key, value) async

Store a (key, value) pair.

Parameters:

Name Type Description Default
key str
required
value Buffer
required
Source code in icechunk-python/python/icechunk/store.py
async def set(self, key: str, value: Buffer) -> None:
    """Store a (key, value) pair.

    Parameters
    ----------
    key : str
    value : Buffer
    """
    if not isinstance(value, Buffer):
        raise TypeError(
            f"IcechunkStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )
    return await self._store.set(key, value.to_bytes())

set_if_not_exists(key, value) async

Store a key to value if the key is not already present.

Parameters:

Name Type Description Default
key str
required
value Buffer
required
Source code in icechunk-python/python/icechunk/store.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    """
    Store a key to ``value`` if the key is not already present.

    Parameters
    -----------
    key : str
    value : Buffer
    """
    return await self._store.set_if_not_exists(key, value.to_bytes())

set_partial_values(key_start_values) async

Store values at a given key, starting at byte range_start.

Parameters:

Name Type Description Default
key_start_values list[tuple[str, int, BytesLike]]

set of (key, range_start, value) triples; a key may occur multiple times with different range_starts, but range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key

required
Source code in icechunk-python/python/icechunk/store.py
async def set_partial_values(
    self, key_start_values: Iterable[tuple[str, int, BytesLike]]
) -> None:
    """Store values at a given key, starting at byte range_start.

    Parameters
    ----------
    key_start_values : list[tuple[str, int, BytesLike]]
        set of (key, range_start, value) triples; a key may occur multiple times with different
        range_starts, but range_starts (considering the length of the respective values) must not
        specify overlapping ranges for the same key
    """
    # NOTE: pyo3 does not implicitly convert an Iterable to a Rust iterable, so we convert
    # to a list here first. Possible opportunity for optimization.
    return await self._store.set_partial_values(list(key_start_values))

set_virtual_ref(key, location, *, offset, length, checksum=None, validate_container=False)

Store a virtual reference to a chunk.

Parameters:

Name Type Description Default
key str

The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'

required
location str

The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'

required
offset int

The offset in bytes within the file at location where the chunk starts

required
length int

The length of the chunk in bytes, measured from the given offset

required
checksum str | datetime | None

The etag or last_modified_at field of the object

None
validate_container bool

If set to true, fail for locations that don't match any existing virtual chunk container

False
Source code in icechunk-python/python/icechunk/store.py
def set_virtual_ref(
    self,
    key: str,
    location: str,
    *,
    offset: int,
    length: int,
    checksum: str | datetime | None = None,
    validate_container: bool = False,
) -> None:
    """Store a virtual reference to a chunk.

    Parameters
    ----------
    key : str
        The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
    location : str
        The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
    offset : int
        The offset in bytes within the file at `location` where the chunk starts
    length : int
        The length of the chunk in bytes, measured from the given offset
    checksum : str | datetime | None
        The etag or last_modified_at field of the object
    validate_container: bool
        If set to true, fail for locations that don't match any existing virtual chunk container
    """
    return self._store.set_virtual_ref(
        key, location, offset, length, checksum, validate_container
    )

sync_clear()

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk-python/python/icechunk/store.py
def sync_clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return self._store.sync_clear()

ManifestConfig

Configuration for how Icechunk manages its manifests

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ManifestConfig:
    """Configuration for how Icechunk manifests"""

    def __init__(
        self,
        preload: ManifestPreloadConfig | None = None,
    ) -> None: ...
    @property
    def preload(self) -> ManifestPreloadConfig | None: ...
    @preload.setter
    def preload(self, value: ManifestPreloadConfig | None) -> None: ...

ManifestPreloadCondition

Configuration for conditions under which manifests will preload on session creation
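
A sketch of composing the conditions documented below; the regexes and threshold are illustrative:

from icechunk import ManifestPreloadCondition as Cond

# Preload manifests for temperature or salinity arrays, but only when the
# manifest holds fewer than 10,000 chunk references (to_refs is exclusive).
condition = Cond.and_conditions([
    Cond.or_conditions([
        Cond.name_matches(".*temp.*"),
        Cond.name_matches(".*salt.*"),
    ]),
    Cond.num_refs(None, 10_000),
])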

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ManifestPreloadCondition:
    """Configuration for conditions under which manifests will preload on session creation"""

    @staticmethod
    def or_conditions(
        conditions: list[ManifestPreloadCondition],
    ) -> ManifestPreloadCondition:
        """Create a preload condition that matches if any of `conditions` matches"""
        ...
    @staticmethod
    def and_conditions(
        conditions: list[ManifestPreloadCondition],
    ) -> ManifestPreloadCondition:
        """Create a preload condition that matches only if all passed `conditions` match"""
        ...
    @staticmethod
    def path_matches(regex: str) -> ManifestPreloadCondition:
        """Create a preload condition that matches if the full path to the array matches the passed regex.

        Array paths are absolute, as in `/path/to/my/array`
        """
        ...
    @staticmethod
    def name_matches(regex: str) -> ManifestPreloadCondition:
        """Create a preload condition that matches if the array's name matches the passed regex.

        For example, for an array `/model/outputs/temperature`, the following will match:
        ```
        name_matches(".*temp.*")
        ```
        """
        ...
    @staticmethod
    def num_refs(from_refs: int | None, to_refs: int | None) -> ManifestPreloadCondition:
        """Create a preload condition that matches only if the number of chunk references in the manifest is within the given range.

        from_refs is inclusive, to_refs is exclusive.
        """
        ...
    @staticmethod
    def true() -> ManifestPreloadCondition:
        """Create a preload condition that always matches any manifest"""
        ...
    @staticmethod
    def false() -> ManifestPreloadCondition:
        """Create a preload condition that never matches any manifests"""
        ...

and_conditions(conditions) staticmethod

Create a preload condition that matches only if all passed conditions match

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def and_conditions(
    conditions: list[ManifestPreloadCondition],
) -> ManifestPreloadCondition:
    """Create a preload condition that matches only if all passed `conditions` match"""
    ...

false() staticmethod

Create a preload condition that never matches any manifests

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def false() -> ManifestPreloadCondition:
    """Create a preload condition that never matches any manifests"""
    ...

name_matches(regex) staticmethod

Create a preload condition that matches if the array's name matches the passed regex.

For example, for an array /model/outputs/temperature, the following will match:

name_matches(".*temp.*")

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def name_matches(regex: str) -> ManifestPreloadCondition:
    """Create a preload condition that matches if the array's name matches the passed regex.

    For example, for an array `/model/outputs/temperature`, the following will match:
    ```
    name_matches(".*temp.*")
    ```
    """
    ...

num_refs(from_refs, to_refs) staticmethod

Create a preload condition that matches only if the number of chunk references in the manifest is within the given range.

from_refs is inclusive, to_refs is exclusive.

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def num_refs(from_refs: int | None, to_refs: int | None) -> ManifestPreloadCondition:
    """Create a preload condition that matches only if the number of chunk references in the manifest is within the given range.

    from_refs is inclusive, to_refs is exclusive.
    """
    ...

or_conditions(conditions) staticmethod

Create a preload condition that matches if any of conditions matches

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def or_conditions(
    conditions: list[ManifestPreloadCondition],
) -> ManifestPreloadCondition:
    """Create a preload condition that matches if any of `conditions` matches"""
    ...

path_matches(regex) staticmethod

Create a preload condition that matches if the full path to the array matches the passed regex.

Array paths are absolute, as in /path/to/my/array

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def path_matches(regex: str) -> ManifestPreloadCondition:
    """Create a preload condition that matches if the full path to the array matches the passed regex.

    Array paths are absolute, as in `/path/to/my/array`
    """
    ...

true() staticmethod

Create a preload condition that always matches any manifest

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
@staticmethod
def true() -> ManifestPreloadCondition:
    """Create a preload condition that always matches any manifest"""
    ...

ManifestPreloadConfig

Configuration for how Icechunk preloads manifests on session creation

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class ManifestPreloadConfig:
    """Configuration for how Icechunk manifest preload on session creation"""

    def __init__(
        self,
        max_total_refs: int | None = None,
        preload_if: ManifestPreloadCondition | None = None,
    ) -> None: ...
    @property
    def max_total_refs(self) -> int | None: ...
    @max_total_refs.setter
    def max_total_refs(self, value: int | None) -> None: ...
    @property
    def preload_if(self) -> ManifestPreloadCondition | None: ...
    @preload_if.setter
    def preload_if(self, value: ManifestPreloadCondition | None) -> None: ...
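
A configuration sketch tying this to ManifestConfig; the regex and budget are illustrative, and the `manifest` attribute on RepositoryConfig is an assumption:

from icechunk import (
    ManifestConfig,
    ManifestPreloadCondition,
    ManifestPreloadConfig,
)

preload = ManifestPreloadConfig(
    max_total_refs=100_000,  # overall budget of preloaded chunk references
    preload_if=ManifestPreloadCondition.path_matches("^/model/outputs/.*"),
)
manifest = ManifestConfig(preload=preload)
# Assumption: RepositoryConfig exposes this as `config.manifest = manifest`.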

RebaseFailedData

Data class for rebase failed errors. This describes the error that occurred when rebasing a session

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class RebaseFailedData:
    """Data class for rebase failed errors. This describes the error that occurred when rebasing a session"""

    @property
    def snapshot(self) -> str:
        """The snapshot ID that the session was rebased to"""
        ...

    @property
    def conflicts(self) -> list[Conflict]:
        """The conflicts that occurred during the rebase operation"""
        ...

conflicts: list[Conflict] property

The conflicts that occurred during the rebase operation

snapshot: str property

The snapshot ID that the session was rebased to

RebaseFailedError

Bases: Exception

Error raised when a rebase operation fails.

Source code in icechunk-python/python/icechunk/session.py
class RebaseFailedError(Exception):
    """Error raised when a rebase operation fails."""

    _error: RebaseFailedData

    def __init__(self, error: PyRebaseFailedError) -> None:
        self._error = error.args[0]

    def __str__(self) -> str:
        return str(self._error)

    @property
    def snapshot_id(self) -> str:
        """
        The snapshot ID that the rebase operation failed on.

        Returns
        -------
        str
            The snapshot ID that the rebase operation failed on.
        """
        return self._error.snapshot

    @property
    def conflicts(self) -> list[Conflict]:
        """
        List of conflicts that occurred during the rebase operation.

        Returns
        -------
        list of Conflict
            List of conflicts that occurred during the rebase operation.
        """
        return self._error.conflicts

conflicts: list[Conflict] property

List of conflicts that occurred during the rebase operation.

Returns:

Type Description
list of Conflict

List of conflicts that occurred during the rebase operation.

snapshot_id: str property

The snapshot ID that the rebase operation failed on.

Returns:

Type Description
str

The snapshot ID that the rebase operation failed on.

Repository

An Icechunk repository.
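
A quick-start sketch using only methods documented below; `local_filesystem_storage` is an assumed storage constructor and the path is illustrative:

import icechunk

storage = icechunk.local_filesystem_storage("/tmp/my-repo")
repo = icechunk.Repository.open_or_create(storage)

main_tip = repo.lookup_branch("main")  # snapshot ID at the tip of main
repo.create_tag("v1", main_tip)
session = repo.readonly_session(tag="v1")  # read-only checkout at the tag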

Source code in icechunk-python/python/icechunk/repository.py
class Repository:
    """An Icechunk repository."""

    _repository: PyRepository

    def __init__(self, repository: PyRepository):
        self._repository = repository

    @classmethod
    def create(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """
        Create a new Icechunk repository.
        If one already exists at the given store location, an error will be raised.

        Parameters
        ----------
        storage : Storage
            The storage configuration for the repository.
        config : RepositoryConfig, optional
            The repository configuration. If not provided, a default configuration will be used.
        virtual_chunk_credentials : dict[str, AnyCredential], optional
            Credentials for virtual chunks.

        Returns
        -------
        Self
            An instance of the Repository class.
        """
        return cls(
            PyRepository.create(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @classmethod
    def open(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """
        Open an existing Icechunk repository.

        If no repository exists at the given storage location, an error will be raised.

        Parameters
        ----------
        storage : Storage
            The storage configuration for the repository.
        config : RepositoryConfig, optional
            The repository settings. If not provided, a default configuration will be
            loaded from the repository.
        virtual_chunk_credentials : dict[str, AnyCredential], optional
            Credentials for virtual chunks.

        Returns
        -------
        Self
            An instance of the Repository class.
        """
        return cls(
            PyRepository.open(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @classmethod
    def open_or_create(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """
        Open an existing Icechunk repository or create a new one if it does not exist.

        Parameters
        ----------
        storage : Storage
            The storage configuration for the repository.
        config : RepositoryConfig, optional
            The repository settings. If not provided, a default configuration will be
            loaded from the repository.
        virtual_chunk_credentials : dict[str, AnyCredential], optional
            Credentials for virtual chunks.

        Returns
        -------
        Self
            An instance of the Repository class.
        """
        return cls(
            PyRepository.open_or_create(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @staticmethod
    def exists(storage: Storage) -> bool:
        """
        Check if a repository exists at the given storage location.

        Parameters
        ----------
        storage : Storage
            The storage configuration for the repository.

        Returns
        -------
        bool
            True if the repository exists, False otherwise.
        """
        return PyRepository.exists(storage)

    @staticmethod
    def fetch_config(storage: Storage) -> RepositoryConfig | None:
        """
        Fetch the configuration for the repository saved in storage.

        Parameters
        ----------
        storage : Storage
            The storage configuration for the repository.

        Returns
        -------
        RepositoryConfig | None
            The repository configuration if it exists, None otherwise.
        """
        return PyRepository.fetch_config(storage)

    def save_config(self) -> None:
        """
        Save the repository configuration to storage; this configuration will be used in future calls to Repository.open.

        Returns
        -------
        None
        """
        return self._repository.save_config()

    @property
    def config(self) -> RepositoryConfig:
        """
        Get a copy of this repository's config.

        Returns
        -------
        RepositoryConfig
            The repository configuration.
        """
        return self._repository.config()

    @property
    def storage(self) -> Storage:
        """
        Get a copy of this repository's Storage instance.

        Returns
        -------
        Storage
            The repository storage instance.
        """
        return self._repository.storage()

    def ancestry(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        snapshot: str | None = None,
    ) -> list[SnapshotInfo]:
        """
        Get the ancestry of a snapshot.

        Parameters
        ----------
        branch : str, optional
            The branch to get the ancestry of.
        tag : str, optional
            The tag to get the ancestry of.
        snapshot : str, optional
            The snapshot ID to get the ancestry of.

        Returns
        -------
        list[SnapshotInfo]
            The ancestry of the snapshot, listing out the snapshots and their metadata.

        Notes
        -----
        Only one of the arguments can be specified.
        """
        return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)

    def async_ancestry(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        snapshot: str | None = None,
    ) -> AsyncIterator[SnapshotInfo]:
        """
        Get the ancestry of a snapshot.

        Parameters
        ----------
        branch : str, optional
            The branch to get the ancestry of.
        tag : str, optional
            The tag to get the ancestry of.
        snapshot : str, optional
            The snapshot ID to get the ancestry of.

        Returns
        -------
        AsyncIterator[SnapshotInfo]
            The ancestry of the snapshot, listing out the snapshots and their metadata.

        Notes
        -----
        Only one of the arguments can be specified.
        """
        return self._repository.async_ancestry(branch=branch, tag=tag, snapshot=snapshot)

    def create_branch(self, branch: str, snapshot_id: str) -> None:
        """
        Create a new branch at the given snapshot.

        Parameters
        ----------
        branch : str
            The name of the branch to create.
        snapshot_id : str
            The snapshot ID to create the branch at.

        Returns
        -------
        None
        """
        self._repository.create_branch(branch, snapshot_id)

    def list_branches(self) -> set[str]:
        """
        List the branches in the repository.

        Returns
        -------
        set[str]
            A set of branch names.
        """
        return self._repository.list_branches()

    def lookup_branch(self, branch: str) -> str:
        """
        Get the tip snapshot ID of a branch.

        Parameters
        ----------
        branch : str
            The branch to get the tip of.

        Returns
        -------
        str
            The snapshot ID of the tip of the branch.
        """
        return self._repository.lookup_branch(branch)

    def reset_branch(self, branch: str, snapshot_id: str) -> None:
        """
        Reset a branch to a specific snapshot.

        This will permanently alter the history of the branch such that the tip of
        the branch is the specified snapshot.

        Parameters
        ----------
        branch : str
            The branch to reset.
        snapshot_id : str
            The snapshot ID to reset the branch to.

        Returns
        -------
        None
        """
        self._repository.reset_branch(branch, snapshot_id)

    def delete_branch(self, branch: str) -> None:
        """
        Delete a branch.

        Parameters
        ----------
        branch : str
            The branch to delete.

        Returns
        -------
        None
        """
        self._repository.delete_branch(branch)

    def delete_tag(self, tag: str) -> None:
        """
        Delete a tag.

        Parameters
        ----------
        tag : str
            The tag to delete.

        Returns
        -------
        None
        """
        self._repository.delete_tag(tag)

    def create_tag(self, tag: str, snapshot_id: str) -> None:
        """
        Create a new tag at the given snapshot.

        Parameters
        ----------
        tag : str
            The name of the tag to create.
        snapshot_id : str
            The snapshot ID to create the tag at.

        Returns
        -------
        None
        """
        self._repository.create_tag(tag, snapshot_id)

    def list_tags(self) -> set[str]:
        """
        List the tags in the repository.

        Returns
        -------
        set[str]
            A set of tag names.
        """
        return self._repository.list_tags()

    def lookup_tag(self, tag: str) -> str:
        """
        Get the snapshot ID of a tag.

        Parameters
        ----------
        tag : str
            The tag to get the snapshot ID of.

        Returns
        -------
        str
            The snapshot ID of the tag.
        """
        return self._repository.lookup_tag(tag)

    def readonly_session(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        snapshot: str | None = None,
    ) -> Session:
        """
        Create a read-only session.

        This can be thought of as a read-only checkout of the repository at a given snapshot.
        When branch or tag is provided, the session will be based on the tip of the branch or
        the snapshot ID of the tag.

        Parameters
        ----------
        branch : str, optional
            If provided, the branch to create the session on.
        tag : str, optional
            If provided, the tag to create the session on.
        snapshot : str, optional
            If provided, the snapshot ID to create the session on.

        Returns
        -------
        Session
            The read-only session, pointing to the specified snapshot, tag, or branch.

        Notes
        -----
        Only one of the arguments can be specified.
        """
        return Session(
            self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot)
        )

    def writable_session(self, branch: str) -> Session:
        """
        Create a writable session on a branch.

        Like the read-only session, this can be thought of as a checkout of the repository at the
        tip of the branch. However, this session is writable and can be used to make changes to the
        repository. When ready, the changes can be committed to the branch, after which the session will
        become a read-only session on the new snapshot.

        Parameters
        ----------
        branch : str
            The branch to create the session on.

        Returns
        -------
        Session
            The writable session on the branch.
        """
        return Session(self._repository.writable_session(branch))

    def expire_snapshots(
        self,
        older_than: datetime.datetime,
        *,
        delete_expired_branches: bool = False,
        delete_expired_tags: bool = False,
    ) -> set[str]:
        """Expire all snapshots older than a threshold.

        This processes snapshots found by navigating all references in
        the repo, tags first, then branches, both in lexicographical order.

        Returns the IDs of all snapshots considered expired and skipped
        from history. Notice that these snapshots are not necessarily
        available for garbage collection; they could still be pointed to
        by other refs.

        If delete_expired_* is set to True, branches or tags that, after the
        expiration process, point to expired snapshots directly, will be
        deleted.

        Warning: this is an administrative operation, it should be run
        carefully. The repository can still operate concurrently while
        `expire_snapshots` runs, but other readers can get inconsistent
        views of the repository history.
        """

        return self._repository.expire_snapshots(older_than)

    def garbage_collect(self, delete_object_older_than: datetime.datetime) -> GCSummary:
        """Delete any objects no longer accessible from any branches or tags.

        Warning: this is an administrative operation, it should be run
        carefully. The repository can still operate concurrently while
        `garbage_collect` runs, but other readers can get inconsistent
        views if they are trying to access the expired snapshots.
        """

        return self._repository.garbage_collect(delete_object_older_than)

config: RepositoryConfig property

Get a copy of this repository's config.

Returns:

Type Description
RepositoryConfig

The repository configuration.

storage: Storage property

Get a copy of this repository's Storage instance.

Returns:

Type Description
Storage

The repository storage instance.

ancestry(*, branch=None, tag=None, snapshot=None)

Get the ancestry of a snapshot.

Parameters:

Name Type Description Default
branch str

The branch to get the ancestry of.

None
tag str

The tag to get the ancestry of.

None
snapshot str

The snapshot ID to get the ancestry of.

None

Returns:

Type Description
list[SnapshotInfo]

The ancestry of the snapshot, listing out the snapshots and their metadata.

Notes

Only one of the arguments can be specified.

Source code in icechunk-python/python/icechunk/repository.py
def ancestry(
    self,
    *,
    branch: str | None = None,
    tag: str | None = None,
    snapshot: str | None = None,
) -> list[SnapshotInfo]:
    """
    Get the ancestry of a snapshot.

    Parameters
    ----------
    branch : str, optional
        The branch to get the ancestry of.
    tag : str, optional
        The tag to get the ancestry of.
    snapshot : str, optional
        The snapshot ID to get the ancestry of.

    Returns
    -------
    list[SnapshotInfo]
        The ancestry of the snapshot, listing out the snapshots and their metadata.

    Notes
    -----
    Only one of the arguments can be specified.
    """
    return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)
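
For example, a minimal sketch of walking a branch's history (assuming `repo` is an open `Repository` with a `main` branch):

```python
# Print the snapshots in the history of the "main" branch.
for snapshot in repo.ancestry(branch="main"):
    print(snapshot.id, snapshot.written_at, snapshot.message)
```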

async_ancestry(*, branch=None, tag=None, snapshot=None)

Get the ancestry of a snapshot.

Parameters:

Name Type Description Default
branch str

The branch to get the ancestry of.

None
tag str

The tag to get the ancestry of.

None
snapshot str

The snapshot ID to get the ancestry of.

None

Returns:

Type Description
AsyncIterator[SnapshotInfo]

The ancestry of the snapshot, as an async iterator over the snapshots and their metadata.

Notes

Only one of the arguments can be specified.

Source code in icechunk-python/python/icechunk/repository.py
def async_ancestry(
    self,
    *,
    branch: str | None = None,
    tag: str | None = None,
    snapshot: str | None = None,
) -> AsyncIterator[SnapshotInfo]:
    """
    Get the ancestry of a snapshot.

    Parameters
    ----------
    branch : str, optional
        The branch to get the ancestry of.
    tag : str, optional
        The tag to get the ancestry of.
    snapshot : str, optional
        The snapshot ID to get the ancestry of.

    Returns
    -------
    AsyncIterator[SnapshotInfo]
        The ancestry of the snapshot, as an async iterator over the snapshots and their metadata.

    Notes
    -----
    Only one of the arguments can be specified.
    """
    return self._repository.async_ancestry(branch=branch, tag=tag, snapshot=snapshot)
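
A minimal async sketch, assuming `repo` is an open `Repository` with a `main` branch:

```python
import asyncio

async def print_history() -> None:
    # Iterate the snapshots of "main" without blocking the event loop.
    async for snapshot in repo.async_ancestry(branch="main"):
        print(snapshot.id, snapshot.message)

asyncio.run(print_history())
```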

create(storage, config=None, virtual_chunk_credentials=None) classmethod

Create a new Icechunk repository. If one already exists at the given store location, an error will be raised.

Parameters:

Name Type Description Default
storage Storage

The storage configuration for the repository.

required
config RepositoryConfig

The repository configuration. If not provided, a default configuration will be used.

None
virtual_chunk_credentials dict[str, AnyCredential]

Credentials for virtual chunks.

None

Returns:

Type Description
Self

An instance of the Repository class.

Source code in icechunk-python/python/icechunk/repository.py
@classmethod
def create(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """
    Create a new Icechunk repository.
    If one already exists at the given store location, an error will be raised.

    Parameters
    ----------
    storage : Storage
        The storage configuration for the repository.
    config : RepositoryConfig, optional
        The repository configuration. If not provided, a default configuration will be used.
    virtual_chunk_credentials : dict[str, AnyCredential], optional
        Credentials for virtual chunks.

    Returns
    -------
    Self
        An instance of the Repository class.
    """
    return cls(
        PyRepository.create(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )
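
A minimal sketch (the path is illustrative):

```python
import icechunk

# Create a new repository backed by the local filesystem.
storage = icechunk.local_filesystem_storage("/tmp/my-repo")
repo = icechunk.Repository.create(storage)
```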

create_branch(branch, snapshot_id)

Create a new branch at the given snapshot.

Parameters:

Name Type Description Default
branch str

The name of the branch to create.

required
snapshot_id str

The snapshot ID to create the branch at.

required

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def create_branch(self, branch: str, snapshot_id: str) -> None:
    """
    Create a new branch at the given snapshot.

    Parameters
    ----------
    branch : str
        The name of the branch to create.
    snapshot_id : str
        The snapshot ID to create the branch at.

    Returns
    -------
    None
    """
    self._repository.create_branch(branch, snapshot_id)
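
For example, a sketch that branches off the current tip of `main` (assuming `repo` is an open `Repository`):

```python
# Look up the tip of "main" and start a "dev" branch there.
tip = repo.lookup_branch("main")
repo.create_branch("dev", snapshot_id=tip)
```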

create_tag(tag, snapshot_id)

Create a new tag at the given snapshot.

Parameters:

Name Type Description Default
tag str

The name of the tag to create.

required
snapshot_id str

The snapshot ID to create the tag at.

required

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def create_tag(self, tag: str, snapshot_id: str) -> None:
    """
    Create a new tag at the given snapshot.

    Parameters
    ----------
    tag : str
        The name of the tag to create.
    snapshot_id : str
        The snapshot ID to create the tag at.

    Returns
    -------
    None
    """
    self._repository.create_tag(tag, snapshot_id)
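
A sketch that tags the current tip of `main` (assuming `repo` is an open `Repository`; the tag name is illustrative):

```python
repo.create_tag("v1.0.0", snapshot_id=repo.lookup_branch("main"))
```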

delete_branch(branch)

Delete a branch.

Parameters:

Name Type Description Default
branch str

The branch to delete.

required

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def delete_branch(self, branch: str) -> None:
    """
    Delete a branch.

    Parameters
    ----------
    branch : str
        The branch to delete.

    Returns
    -------
    None
    """
    self._repository.delete_branch(branch)

delete_tag(tag)

Delete a tag.

Parameters:

Name Type Description Default
tag str

The tag to delete.

required

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def delete_tag(self, tag: str) -> None:
    """
    Delete a tag.

    Parameters
    ----------
    tag : str
        The tag to delete.

    Returns
    -------
    None
    """
    self._repository.delete_tag(tag)

exists(storage) staticmethod

Check if a repository exists at the given storage location.

Parameters:

Name Type Description Default
storage Storage

The storage configuration for the repository.

required

Returns:

Type Description
bool

True if the repository exists, False otherwise.

Source code in icechunk-python/python/icechunk/repository.py
@staticmethod
def exists(storage: Storage) -> bool:
    """
    Check if a repository exists at the given storage location.

    Parameters
    ----------
    storage : Storage
        The storage configuration for the repository.

    Returns
    -------
    bool
        True if the repository exists, False otherwise.
    """
    return PyRepository.exists(storage)
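
A sketch of a guarded open (the path is illustrative):

```python
import icechunk

storage = icechunk.local_filesystem_storage("/tmp/my-repo")
if icechunk.Repository.exists(storage):
    repo = icechunk.Repository.open(storage)
else:
    repo = icechunk.Repository.create(storage)
```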

expire_snapshots(older_than, *, delete_expired_branches=False, delete_expired_tags=False)

Expire all snapshots older than a threshold.

This processes snapshots found by navigating all references in the repo, tags first, then branches, both in lexicographical order.

Returns the IDs of all snapshots considered expired and skipped from history. Notice that these snapshots are not necessarily available for garbage collection; they could still be pointed to by other refs.

If delete_expired_* is set to True, branches or tags that, after the expiration process, point to expired snapshots directly, will be deleted.

Warning: this is an administrative operation, it should be run carefully. The repository can still operate concurrently while expire_snapshots runs, but other readers can get inconsistent views of the repository history.

Source code in icechunk-python/python/icechunk/repository.py
def expire_snapshots(
    self,
    older_than: datetime.datetime,
    *,
    delete_expired_branches: bool = False,
    delete_expired_tags: bool = False,
) -> set[str]:
    """Expire all snapshots older than a threshold.

    This processes snapshots found by navigating all references in
    the repo, tags first, then branches, both in lexicographical order.

    Returns the IDs of all snapshots considered expired and skipped
    from history. Notice that these snapshots are not necessarily
    available for garbage collection; they could still be pointed to
    by other refs.

    If delete_expired_* is set to True, branches or tags that, after the
    expiration process, point to expired snapshots directly, will be
    deleted.

    Warning: this is an administrative operation, it should be run
    carefully. The repository can still operate concurrently while
    `expire_snapshots` runs, but other readers can get inconsistent
    views of the repository history.
    """

    return self._repository.expire_snapshots(older_than)
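
A sketch of a typical expiration run (assuming `repo` is an open `Repository`; the 30-day threshold is illustrative):

```python
import datetime

# Expire everything older than 30 days; the returned set holds the
# snapshot IDs that were skipped from history.
cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)
expired = repo.expire_snapshots(cutoff)
print(f"Expired {len(expired)} snapshots")
```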

fetch_config(storage) staticmethod

Fetch the configuration for the repository saved in storage.

Parameters:

Name Type Description Default
storage Storage

The storage configuration for the repository.

required

Returns:

Type Description
RepositoryConfig | None

The repository configuration if it exists, None otherwise.

Source code in icechunk-python/python/icechunk/repository.py
@staticmethod
def fetch_config(storage: Storage) -> RepositoryConfig | None:
    """
    Fetch the configuration for the repository saved in storage.

    Parameters
    ----------
    storage : Storage
        The storage configuration for the repository.

    Returns
    -------
    RepositoryConfig | None
        The repository configuration if it exists, None otherwise.
    """
    return PyRepository.fetch_config(storage)

garbage_collect(delete_object_older_than)

Delete any objects no longer accessible from any branches or tags.

Warning: this is an administrative operation, it should be run carefully. The repository can still operate concurrently while garbage_collect runs, but other readers can get inconsistent views if they are trying to access the expired snapshots.

Source code in icechunk-python/python/icechunk/repository.py
def garbage_collect(self, delete_object_older_than: datetime.datetime) -> GCSummary:
    """Delete any objects no longer accessible from any branches or tags.

    Warning: this is an administrative operation, it should be run
    carefully. The repository can still operate concurrently while
    `garbage_collect` runs, but other readers can get inconsistent
    views if they are trying to access the expired snapshots.
    """

    return self._repository.garbage_collect(delete_object_older_than)
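
Garbage collection is typically run after `expire_snapshots`, with the same threshold; a sketch assuming `repo` is an open `Repository`:

```python
import datetime

cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)
summary = repo.garbage_collect(cutoff)  # returns a GCSummary
```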

list_branches()

List the branches in the repository.

Returns:

Type Description
set[str]

A set of branch names.

Source code in icechunk-python/python/icechunk/repository.py
def list_branches(self) -> set[str]:
    """
    List the branches in the repository.

    Returns
    -------
    set[str]
        A set of branch names.
    """
    return self._repository.list_branches()

list_tags()

List the tags in the repository.

Returns:

Type Description
set[str]

A set of tag names.

Source code in icechunk-python/python/icechunk/repository.py
def list_tags(self) -> set[str]:
    """
    List the tags in the repository.

    Returns
    -------
    set[str]
        A set of tag names.
    """
    return self._repository.list_tags()

lookup_branch(branch)

Get the tip snapshot ID of a branch.

Parameters:

Name Type Description Default
branch str

The branch to get the tip of.

required

Returns:

Type Description
str

The snapshot ID of the tip of the branch.

Source code in icechunk-python/python/icechunk/repository.py
def lookup_branch(self, branch: str) -> str:
    """
    Get the tip snapshot ID of a branch.

    Parameters
    ----------
    branch : str
        The branch to get the tip of.

    Returns
    -------
    str
        The snapshot ID of the tip of the branch.
    """
    return self._repository.lookup_branch(branch)

lookup_tag(tag)

Get the snapshot ID of a tag.

Parameters:

Name Type Description Default
tag str

The tag to get the snapshot ID of.

required

Returns:

Type Description
str

The snapshot ID of the tag.

Source code in icechunk-python/python/icechunk/repository.py
def lookup_tag(self, tag: str) -> str:
    """
    Get the snapshot ID of a tag.

    Parameters
    ----------
    tag : str
        The tag to get the snapshot ID of.

    Returns
    -------
    str
        The snapshot ID of the tag.
    """
    return self._repository.lookup_tag(tag)

open(storage, config=None, virtual_chunk_credentials=None) classmethod

Open an existing Icechunk repository.

If no repository exists at the given storage location, an error will be raised.

Parameters:

Name Type Description Default
storage Storage

The storage configuration for the repository.

required
config RepositoryConfig

The repository settings. If not provided, a default configuration will be loaded from the repository.

None
virtual_chunk_credentials dict[str, AnyCredential]

Credentials for virtual chunks.

None

Returns:

Type Description
Self

An instance of the Repository class.

Source code in icechunk-python/python/icechunk/repository.py
@classmethod
def open(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """
    Open an existing Icechunk repository.

    If no repository exists at the given storage location, an error will be raised.

    Parameters
    ----------
    storage : Storage
        The storage configuration for the repository.
    config : RepositoryConfig, optional
        The repository settings. If not provided, a default configuration will be
        loaded from the repository.
    virtual_chunk_credentials : dict[str, AnyCredential], optional
        Credentials for virtual chunks.

    Returns
    -------
    Self
        An instance of the Repository class.
    """
    return cls(
        PyRepository.open(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )

open_or_create(storage, config=None, virtual_chunk_credentials=None) classmethod

Open an existing Icechunk repository or create a new one if it does not exist.

Parameters:

Name Type Description Default
storage Storage

The storage configuration for the repository.

required
config RepositoryConfig

The repository settings. If not provided, a default configuration will be loaded from the repository.

None
virtual_chunk_credentials dict[str, AnyCredential]

Credentials for virtual chunks.

None

Returns:

Type Description
Self

An instance of the Repository class.

Source code in icechunk-python/python/icechunk/repository.py
@classmethod
def open_or_create(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """
    Open an existing Icechunk repository or create a new one if it does not exist.

    Parameters
    ----------
    storage : Storage
        The storage configuration for the repository.
    config : RepositoryConfig, optional
        The repository settings. If not provided, a default configuration will be
        loaded from the repository.
    virtual_chunk_credentials : dict[str, AnyCredential], optional
        Credentials for virtual chunks.

    Returns
    -------
    Self
        An instance of the Repository class.
    """
    return cls(
        PyRepository.open_or_create(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )

readonly_session(*, branch=None, tag=None, snapshot=None)

Create a read-only session.

This can be thought of as a read-only checkout of the repository at a given snapshot. When branch or tag is provided, the session will be based on the tip of the branch or the snapshot ID of the tag.

Parameters:

Name Type Description Default
branch str

If provided, the branch to create the session on.

None
tag str

If provided, the tag to create the session on.

None
snapshot str

If provided, the snapshot ID to create the session on.

None

Returns:

Type Description
Session

The read-only session, pointing to the specified snapshot, tag, or branch.

Notes

Only one of the arguments can be specified.

Source code in icechunk-python/python/icechunk/repository.py
def readonly_session(
    self,
    *,
    branch: str | None = None,
    tag: str | None = None,
    snapshot: str | None = None,
) -> Session:
    """
    Create a read-only session.

    This can be thought of as a read-only checkout of the repository at a given snapshot.
    When branch or tag is provided, the session will be based on the tip of the branch or
    the snapshot ID of the tag.

    Parameters
    ----------
    branch : str, optional
        If provided, the branch to create the session on.
    tag : str, optional
        If provided, the tag to create the session on.
    snapshot : str, optional
        If provided, the snapshot ID to create the session on.

    Returns
    -------
    Session
        The read-only session, pointing to the specified snapshot, tag, or branch.

    Notes
    -----
    Only one of the arguments can be specified.
    """
    return Session(
        self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot)
    )
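
A sketch of reading through zarr (assuming `repo` is an open `Repository`; the tag name and the zarr call are illustrative):

```python
import zarr

session = repo.readonly_session(tag="v1.0.0")
root = zarr.open_group(store=session.store, mode="r")
```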

reset_branch(branch, snapshot_id)

Reset a branch to a specific snapshot.

This will permanently alter the history of the branch such that the tip of the branch is the specified snapshot.

Parameters:

Name Type Description Default
branch str

The branch to reset.

required
snapshot_id str

The snapshot ID to reset the branch to.

required

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def reset_branch(self, branch: str, snapshot_id: str) -> None:
    """
    Reset a branch to a specific snapshot.

    This will permanently alter the history of the branch such that the tip of
    the branch is the specified snapshot.

    Parameters
    ----------
    branch : str
        The branch to reset.
    snapshot_id : str
        The snapshot ID to reset the branch to.

    Returns
    -------
    None
    """
    self._repository.reset_branch(branch, snapshot_id)

save_config()

Save the repository configuration to storage; this configuration will be used in future calls to Repository.open.

Returns:

Type Description
None
Source code in icechunk-python/python/icechunk/repository.py
def save_config(self) -> None:
    """
    Save the repository configuration to storage; this configuration will be used in future calls to Repository.open.

    Returns
    -------
    None
    """
    return self._repository.save_config()

writable_session(branch)

Create a writable session on a branch.

Like the read-only session, this can be thought of as a checkout of the repository at the tip of the branch. However, this session is writable and can be used to make changes to the repository. When ready, the changes can be committed to the branch, after which the session will become a read-only session on the new snapshot.

Parameters:

Name Type Description Default
branch str

The branch to create the session on.

required

Returns:

Type Description
Session

The writable session on the branch.

Source code in icechunk-python/python/icechunk/repository.py
def writable_session(self, branch: str) -> Session:
    """
    Create a writable session on a branch.

    Like the read-only session, this can be thought of as a checkout of the repository at the
    tip of the branch. However, this session is writable and can be used to make changes to the
    repository. When ready, the changes can be committed to the branch, after which the session will
    become a read-only session on the new snapshot.

    Parameters
    ----------
    branch : str
        The branch to create the session on.

    Returns
    -------
    Session
        The writable session on the branch.
    """
    return Session(self._repository.writable_session(branch))
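
A sketch of the write-commit cycle (assuming `repo` is an open `Repository`; the zarr calls are illustrative):

```python
import zarr

session = repo.writable_session("main")
root = zarr.group(store=session.store)
root.attrs["updated"] = True

# Committing turns this session read-only and returns the new snapshot ID.
snapshot_id = session.commit("update root attributes")
```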

RepositoryConfig

Configuration for an Icechunk repository

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class RepositoryConfig:
    """Configuration for an Icechunk repository"""

    def __init__(
        self,
        inline_chunk_threshold_bytes: int | None = None,
        unsafe_overwrite_refs: bool | None = None,
        get_partial_values_concurrency: int | None = None,
        compression: CompressionConfig | None = None,
        caching: CachingConfig | None = None,
        storage: StorageSettings | None = None,
        virtual_chunk_containers: dict[str, VirtualChunkContainer] | None = None,
        manifest: ManifestConfig | None = None,
    ) -> None: ...
    @staticmethod
    def default() -> RepositoryConfig: ...
    @property
    def inline_chunk_threshold_bytes(self) -> int | None: ...
    @inline_chunk_threshold_bytes.setter
    def inline_chunk_threshold_bytes(self, value: int | None) -> None: ...
    @property
    def unsafe_overwrite_refs(self) -> bool | None: ...
    @unsafe_overwrite_refs.setter
    def unsafe_overwrite_refs(self, value: bool | None) -> None: ...
    @property
    def get_partial_values_concurrency(self) -> int | None: ...
    @get_partial_values_concurrency.setter
    def get_partial_values_concurrency(self, value: int | None) -> None: ...
    @property
    def compression(self) -> CompressionConfig | None: ...
    @compression.setter
    def compression(self, value: CompressionConfig | None) -> None: ...
    @property
    def caching(self) -> CachingConfig | None: ...
    @caching.setter
    def caching(self, value: CachingConfig | None) -> None: ...
    @property
    def storage(self) -> StorageSettings | None: ...
    @storage.setter
    def storage(self, value: StorageSettings | None) -> None: ...
    @property
    def manifest(self) -> ManifestConfig | None: ...
    @manifest.setter
    def manifest(self, value: ManifestConfig | None) -> None: ...
    @property
    def virtual_chunk_containers(self) -> dict[str, VirtualChunkContainer] | None: ...
    def get_virtual_chunk_container(self, name: str) -> VirtualChunkContainer | None: ...
    def set_virtual_chunk_container(self, cont: VirtualChunkContainer) -> None: ...
    def clear_virtual_chunk_containers(self) -> None: ...
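
A sketch of tweaking a default configuration before creating or opening a repository (the values are illustrative):

```python
import icechunk

config = icechunk.RepositoryConfig.default()
config.inline_chunk_threshold_bytes = 1024
config.get_partial_values_concurrency = 16
```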

Session

A session object that allows for reading and writing data from an Icechunk repository.

Source code in icechunk-python/python/icechunk/session.py
class Session:
    """A session object that allows for reading and writing data from an Icechunk repository."""

    _session: PySession
    _allow_pickling: bool

    def __init__(self, session: PySession, _allow_pickling: bool = False):
        self._session = session
        self._allow_pickling = _allow_pickling

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, Session):
            return False
        return self._session == value._session

    def __getstate__(self) -> object:
        if not self._allow_pickling and not self.read_only:
            raise ValueError(
                "You must opt-in to pickle writable sessions in a distributed context "
                "using the `Session.allow_pickling` context manager. "
                # link to docs
                "If you are using xarray's `Dataset.to_zarr` method with dask arrays, "
                "please consider `icechunk.xarray.to_icechunk` instead."
            )
        state = {
            "_session": self._session.as_bytes(),
            "_allow_pickling": self._allow_pickling,
        }
        return state

    def __setstate__(self, state: object) -> None:
        if not isinstance(state, dict):
            raise ValueError("Invalid state")
        self._session = PySession.from_bytes(state["_session"])
        self._allow_pickling = state["_allow_pickling"]

    @contextlib.contextmanager
    def allow_pickling(self) -> Generator[None, None, None]:
        """
        Context manager to allow pickling this session if writable.
        """
        try:
            self._allow_pickling = True
            yield
        finally:
            self._allow_pickling = False

    @property
    def read_only(self) -> bool:
        """
        Whether the session is read-only.

        Returns
        -------
        bool
            True if the session is read-only, False otherwise.
        """
        return self._session.read_only

    @property
    def snapshot_id(self) -> str:
        """
        The base snapshot ID of the session.

        Returns
        -------
        str
            The base snapshot ID of the session.
        """
        return self._session.snapshot_id

    @property
    def branch(self) -> str | None:
        """
        The branch that the session is based on. This is only set if the session is writable.

        Returns
        -------
        str or None
            The branch that the session is based on if the session is writable, None otherwise.
        """
        return self._session.branch

    @property
    def has_uncommitted_changes(self) -> bool:
        """
        Whether the session has uncommitted changes. This can only be true if the session is writable.

        Returns
        -------
        bool
            True if the session has uncommitted changes, False otherwise.
        """
        return self._session.has_uncommitted_changes

    def discard_changes(self) -> None:
        """
        When the session is writable, discard any uncommitted changes.
        """
        self._session.discard_changes()

    @property
    def store(self) -> IcechunkStore:
        """
        Get a zarr Store object for reading and writing data from the repository using zarr python.

        Returns
        -------
        IcechunkStore
            A zarr Store object for reading and writing data from the repository.
        """
        return IcechunkStore(self._session.store, self._allow_pickling)

    def all_virtual_chunk_locations(self) -> list[str]:
        """
        Return the location URLs of all virtual chunks.

        Returns
        -------
        list of str
            The location URLs of all virtual chunks.
        """
        return self._session.all_virtual_chunk_locations()

    async def chunk_coordinates(
        self, array_path: str, batch_size: int = 1000
    ) -> AsyncIterator[tuple[int, ...]]:
        """
        Return an async iterator over all initialized chunks for the array at array_path.

        Returns
        -------
        AsyncIterator[tuple[int, ...]]
            An async iterator over chunk coordinates as tuples.
        """
        # We do unbatching here to improve speed. Switching to rust to get
        # a batch is much faster than switching for every element
        async for batch in self._session.chunk_coordinates(array_path, batch_size):
            for coord in batch:
                yield tuple(coord)

    def merge(self, other: Self) -> None:
        """
        Merge the changes for this session with the changes from another session.

        Parameters
        ----------
        other : Self
            The other session to merge changes from.
        """
        self._session.merge(other._session)

    def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str:
        """
        Commit the changes in the session to the repository.

        When successful, the writable session is completed and the session is now read-only and based on the new commit. The snapshot ID of the new commit is returned.

        If the session is out of date, this will raise a ConflictError exception depicting the conflict that occurred. The session will need to be rebased before committing.

        Parameters
        ----------
        message : str
            The message to write with the commit.
        metadata : dict[str, Any], optional
            Additional metadata to store with the commit, by default None.

        Returns
        -------
        str
            The snapshot ID of the new commit.

        Raises
        ------
        ConflictError
            If the session is out of date and a conflict occurs.
        """
        try:
            return self._session.commit(message, metadata)
        except PyConflictError as e:
            raise ConflictError(e) from None

    def rebase(self, solver: ConflictSolver) -> None:
        """
        Rebase the session to the latest ancestry of the branch.

        This method will iteratively crawl the ancestry of the branch and apply the changes from the branch to the session. If a conflict is detected, the conflict solver will be used to optionally resolve the conflict. When complete, the session will be based on the latest commit of the branch and the session will be ready to attempt another commit.

        When a conflict is detected and a resolution is not possible with the provided solver, a RebaseFailed exception will be raised. This exception will contain the snapshot ID that the rebase failed on and a list of conflicts that occurred.

        Parameters
        ----------
        solver : ConflictSolver
            The conflict solver to use when a conflict is detected.

        Raises
        ------
        RebaseFailedError
            When a conflict is detected and the solver fails to resolve it.
        """
        try:
            self._session.rebase(solver)
        except PyRebaseFailedError as e:
            raise RebaseFailedError(e) from None

branch: str | None property

The branch that the session is based on. This is only set if the session is writable.

Returns:

Type Description
str or None

The branch that the session is based on if the session is writable, None otherwise.

has_uncommitted_changes: bool property

Whether the session has uncommitted changes. This can only be true if the session is writable.

Returns:

Type Description
bool

True if the session has uncommitted changes, False otherwise.

read_only: bool property

Whether the session is read-only.

Returns:

Type Description
bool

True if the session is read-only, False otherwise.

snapshot_id: str property

The base snapshot ID of the session.

Returns:

Type Description
str

The base snapshot ID of the session.

store: IcechunkStore property

Get a zarr Store object for reading and writing data from the repository using zarr python.

Returns:

Type Description
IcechunkStore

A zarr Store object for reading and writing data from the repository.

all_virtual_chunk_locations()

Return the location URLs of all virtual chunks.

Returns:

Type Description
list of str

The location URLs of all virtual chunks.

Source code in icechunk-python/python/icechunk/session.py
def all_virtual_chunk_locations(self) -> list[str]:
    """
    Return the location URLs of all virtual chunks.

    Returns
    -------
    list of str
        The location URLs of all virtual chunks.
    """
    return self._session.all_virtual_chunk_locations()

allow_pickling()

Context manager to allow pickling this session if writable.

Source code in icechunk-python/python/icechunk/session.py
@contextlib.contextmanager
def allow_pickling(self) -> Generator[None, None, None]:
    """
    Context manager to allow pickling this session if writable.
    """
    try:
        self._allow_pickling = True
        yield
    finally:
        self._allow_pickling = False

chunk_coordinates(array_path, batch_size=1000) async

Return an async iterator over all initialized chunks for the array at array_path.

Returns:

Type Description
AsyncIterator[tuple[int, ...]]

An async iterator over chunk coordinates as tuples.
Source code in icechunk-python/python/icechunk/session.py
async def chunk_coordinates(
    self, array_path: str, batch_size: int = 1000
) -> AsyncIterator[tuple[int, ...]]:
    """
    Return an async iterator over all initialized chunks for the array at array_path.

    Returns
    -------
    AsyncIterator[tuple[int, ...]]
        An async iterator over chunk coordinates as tuples.
    """
    # We do unbatching here to improve speed. Switching to rust to get
    # a batch is much faster than switching for every element
    async for batch in self._session.chunk_coordinates(array_path, batch_size):
        for coord in batch:
            yield tuple(coord)
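
A minimal async sketch (assuming `session` is an open `Session` and `"/temperature"` is an illustrative array path):

```python
import asyncio

async def list_chunks() -> None:
    # Print the coordinates of every initialized chunk in the array.
    async for coord in session.chunk_coordinates("/temperature"):
        print(coord)

asyncio.run(list_chunks())
```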

commit(message, metadata=None)

Commit the changes in the session to the repository.

When successful, the writable session is completed and the session is now read-only and based on the new commit. The snapshot ID of the new commit is returned.

If the session is out of date, this will raise a ConflictError exception depicting the conflict that occurred. The session will need to be rebased before committing.

Parameters:

Name Type Description Default
message str

The message to write with the commit.

required
metadata dict[str, Any]

Additional metadata to store with the commit.

None

Returns:

Type Description
str

The snapshot ID of the new commit.

Raises:

Type Description
ConflictError

If the session is out of date and a conflict occurs.

Source code in icechunk-python/python/icechunk/session.py
def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str:
    """
    Commit the changes in the session to the repository.

    When successful, the writable session is completed and the session is now read-only and based on the new commit. The snapshot ID of the new commit is returned.

    If the session is out of date, this will raise a ConflictError exception depicting the conflict that occurred. The session will need to be rebased before committing.

    Parameters
    ----------
    message : str
        The message to write with the commit.
    metadata : dict[str, Any], optional
        Additional metadata to store with the commit, by default None.

    Returns
    -------
    str
        The snapshot ID of the new commit.

    Raises
    ------
    ConflictError
        If the session is out of date and a conflict occurs.
    """
    try:
        return self._session.commit(message, metadata)
    except PyConflictError as e:
        raise ConflictError(e) from None

discard_changes()

When the session is writable, discard any uncommitted changes.

Source code in icechunk-python/python/icechunk/session.py
def discard_changes(self) -> None:
    """
    When the session is writable, discard any uncommitted changes.
    """
    self._session.discard_changes()

merge(other)

Merge the changes for this session with the changes from another session.

Parameters:

Name Type Description Default
other Self

The other session to merge changes from.

required
Source code in icechunk-python/python/icechunk/session.py
def merge(self, other: Self) -> None:
    """
    Merge the changes for this session with the changes from another session.

    Parameters
    ----------
    other : Self
        The other session to merge changes from.
    """
    self._session.merge(other._session)

rebase(solver)

Rebase the session to the latest ancestry of the branch.

This method will iteratively crawl the ancestry of the branch and apply the changes from the branch to the session. If a conflict is detected, the conflict solver will be used to optionally resolve the conflict. When complete, the session will be based on the latest commit of the branch and the session will be ready to attempt another commit.

When a conflict is detected and a resolution is not possible with the provided solver, a RebaseFailed exception will be raised. This exception will contain the snapshot ID that the rebase failed on and a list of conflicts that occurred.

Parameters:

Name Type Description Default
solver ConflictSolver

The conflict solver to use when a conflict is detected.

required

Raises:

Type Description
RebaseFailedError

When a conflict is detected and the solver fails to resolve it.

Source code in icechunk-python/python/icechunk/session.py
def rebase(self, solver: ConflictSolver) -> None:
    """
    Rebase the session to the latest ancestry of the branch.

    This method will iteratively crawl the ancestry of the branch and apply the changes from the branch to the session. If a conflict is detected, the conflict solver will be used to optionally resolve the conflict. When complete, the session will be based on the latest commit of the branch and the session will be ready to attempt another commit.

    When a conflict is detected and a resolution is not possible with the provided solver, a RebaseFailed exception will be raised. This exception will contain the snapshot ID that the rebase failed on and a list of conflicts that occurred.

    Parameters
    ----------
    solver : ConflictSolver
        The conflict solver to use when a conflict is detected.

    Raises
    ------
    RebaseFailedError
        When a conflict is detected and the solver fails to resolve it.
    """
    try:
        self._session.rebase(solver)
    except PyRebaseFailedError as e:
        raise RebaseFailedError(e) from None
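
A sketch of the commit-rebase-retry loop (assuming `session` is a writable `Session`, and that `ConflictError` and `BasicConflictSolver` are importable from the top-level `icechunk` package):

```python
import icechunk

try:
    session.commit("my changes")
except icechunk.ConflictError:
    # Rebase onto the latest commits of the branch, then retry.
    session.rebase(icechunk.BasicConflictSolver())
    session.commit("my changes")
```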

SnapshotInfo

Metadata for a snapshot

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class SnapshotInfo:
    """Metadata for a snapshot"""
    @property
    def id(self) -> str:
        """The snapshot ID"""
        ...
    @property
    def parent_id(self) -> str | None:
        """The snapshot ID"""
        ...
    @property
    def written_at(self) -> datetime.datetime:
        """
        The timestamp when the snapshot was written
        """
        ...
    @property
    def message(self) -> str:
        """
        The commit message of the snapshot
        """
        ...
    @property
    def metadata(self) -> dict[str, Any]:
        """
        The metadata of the snapshot
        """
        ...

id: str property

The snapshot ID

message: str property

The commit message of the snapshot

metadata: dict[str, Any] property

The metadata of the snapshot

parent_id: str | None property

The ID of the parent snapshot, if any

written_at: datetime.datetime property

The timestamp when the snapshot was written

Storage

Storage configuration for an IcechunkStore

Currently supports in-memory, local filesystem, S3, Azure Blob, and Google Cloud Storage backends. Use the following methods to create a Storage object with the desired backend.

Ex:

storage = icechunk.in_memory_storage()
storage = icechunk.local_filesystem_storage("/path/to/root")
storage = icechunk.s3_storage("bucket", "prefix", ...)
storage = icechunk.gcs_storage("bucket", "prefix", ...)
storage = icechunk.azure_storage("container", "prefix", ...)

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class Storage:
    """Storage configuration for an IcechunkStore

    Currently supports in-memory, local filesystem, S3, Azure Blob, and Google Cloud Storage backends.
    Use the following methods to create a Storage object with the desired backend.

    Ex:
    ```
    storage = icechunk.in_memory_storage()
    storage = icechunk.local_filesystem_storage("/path/to/root")
    storage = icechunk.s3_storage("bucket", "prefix", ...)
    storage = icechunk.gcs_storage("bucket", "prefix", ...)
    storage = icechunk.azure_storage("container", "prefix", ...)
    ```
    """

    @classmethod
    def new_s3(
        cls,
        config: S3Options,
        bucket: str,
        prefix: str | None,
        credentials: AnyS3Credential | None = None,
    ) -> Storage: ...
    @classmethod
    def new_tigris(
        cls,
        config: S3Options,
        bucket: str,
        prefix: str | None,
        credentials: AnyS3Credential | None = None,
    ) -> Storage: ...
    @classmethod
    def new_in_memory(cls) -> Storage: ...
    @classmethod
    def new_local_filesystem(cls, path: str) -> Storage: ...
    @classmethod
    def new_gcs(
        cls,
        bucket: str,
        prefix: str | None,
        credentials: AnyGcsCredential | None = None,
        *,
        config: dict[str, str] | None = None,
    ) -> Storage: ...
    @classmethod
    def new_azure_blob(
        cls,
        container: str,
        prefix: str,
        credentials: AnyAzureCredential | None = None,
        *,
        config: dict[str, str] | None = None,
    ) -> Storage: ...
    def default_settings(self) -> StorageSettings: ...

StorageConcurrencySettings

Configuration for how Icechunk uses its Storage instance

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class StorageConcurrencySettings:
    """Configuration for how Icechunk uses its Storage instance"""

    def __init__(
        self,
        max_concurrent_requests_for_object: int | None = None,
        ideal_concurrent_request_size: int | None = None,
    ) -> None: ...
    @property
    def max_concurrent_requests_for_object(self) -> int | None: ...
    @max_concurrent_requests_for_object.setter
    def max_concurrent_requests_for_object(self, value: int | None) -> None: ...
    @property
    def ideal_concurrent_request_size(self) -> int | None: ...
    @ideal_concurrent_request_size.setter
    def ideal_concurrent_request_size(self, value: int | None) -> None: ...

StorageSettings

Configuration for how Icechunk uses its Storage instance

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class StorageSettings:
    """Configuration for how Icechunk uses its Storage instance"""

    def __init__(self, concurrency: StorageConcurrencySettings | None = None) -> None: ...
    @property
    def concurrency(self) -> StorageConcurrencySettings | None: ...
    @concurrency.setter
    def concurrency(self, value: StorageConcurrencySettings | None) -> None: ...

VersionSelection

Bases: Enum

Enum for selecting which version to use when resolving a conflict

Source code in icechunk-python/python/icechunk/_icechunk_python.pyi
class VersionSelection(Enum):
    """Enum for selecting the which version of a conflict"""

    Fail = 0
    UseOurs = 1
    UseTheirs = 2

azure_credentials(*, access_key=None, sas_token=None, bearer_token=None, from_env=None)

Create credentials for the Azure Blob Storage object store.

If all arguments are None, credentials are fetched from the operating system environment.

Source code in icechunk-python/python/icechunk/credentials.py
def azure_credentials(
    *,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    from_env: bool | None = None,
) -> AnyAzureCredential:
    """Create credentials Azure Blob Storage object store.

    If all arguments are None, credentials are fetched from the operative system environment.
    """
    if (from_env is None or from_env) and (
        access_key is None and sas_token is None and bearer_token is None
    ):
        return azure_from_env_credentials()

    if (access_key is not None or sas_token is not None or bearer_token is not None) and (
        from_env is None or not from_env
    ):
        return AzureCredentials.Static(
            azure_static_credentials(
                access_key=access_key,
                sas_token=sas_token,
                bearer_token=bearer_token,
            )
        )

    raise ValueError("Conflicting arguments to azure_credentials function")

azure_from_env_credentials()

Instruct Azure Blob Storage object store to fetch credentials from the operating system environment.

Source code in icechunk-python/python/icechunk/credentials.py
def azure_from_env_credentials() -> AzureCredentials.FromEnv:
    """Instruct Azure Blob Storage object store to fetch credentials from the operative system environment."""
    return AzureCredentials.FromEnv()

azure_static_credentials(*, access_key=None, sas_token=None, bearer_token=None)

Create static credentials for the Azure Blob Storage object store.

Source code in icechunk-python/python/icechunk/credentials.py
def azure_static_credentials(
    *,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
) -> AnyAzureStaticCredential:
    """Create static credentials Azure Blob Storage object store."""
    if [access_key, sas_token, bearer_token].count(None) != 2:
        raise ValueError("Conflicting arguments to azure_static_credentials function")
    if access_key is not None:
        return AzureStaticCredentials.AccessKey(access_key)
    if sas_token is not None:
        return AzureStaticCredentials.SasToken(sas_token)
    if bearer_token is not None:
        return AzureStaticCredentials.BearerToken(bearer_token)
    raise ValueError(
        "No valid static credential provided for Azure Blob Storage object store"
    )

azure_storage(*, container, prefix, access_key=None, sas_token=None, bearer_token=None, from_env=None, config=None)

Create a Storage instance that saves data in Azure Blob Storage object store.

Parameters:

Name Type Description Default
container str

The container where the repository will store its data

required
prefix str

The prefix within the container that is the root directory of the repository

required
access_key str | None

Azure Blob Storage credential access key

None
sas_token str | None

Azure Blob Storage credential SAS token

None
bearer_token str | None

Azure Blob Storage credential bearer token

None
from_env bool | None

Fetch credentials from the operating system environment

None
Source code in icechunk-python/python/icechunk/storage.py
def azure_storage(
    *,
    container: str,
    prefix: str,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    from_env: bool | None = None,
    config: dict[str, str] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in Azure Blob Storage object store.

    Parameters
    ----------
    container: str
        The container where the repository will store its data
    prefix: str
        The prefix within the container that is the root directory of the repository
    access_key: str | None
        Azure Blob Storage credential access key
    sas_token: str | None
        Azure Blob Storage credential SAS token
    bearer_token: str | None
        Azure Blob Storage credential bearer token
    from_env: bool | None
        Fetch credentials from the operating system environment
    """
    credentials = azure_credentials(
        access_key=access_key,
        sas_token=sas_token,
        bearer_token=bearer_token,
        from_env=from_env,
    )
    return Storage.new_azure_blob(
        container=container,
        prefix=prefix,
        credentials=credentials,
        config=config,
    )
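
A sketch with illustrative container and prefix names, letting credentials come from the environment:

```python
import icechunk

storage = icechunk.azure_storage(
    container="my-container",
    prefix="my-repo",
    from_env=True,
)
```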

containers_credentials(m={}, **kwargs)

Build a map of credentials for virtual chunk containers.

Example usage:

import icechunk as ic

config = ic.RepositoryConfig.default()
config.inline_chunk_threshold_bytes = 512

virtual_store_config = ic.s3_store(
    region="us-east-1",
    endpoint_url="http://localhost:9000",
    allow_http=True,
    s3_compatible=True,
)
container = ic.VirtualChunkContainer("s3", "s3://", virtual_store_config)
config.set_virtual_chunk_container(container)
credentials = ic.containers_credentials(
    s3=ic.s3_credentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")
)

repo = ic.Repository.create(
    storage=ic.local_filesystem_storage(store_path),
    config=config,
    virtual_chunk_credentials=credentials,
)

Parameters:

Name Type Description Default
m Mapping[str, AnyS3Credential]

A mapping from container name to credentials.

{}
Source code in icechunk-python/python/icechunk/credentials.py
def containers_credentials(
    m: Mapping[str, AnyS3Credential] = {}, **kwargs: AnyS3Credential
) -> dict[str, Credentials.S3]:
    """Build a map of credentials for virtual chunk containers.

    Example usage:
    ```
    import icechunk as ic

    config = ic.RepositoryConfig.default()
    config.inline_chunk_threshold_bytes = 512

    virtual_store_config = ic.s3_store(
        region="us-east-1",
        endpoint_url="http://localhost:9000",
        allow_http=True,
        s3_compatible=True,
    )
    container = ic.VirtualChunkContainer("s3", "s3://", virtual_store_config)
    config.set_virtual_chunk_container(container)
    credentials = ic.containers_credentials(
        s3=ic.s3_credentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")
    )

    repo = ic.Repository.create(
        storage=ic.local_filesystem_storage(store_path),
        config=config,
        virtual_chunk_credentials=credentials,
    )
    ```

    Parameters
    ----------
    m: Mapping[str, AnyS3Credential]
        A mapping from container name to credentials.
    """
    res = {}
    for name, cred in {**m, **kwargs}.items():
        if isinstance(cred, AnyS3Credential):
            res[name] = Credentials.S3(cred)
        else:
            raise ValueError(f"Unknown credential type {type(cred)}")
    return res

gcs_credentials(*, service_account_file=None, service_account_key=None, application_credentials=None, from_env=None)

Create credentials for the Google Cloud Storage object store.

If all arguments are None, credentials are fetched from the operating system environment.

Source code in icechunk-python/python/icechunk/credentials.py
def gcs_credentials(
    *,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
    from_env: bool | None = None,
) -> AnyGcsCredential:
    """Create credentials Google Cloud Storage object store.

    If all arguments are None, credentials are fetched from the operative system environment.
    """
    if (from_env is None or from_env) and (
        service_account_file is None
        and service_account_key is None
        and application_credentials is None
    ):
        return gcs_from_env_credentials()

    if (
        service_account_file is not None
        or service_account_key is not None
        or application_credentials is not None
    ) and (from_env is None or not from_env):
        return GcsCredentials.Static(
            gcs_static_credentials(
                service_account_file=service_account_file,
                service_account_key=service_account_key,
                application_credentials=application_credentials,
            )
        )

    raise ValueError("Conflicting arguments to gcs_credentials function")

gcs_from_env_credentials()

Instruct the Google Cloud Storage object store to fetch credentials from the operating system environment.

Source code in icechunk-python/python/icechunk/credentials.py
def gcs_from_env_credentials() -> GcsCredentials.FromEnv:
    """Instruct Google Cloud Storage object store to fetch credentials from the operative system environment."""
    return GcsCredentials.FromEnv()

gcs_static_credentials(*, service_account_file=None, service_account_key=None, application_credentials=None)

Create static credentials for the Google Cloud Storage object store.

Source code in icechunk-python/python/icechunk/credentials.py
def gcs_static_credentials(
    *,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
) -> AnyGcsStaticCredential:
    """Create static credentials Google Cloud Storage object store."""
    if service_account_file is not None:
        return GcsStaticCredentials.ServiceAccount(service_account_file)
    if service_account_key is not None:
        return GcsStaticCredentials.ServiceAccountKey(service_account_key)
    if application_credentials is not None:
        return GcsStaticCredentials.ApplicationCredentials(application_credentials)
    raise ValueError("Conflicting arguments to gcs_static_credentials function")

gcs_storage(*, bucket, prefix, service_account_file=None, service_account_key=None, application_credentials=None, from_env=None, config=None)

Create a Storage instance that saves data in the Google Cloud Storage object store.

Parameters:

bucket : str, required
    The bucket where the repository will store its data
prefix : str | None, required
    The prefix within the bucket that is the root directory of the repository
from_env : bool | None, default: None
    Fetch credentials from the operating system environment
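
A minimal sketch, assuming credentials come from the operating system environment; bucket and prefix are placeholders:

import icechunk as ic

# Repository data will live under the given bucket and prefix (placeholders)
storage = ic.gcs_storage(bucket="my-bucket", prefix="my/repo", from_env=True)
repo = ic.Repository.create(storage=storage)
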
Source code in icechunk-python/python/icechunk/storage.py
def gcs_storage(
    *,
    bucket: str,
    prefix: str | None,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
    from_env: bool | None = None,
    config: dict[str, str] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in Google Cloud Storage object store.

    Parameters
    ----------
    bucket: str
        The bucket where the repository will store its data
    prefix: str | None
        The prefix within the bucket that is the root directory of the repository
    from_env: bool | None
        Fetch credentials from the operating system environment
    """
    credentials = gcs_credentials(
        service_account_file=service_account_file,
        service_account_key=service_account_key,
        application_credentials=application_credentials,
        from_env=from_env,
    )
    return Storage.new_gcs(
        bucket=bucket,
        prefix=prefix,
        credentials=credentials,
        config=config,
    )

in_memory_storage()

Create a Storage instance that saves data in memory.

This Storage implementation is used for tests. Data will be lost after the process finishes, and can only be accessed through the Storage instance returned. Different instances don't share data.
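
A short sketch of the intended test usage:

import icechunk as ic

# Throwaway repository for a test; all data is lost when the process exits
repo = ic.Repository.create(storage=ic.in_memory_storage())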

Source code in icechunk-python/python/icechunk/storage.py
def in_memory_storage() -> Storage:
    """Create a Storage instance that saves data in memory.

    This Storage implementation is used for tests. Data will be lost after the process finishes, and can only be accessed through the Storage instance returned. Different instances don't share data."""
    return Storage.new_in_memory()

local_filesystem_storage(path)

Create a Storage instance that saves data in the local file system.

This Storage instance is not recommended for production data.
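
A minimal sketch; the path is a placeholder:

import icechunk as ic

# Scratch repository on local disk (placeholder path)
repo = ic.Repository.create(storage=ic.local_filesystem_storage("/tmp/icechunk-scratch"))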

Source code in icechunk-python/python/icechunk/storage.py
def local_filesystem_storage(path: str) -> Storage:
    """Create a Storage instance that saves data in the local file system.

    This Storage instance is not recommended for production data
    """
    return Storage.new_local_filesystem(path)

s3_anonymous_credentials()

Create no-signature credentials for S3 and S3 compatible object stores.

Source code in icechunk-python/python/icechunk/credentials.py
def s3_anonymous_credentials() -> S3Credentials.Anonymous:
    """Create no-signature credentials for S3 and S3 compatible object stores."""
    return S3Credentials.Anonymous()

s3_credentials(*, access_key_id=None, secret_access_key=None, session_token=None, expires_after=None, anonymous=None, from_env=None, get_credentials=None)

Create credentials for S3 and S3 compatible object stores.

If all arguments are None, credentials are fetched from the environment.

Parameters:

access_key_id : str | None, default: None
    S3 credential access key
secret_access_key : str | None, default: None
    S3 credential secret access key
session_token : str | None, default: None
    Optional S3 credential session token
expires_after : datetime | None, default: None
    Optional expiration for the object store credentials
anonymous : bool | None, default: None
    If set to True, requests to the object store will not be signed
from_env : bool | None, default: None
    Fetch credentials from the operating system environment
get_credentials : Callable[[], S3StaticCredentials] | None, default: None
    Use this function to get and refresh object store credentials
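
Each call below exercises one of the mutually exclusive argument groups; the key values are placeholders (see s3_refreshable_credentials below for the callable-based variant):

import icechunk as ic

# Credentials from the environment (the default when nothing is passed)
env = ic.s3_credentials()

# Unsigned, anonymous requests
anon = ic.s3_credentials(anonymous=True)

# Static keys (placeholder values)
static = ic.s3_credentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")
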
Source code in icechunk-python/python/icechunk/credentials.py
def s3_credentials(
    *,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    session_token: str | None = None,
    expires_after: datetime | None = None,
    anonymous: bool | None = None,
    from_env: bool | None = None,
    get_credentials: Callable[[], S3StaticCredentials] | None = None,
) -> AnyS3Credential:
    """Create credentials for S3 and S3 compatible object stores.

    If all arguments are None, credentials are fetched from the environment.

    Parameters
    ----------
    access_key_id: str | None
        S3 credential access key
    secret_access_key: str | None
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    anonymous: bool | None
        If set to True, requests to the object store will not be signed
    from_env: bool | None
        Fetch credentials from the operating system environment
    get_credentials: Callable[[], S3StaticCredentials] | None
        Use this function to get and refresh object store credentials
    """
    if (
        (from_env is None or from_env)
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not anonymous
        and get_credentials is None
    ):
        return s3_from_env_credentials()

    if (
        anonymous
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not from_env
        and get_credentials is None
    ):
        return s3_anonymous_credentials()

    if (
        get_credentials is not None
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not from_env
        and not anonymous
    ):
        return s3_refreshable_credentials(get_credentials)

    if (
        access_key_id
        and secret_access_key
        and not from_env
        and not anonymous
        and get_credentials is None
    ):
        return s3_static_credentials(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expires_after=expires_after,
        )

    raise ValueError("Conflicting arguments to s3_credentials function")

s3_from_env_credentials()

Instruct S3 and S3 compatible object stores to gather credentials from the operating system environment.

Source code in icechunk-python/python/icechunk/credentials.py
def s3_from_env_credentials() -> S3Credentials.FromEnv:
    """Instruct S3 and S3 compatible object stores to gather credentials from the operative system environment."""
    return S3Credentials.FromEnv()

s3_refreshable_credentials(get_credentials)

Create refreshable credentials for S3 and S3 compatible object stores.

Parameters:

get_credentials : Callable[[], S3StaticCredentials], required
    Use this function to get and refresh the credentials. The function must be picklable.
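
A minimal sketch of a refresh callback, assuming S3StaticCredentials is available from the top-level icechunk namespace; the key values are placeholders. The callable must be defined at module level so it can be pickled:

import icechunk as ic

def fetch_credentials() -> ic.S3StaticCredentials:
    # In practice this would call your credential provider (placeholder values)
    return ic.S3StaticCredentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")

creds = ic.s3_refreshable_credentials(fetch_credentials)
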
Source code in icechunk-python/python/icechunk/credentials.py
def s3_refreshable_credentials(
    get_credentials: Callable[[], S3StaticCredentials],
) -> S3Credentials.Refreshable:
    """Create refreshable credentials for S3 and S3 compatible object stores.


    Parameters
    ----------
    get_credentials: Callable[[], S3StaticCredentials]
        Use this function to get and refresh the credentials. The function must be picklable.
    """
    return S3Credentials.Refreshable(pickle.dumps(get_credentials))

s3_static_credentials(*, access_key_id, secret_access_key, session_token=None, expires_after=None)

Create static credentials for S3 and S3 compatible object stores.

Parameters:

access_key_id : str, required
    S3 credential access key
secret_access_key : str, required
    S3 credential secret access key
session_token : str | None, default: None
    Optional S3 credential session token
expires_after : datetime | None, default: None
    Optional expiration for the object store credentials
Source code in icechunk-python/python/icechunk/credentials.py
def s3_static_credentials(
    *,
    access_key_id: str,
    secret_access_key: str,
    session_token: str | None = None,
    expires_after: datetime | None = None,
) -> S3Credentials.Static:
    """Create static credentials for S3 and S3 compatible object stores.

    Parameters
    ----------
    access_key_id: str
        S3 credential access key
    secret_access_key: str
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    """
    return S3Credentials.Static(
        S3StaticCredentials(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expires_after=expires_after,
        )
    )

s3_storage(*, bucket, prefix, region=None, endpoint_url=None, allow_http=False, access_key_id=None, secret_access_key=None, session_token=None, expires_after=None, anonymous=None, from_env=None, get_credentials=None)

Create a Storage instance that saves data in S3 or S3 compatible object stores.

Parameters:

bucket : str, required
    The bucket where the repository will store its data
prefix : str | None, required
    The prefix within the bucket that is the root directory of the repository
region : str | None, default: None
    The region to use in the object store; if None, a default region will be used
endpoint_url : str | None, default: None
    Optional endpoint where the object store serves data, for example http://localhost:9000
allow_http : bool, default: False
    If the object store can be accessed using the http protocol instead of https
access_key_id : str | None, default: None
    S3 credential access key
secret_access_key : str | None, default: None
    S3 credential secret access key
session_token : str | None, default: None
    Optional S3 credential session token
expires_after : datetime | None, default: None
    Optional expiration for the object store credentials
anonymous : bool | None, default: None
    If set to True, requests to the object store will not be signed
from_env : bool | None, default: None
    Fetch credentials from the operating system environment
get_credentials : Callable[[], S3StaticCredentials] | None, default: None
    Use this function to get and refresh object store credentials
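
Two hedged sketches, one for AWS and one for an S3 compatible store such as MinIO; bucket, prefix, endpoint, and keys are placeholders:

import icechunk as ic

# AWS S3, credentials read from the environment (placeholders)
storage = ic.s3_storage(bucket="my-bucket", prefix="my/repo", region="us-east-1", from_env=True)

# S3 compatible store served over plain http (placeholders)
minio_storage = ic.s3_storage(
    bucket="my-bucket",
    prefix="my/repo",
    endpoint_url="http://localhost:9000",
    allow_http=True,
    access_key_id="ACCESS_KEY",
    secret_access_key="SECRET",
)
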
Source code in icechunk-python/python/icechunk/storage.py
def s3_storage(
    *,
    bucket: str,
    prefix: str | None,
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    session_token: str | None = None,
    expires_after: datetime | None = None,
    anonymous: bool | None = None,
    from_env: bool | None = None,
    get_credentials: Callable[[], S3StaticCredentials] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in S3 or S3 compatible object stores.

    Parameters
    ----------
    bucket: str
        The bucket where the repository will store its data
    prefix: str | None
        The prefix within the bucket that is the root directory of the repository
    region: str | None
        The region to use in the object store, if `None` a default region will be used
    endpoint_url: str | None
        Optional endpoint where the object store serves data, example: http://localhost:9000
    allow_http: bool
        If the object store can be accessed using http protocol instead of https
    access_key_id: str | None
        S3 credential access key
    secret_access_key: str | None
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    anonymous: bool | None
        If set to True, requests to the object store will not be signed
    from_env: bool | None
        Fetch credentials from the operating system environment
    get_credentials: Callable[[], S3StaticCredentials] | None
        Use this function to get and refresh object store credentials
    """
    credentials = s3_credentials(
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        expires_after=expires_after,
        anonymous=anonymous,
        from_env=from_env,
        get_credentials=get_credentials,
    )
    options = S3Options(region=region, endpoint_url=endpoint_url, allow_http=allow_http)
    return Storage.new_s3(
        config=options,
        bucket=bucket,
        prefix=prefix,
        credentials=credentials,
    )

s3_store(region=None, endpoint_url=None, allow_http=False, anonymous=False, s3_compatible=False)

Build an ObjectStoreConfig instance for S3 or S3 compatible object stores.

Source code in icechunk-python/python/icechunk/storage.py
def s3_store(
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
    anonymous: bool = False,
    s3_compatible: bool = False,
) -> ObjectStoreConfig.S3Compatible | ObjectStoreConfig.S3:
    """Build an ObjectStoreConfig instance for S3 or S3 compatible object stores."""
    options = S3Options(region=region, endpoint_url=endpoint_url, allow_http=allow_http)
    return (
        ObjectStoreConfig.S3Compatible(options)
        if s3_compatible
        else ObjectStoreConfig.S3(options)
    )

tigris_storage(*, bucket, prefix, region=None, endpoint_url=None, allow_http=False, access_key_id=None, secret_access_key=None, session_token=None, expires_after=None, anonymous=None, from_env=None, get_credentials=None)

Create a Storage instance that saves data in the Tigris object store.

Parameters:

bucket : str, required
    The bucket where the repository will store its data
prefix : str | None, required
    The prefix within the bucket that is the root directory of the repository
region : str | None, default: None
    The region to use in the object store; if None, a default region will be used
endpoint_url : str | None, default: None
    Optional endpoint where the object store serves data, for example http://localhost:9000
allow_http : bool, default: False
    If the object store can be accessed using the http protocol instead of https
access_key_id : str | None, default: None
    S3 credential access key
secret_access_key : str | None, default: None
    S3 credential secret access key
session_token : str | None, default: None
    Optional S3 credential session token
expires_after : datetime | None, default: None
    Optional expiration for the object store credentials
anonymous : bool | None, default: None
    If set to True, requests to the object store will not be signed
from_env : bool | None, default: None
    Fetch credentials from the operating system environment
get_credentials : Callable[[], S3StaticCredentials] | None, default: None
    Use this function to get and refresh object store credentials
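
A minimal sketch; Tigris accepts the same S3-style credential arguments, and all values below are placeholders:

import icechunk as ic

storage = ic.tigris_storage(
    bucket="my-bucket",
    prefix="my/repo",
    access_key_id="ACCESS_KEY",
    secret_access_key="SECRET",
)
repo = ic.Repository.create(storage=storage)
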
Source code in icechunk-python/python/icechunk/storage.py
def tigris_storage(
    *,
    bucket: str,
    prefix: str | None,
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    session_token: str | None = None,
    expires_after: datetime | None = None,
    anonymous: bool | None = None,
    from_env: bool | None = None,
    get_credentials: Callable[[], S3StaticCredentials] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in Tigris object store.

    Parameters
    ----------
    bucket: str
        The bucket where the repository will store its data
    prefix: str | None
        The prefix within the bucket that is the root directory of the repository
    region: str | None
        The region to use in the object store, if `None` a default region will be used
    endpoint_url: str | None
        Optional endpoint where the object store serves data, example: http://localhost:9000
    allow_http: bool
        If the object store can be accessed using http protocol instead of https
    access_key_id: str | None
        S3 credential access key
    secret_access_key: str | None
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    anonymous: bool | None
        If set to True, requests to the object store will not be signed
    from_env: bool | None
        Fetch credentials from the operating system environment
    get_credentials: Callable[[], S3StaticCredentials] | None
        Use this function to get and refresh object store credentials
    """
    credentials = s3_credentials(
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        expires_after=expires_after,
        anonymous=anonymous,
        from_env=from_env,
        get_credentials=get_credentials,
    )
    options = S3Options(region=region, endpoint_url=endpoint_url, allow_http=allow_http)
    return Storage.new_tigris(
        config=options,
        bucket=bucket,
        prefix=prefix,
        credentials=credentials,
    )

XarrayDatasetWriter dataclass

Write Xarray Datasets to a group in an Icechunk store.

This class is private API. Please do not use it.

Source code in icechunk-python/python/icechunk/xarray.py
@dataclass
class XarrayDatasetWriter:
    """
    Write Xarray Datasets to a group in an Icechunk store.

    This class is private API. Please do not use it.
    """

    dataset: Dataset = field(repr=False)
    store: IcechunkStore = field(kw_only=True)

    safe_chunks: bool = field(kw_only=True, default=True)

    _initialized: bool = field(default=False, repr=False)

    xarray_store: ZarrStore = field(init=False, repr=False)
    writer: LazyArrayWriter = field(init=False, repr=False)

    def __post_init__(self) -> None:
        if not isinstance(self.store, IcechunkStore):
            raise ValueError(
                f"Please pass in an icechunk.Session. Received {type(self.store)!r} instead."
            )

    def _open_group(
        self,
        *,
        group: str | None,
        mode: ZarrWriteModes | None,
        append_dim: Hashable | None,
        region: Region,
    ) -> None:
        concrete_mode: ZarrWriteModes = _choose_default_mode(
            mode=mode, append_dim=append_dim, region=region
        )

        self.xarray_store = ZarrStore.open_group(
            store=self.store,
            group=group,
            mode=concrete_mode,
            zarr_format=3,
            append_dim=append_dim,
            write_region=region,
            safe_chunks=self.safe_chunks,
            synchronizer=None,
            consolidated=False,
            consolidate_on_close=False,
            zarr_version=None,
        )
        self.dataset = self.xarray_store._validate_and_autodetect_region(self.dataset)

    def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
        """
        This method creates new Zarr arrays when necessary, writes attributes,
        and any in-memory arrays.
        """
        from xarray.backends.api import _validate_dataset_names, dump_to_store

        # validate Dataset keys, DataArray names
        _validate_dataset_names(self.dataset)

        if encoding is None:
            encoding = {}
        self.xarray_store._validate_encoding(encoding)

        # This writes the metadata (zarr.json) for all arrays
        # This also will resize arrays for any appends
        self.writer = LazyArrayWriter()
        dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

        self._initialized = True

    def write_eager(self) -> None:
        """
        Write in-memory variables to store.

        Returns
        -------
        None
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")
        self.writer.write_eager()

    def write_lazy(
        self,
        chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
        split_every: int | None = None,
    ) -> None:
        """
        Write lazy arrays (e.g. dask) to store.
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")

        if not self.writer.sources:
            return

        chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
        chunkmanager_store_kwargs["load_stored"] = False
        chunkmanager_store_kwargs["return_stored"] = True

        # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
        # each of those zarr.Array.store contains the changesets we need
        stored_arrays = self.writer.sync(
            compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
        )  # type: ignore[no-untyped-call]

        # Now we tree-reduce all changesets
        merged_session = stateful_store_reduce(
            stored_arrays,
            prefix="ice-changeset",
            chunk=extract_session,
            aggregate=merge_sessions,
            split_every=split_every,
            compute=True,
            **chunkmanager_store_kwargs,
        )
        self.store.session.merge(merged_session)

write_eager()

Write in-memory variables to store.

Returns:

None
Source code in icechunk-python/python/icechunk/xarray.py
def write_eager(self) -> None:
    """
    Write in-memory variables to store.

    Returns
    -------
    None
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")
    self.writer.write_eager()

write_lazy(chunkmanager_store_kwargs=None, split_every=None)

Write lazy arrays (e.g. dask) to store.

Source code in icechunk-python/python/icechunk/xarray.py
def write_lazy(
    self,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
) -> None:
    """
    Write lazy arrays (e.g. dask) to store.
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")

    if not self.writer.sources:
        return

    chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
    chunkmanager_store_kwargs["load_stored"] = False
    chunkmanager_store_kwargs["return_stored"] = True

    # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
    # each of those zarr.Array.store contains the changesets we need
    stored_arrays = self.writer.sync(
        compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
    )  # type: ignore[no-untyped-call]

    # Now we tree-reduce all changesets
    merged_session = stateful_store_reduce(
        stored_arrays,
        prefix="ice-changeset",
        chunk=extract_session,
        aggregate=merge_sessions,
        split_every=split_every,
        compute=True,
        **chunkmanager_store_kwargs,
    )
    self.store.session.merge(merged_session)

write_metadata(encoding=None)

This method creates new Zarr arrays when necessary, and writes attributes and any in-memory arrays.

Source code in icechunk-python/python/icechunk/xarray.py
def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
    """
    This method creates new Zarr arrays when necessary, writes attributes,
    and any in-memory arrays.
    """
    from xarray.backends.api import _validate_dataset_names, dump_to_store

    # validate Dataset keys, DataArray names
    _validate_dataset_names(self.dataset)

    if encoding is None:
        encoding = {}
    self.xarray_store._validate_encoding(encoding)

    # This writes the metadata (zarr.json) for all arrays
    # This also will resize arrays for any appends
    self.writer = LazyArrayWriter()
    dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

    self._initialized = True

make_dataset(obj)

make_dataset(obj: DataArray) -> Dataset
make_dataset(obj: Dataset) -> Dataset

Copied from DataArray.to_zarr

Source code in icechunk-python/python/icechunk/xarray.py
def make_dataset(obj: DataArray | Dataset) -> Dataset:
    """Copied from DataArray.to_zarr"""
    DATAARRAY_NAME = "__xarray_dataarray_name__"
    DATAARRAY_VARIABLE = "__xarray_dataarray_variable__"

    if isinstance(obj, Dataset):
        return obj

    assert isinstance(obj, DataArray)

    if obj.name is None:
        # If no name is set then use a generic xarray name
        dataset = obj.to_dataset(name=DATAARRAY_VARIABLE)
    elif obj.name in obj.coords or obj.name in obj.dims:
        # The name is the same as one of the coords names, which the netCDF data model
        # does not support, so rename it but keep track of the old name
        dataset = obj.to_dataset(name=DATAARRAY_VARIABLE)
        dataset.attrs[DATAARRAY_NAME] = obj.name
    else:
        # No problems with the name - so we're fine!
        dataset = obj.to_dataset()
    return dataset

to_icechunk(obj, session, *, group=None, mode=None, safe_chunks=True, append_dim=None, region=None, encoding=None, chunkmanager_store_kwargs=None, split_every=None)

Write an Xarray object to a group of an Icechunk store.

Parameters:

obj : DataArray | Dataset, required
    Xarray object to write
session : Session, required
    Writable Icechunk Session
mode : {"w", "w-", "a", "a-", "r+"} | None, default: None
    Persistence mode: "w" means create (overwrite if exists); "w-" means create (fail if exists); "a" means override all existing variables including dimension coordinates (create if does not exist); "a-" means only append those variables that have append_dim; "r+" means modify existing array values only (raise an error if any metadata or shapes would change). The default mode is "a" if append_dim is set. Otherwise, it is "r+" if region is set and "w-" otherwise.
group : str, default: None
    Group path. (a.k.a. path in zarr terminology.)
encoding : dict, default: None
    Nested dictionary with variable names as keys and dictionaries of variable-specific encodings as values, e.g., {"my_variable": {"dtype": "int16", "scale_factor": 0.1}, ...}
append_dim : hashable, default: None
    If set, the dimension along which the data will be appended. All other dimensions on overridden variables must remain the same size.
region : dict or "auto", default: None
    Optional mapping from dimension names to either a) "auto", or b) integer slices, indicating the region of existing zarr array(s) in which to write this dataset's data.

    If "auto" is provided the existing store will be opened and the region inferred by matching indexes. "auto" can be used as a single string, which will automatically infer the region for all dimensions, or as dictionary values for specific dimensions mixed together with explicit slices for other dimensions.

    Alternatively, integer slices can be provided; for example, {'x': slice(0, 1000), 'y': slice(10000, 11000)} would indicate that values should be written to the region 0:1000 along x and 10000:11000 along y.

    Users are expected to ensure that the specified region aligns with Zarr chunk boundaries, and that dask chunks are also aligned. Xarray makes limited checks that these multiple chunk boundaries line up. It is possible to write incomplete chunks and corrupt the data with this option if you are not careful.
safe_chunks : bool, default: True
    If True, only allow writes when there is a many-to-one relationship between Zarr chunks (specified in encoding) and Dask chunks. Set False to override this restriction; however, data may become corrupted if Zarr arrays are written in parallel. In addition to the many-to-one relationship validation, it also detects partial chunk writes when using the region parameter; these partial chunks are considered unsafe in mode "r+" but safe in mode "a". Note: even with these validations it can still be unsafe to write two or more chunked arrays to the same location in parallel if they are not writing in independent regions.
chunkmanager_store_kwargs : dict, default: None
    Additional keyword arguments passed on to the ChunkManager.store method used to store chunked arrays. For example, for a dask array additional kwargs will eventually be passed to dask.array.store(). Experimental API that should not be relied upon.
split_every : int | None, default: None
    Number of tasks to merge at every level of the tree reduction.

Returns:

None

Notes

Two restrictions apply to the use of region:

- If region is set, all variables in a dataset must have at least one dimension in common with the region. Other variables should be written in a separate single call to to_icechunk().
- Dimensions cannot be included in both region and append_dim at the same time. To create empty arrays to fill in with region, use the XarrayDatasetWriter directly.
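
A minimal end-to-end sketch, assuming an in-memory repository; the group name and data are placeholders:

import icechunk as ic
import xarray as xr
from icechunk.xarray import to_icechunk

ds = xr.Dataset({"temperature": (("x", "y"), [[1.0, 2.0], [3.0, 4.0]])})

repo = ic.Repository.create(storage=ic.in_memory_storage())
session = repo.writable_session("main")

# Write the dataset to a new group, then commit the session
to_icechunk(ds, session, group="weather", mode="w")
session.commit("write temperature")
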
Source code in icechunk-python/python/icechunk/xarray.py
def to_icechunk(
    obj: DataArray | Dataset,
    session: Session,
    *,
    group: str | None = None,
    mode: ZarrWriteModes | None = None,
    safe_chunks: bool = True,
    append_dim: Hashable | None = None,
    region: Region = None,
    encoding: Mapping[Any, Any] | None = None,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
) -> None:
    """
    Write an Xarray object to a group of an Icechunk store.

    Parameters
    ----------
    obj: DataArray or Dataset
        Xarray object to write
    session : icechunk.Session
        Writable Icechunk Session
    mode : {"w", "w-", "a", "a-", r+", None}, optional
        Persistence mode: "w" means create (overwrite if exists);
        "w-" means create (fail if exists);
        "a" means override all existing variables including dimension coordinates (create if does not exist);
        "a-" means only append those variables that have ``append_dim``.
        "r+" means modify existing array *values* only (raise an error if
        any metadata or shapes would change).
        The default mode is "a" if ``append_dim`` is set. Otherwise, it is
        "r+" if ``region`` is set and ``w-`` otherwise.
    group : str, optional
        Group path. (a.k.a. `path` in zarr terminology.)
    encoding : dict, optional
        Nested dictionary with variable names as keys and dictionaries of
        variable specific encodings as values, e.g.,
        ``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}``
    append_dim : hashable, optional
        If set, the dimension along which the data will be appended. All
        other dimensions on overridden variables must remain the same size.
    region : dict or "auto", optional
        Optional mapping from dimension names to either a) ``"auto"``, or b) integer
        slices, indicating the region of existing zarr array(s) in which to write
        this dataset's data.

        If ``"auto"`` is provided the existing store will be opened and the region
        inferred by matching indexes. ``"auto"`` can be used as a single string,
        which will automatically infer the region for all dimensions, or as
        dictionary values for specific dimensions mixed together with explicit
        slices for other dimensions.

        Alternatively integer slices can be provided; for example, ``{'x': slice(0,
        1000), 'y': slice(10000, 11000)}`` would indicate that values should be
        written to the region ``0:1000`` along ``x`` and ``10000:11000`` along
        ``y``.

        Users are expected to ensure that the specified region aligns with
        Zarr chunk boundaries, and that dask chunks are also aligned.
        Xarray makes limited checks that these multiple chunk boundaries line up.
        It is possible to write incomplete chunks and corrupt the data with this
        option if you are not careful.
    safe_chunks : bool, default: True
        If True, only allow writes when there is a many-to-one relationship
        between Zarr chunks (specified in encoding) and Dask chunks.
        Set False to override this restriction; however, data may become corrupted
        if Zarr arrays are written in parallel.
        In addition to the many-to-one relationship validation, it also detects partial
        chunk writes when using the region parameter;
        these partial chunks are considered unsafe in the mode "r+" but safe in
        the mode "a".
        Note: Even with these validations it can still be unsafe to write
        two or more chunked arrays in the same location in parallel if they are
        not writing in independent regions.
    chunkmanager_store_kwargs : dict, optional
        Additional keyword arguments passed on to the `ChunkManager.store` method used to store
        chunked arrays. For example for a dask array additional kwargs will be passed eventually to
        `dask.array.store()`. Experimental API that should not be relied upon.
    split_every: int, optional
        Number of tasks to merge at every level of the tree reduction.

    Returns
    -------
    None

    Notes
    -----
    Two restrictions apply to the use of ``region``:

      - If ``region`` is set, _all_ variables in a dataset must have at
        least one dimension in common with the region. Other variables
        should be written in a separate single call to ``to_icechunk()``.
      - Dimensions cannot be included in both ``region`` and
        ``append_dim`` at the same time. To create empty arrays to fill
        in with ``region``, use the `XarrayDatasetWriter` directly.
    """

    as_dataset = make_dataset(obj)
    with session.allow_pickling():
        store = session.store
        writer = XarrayDatasetWriter(as_dataset, store=store, safe_chunks=safe_chunks)

        writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region)

        # write metadata
        writer.write_metadata(encoding)
        # write in-memory arrays
        writer.write_eager()
        # eagerly write dask arrays
        writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs)