API Reference


BasicConflictSolver

Bases: ConflictSolver

A basic conflict solver that allows for simple configuration of resolution behavior

This conflict solver allows for simple configuration of resolution behavior for conflicts that may occur during a rebase operation. It will attempt to resolve a limited set of conflicts based on the configuration options provided.

  • When a user attribute conflict is encountered, the behavior is determined by the on_user_attributes_conflict option
  • When a chunk conflict is encountered, the behavior is determined by the on_chunk_conflict option
  • When an array is deleted that has been updated, fail_on_delete_of_updated_array will determine whether to fail the rebase operation
  • When a group is deleted that has been updated, fail_on_delete_of_updated_group will determine whether to fail the rebase operation
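
For example, the following sketch configures a solver that keeps this session's chunks and fails if an updated array or group was deleted on the other side, then applies it during a rebase (a minimal sketch, assuming an existing writable `session`; `Session.rebase` is assumed from the Session API below):

from icechunk import BasicConflictSolver, VersionSelection

solver = BasicConflictSolver(
    on_chunk_conflict=VersionSelection.UseOurs,
    fail_on_delete_of_updated_array=True,
    fail_on_delete_of_updated_group=True,
)
session.rebase(solver)  # `session` is an assumed writable Session
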
Source code in icechunk/_icechunk_python.pyi
class BasicConflictSolver(ConflictSolver):
    """A basic conflict solver that allows for simple configuration of resolution behavior

    This conflict solver allows for simple configuration of resolution behavior for conflicts that may occur during a rebase operation.
    It will attempt to resolve a limited set of conflicts based on the configuration options provided.

    - When a user attribute conflict is encountered, the behavior is determined by the `on_user_attributes_conflict` option
    - When a chunk conflict is encountered, the behavior is determined by the `on_chunk_conflict` option
    - When an array is deleted that has been updated, `fail_on_delete_of_updated_array` will determine whether to fail the rebase operation
    - When a group is deleted that has been updated, `fail_on_delete_of_updated_group` will determine whether to fail the rebase operation
    """

    def __init__(
        self,
        *,
        on_user_attributes_conflict: VersionSelection = VersionSelection.UseOurs,
        on_chunk_conflict: VersionSelection = VersionSelection.UseOurs,
        fail_on_delete_of_updated_array: bool = False,
        fail_on_delete_of_updated_group: bool = False,
    ) -> None:
        """Create a BasicConflictSolver object with the given configuration options
        Parameters:
        on_user_attributes_conflict: VersionSelection
            The behavior to use when a user attribute conflict is encountered, by default VersionSelection.UseOurs
        on_chunk_conflict: VersionSelection
            The behavior to use when a chunk conflict is encountered, by default VersionSelection.UseOurs
        fail_on_delete_of_updated_array: bool
            Whether to fail when an array is deleted that has been updated, by default False
        fail_on_delete_of_updated_group: bool
            Whether to fail when a group is deleted that has been updated, by default False
        """
        ...

__init__(*, on_user_attributes_conflict=VersionSelection.UseOurs, on_chunk_conflict=VersionSelection.UseOurs, fail_on_delete_of_updated_array=False, fail_on_delete_of_updated_group=False)

Create a BasicConflictSolver object with the given configuration options.

Parameters:

    on_user_attributes_conflict : VersionSelection
        The behavior to use when a user attribute conflict is encountered, by default VersionSelection.UseOurs
    on_chunk_conflict : VersionSelection
        The behavior to use when a chunk conflict is encountered, by default VersionSelection.UseOurs
    fail_on_delete_of_updated_array : bool
        Whether to fail when an array is deleted that has been updated, by default False
    fail_on_delete_of_updated_group : bool
        Whether to fail when a group is deleted that has been updated, by default False

Source code in icechunk/_icechunk_python.pyi
def __init__(
    self,
    *,
    on_user_attributes_conflict: VersionSelection = VersionSelection.UseOurs,
    on_chunk_conflict: VersionSelection = VersionSelection.UseOurs,
    fail_on_delete_of_updated_array: bool = False,
    fail_on_delete_of_updated_group: bool = False,
) -> None:
    """Create a BasicConflictSolver object with the given configuration options
    Parameters:
    on_user_attributes_conflict: VersionSelection
        The behavior to use when a user attribute conflict is encountered, by default VersionSelection.UseOurs
    on_chunk_conflict: VersionSelection
        The behavior to use when a chunk conflict is encountered, by default VersionSelection.UseOurs
    fail_on_delete_of_updated_array: bool
        Whether to fail when an array is deleted that has been updated, by default False
    fail_on_delete_of_updated_group: bool
        Whether to fail when a group is deleted that has been updated, by default False
    """
    ...

CachingConfig

Configuration for how Icechunk caches its metadata files

Source code in icechunk/_icechunk_python.pyi
class CachingConfig:
    """Configuration for how Icechunk caches its metadata files"""

    def __init__(
        self,
        snapshots_cache_size: int,
        manifests_cache_size: int,
        transactions_cache_size: int,
        attributes_cache_size: int,
        chunks_cache_size: int,
    ) -> None: ...
    @property
    def snapshots_cache_size(self) -> int: ...
    @snapshots_cache_size.setter
    def snapshots_cache_size(self, value: int) -> None: ...
    @property
    def manifests_cache_size(self) -> int: ...
    @manifests_cache_size.setter
    def manifests_cache_size(self, value: int) -> None: ...
    @property
    def transactions_cache_size(self) -> int: ...
    @transactions_cache_size.setter
    def transactions_cache_size(self, value: int) -> None: ...
    @property
    def attributes_cache_size(self) -> int: ...
    @attributes_cache_size.setter
    def attributes_cache_size(self, value: int) -> None: ...
    @property
    def chunks_cache_size(self) -> int: ...
    @chunks_cache_size.setter
    def chunks_cache_size(self, value: int) -> None: ...
    @staticmethod
    def default() -> CachingConfig: ...
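
As an illustration, the cache sizes can be set and attached to a repository configuration (a minimal sketch; the values below are arbitrary, and their units are not documented in this section):

import icechunk

caching = icechunk.CachingConfig(
    snapshots_cache_size=100,
    manifests_cache_size=100,
    transactions_cache_size=50,
    attributes_cache_size=100,
    chunks_cache_size=0,
)
config = icechunk.RepositoryConfig.default()
config.caching = caching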

CompressionAlgorithm

Bases: Enum

Enum for selecting the compression algorithm used by Icechunk to write its metadata files

Source code in icechunk/_icechunk_python.pyi
class CompressionAlgorithm(Enum):
    """Enum for selecting the compression algorithm used by Icechunk to write its metadata files"""

    Zstd = 0

    @staticmethod
    def default() -> CompressionAlgorithm: ...

CompressionConfig

Configuration for how Icechunk compresses its metadata files

Source code in icechunk/_icechunk_python.pyi
class CompressionConfig:
    """Configuration for how Icechunk compresses its metadata files"""
    @property
    def algorithm(self) -> CompressionAlgorithm: ...
    @algorithm.setter
    def algorithm(self, value: CompressionAlgorithm) -> None: ...
    @property
    def level(self) -> int: ...
    @level.setter
    def level(self, value: int) -> None: ...
    @staticmethod
    def default() -> CompressionConfig: ...
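
For instance, to raise the compression level while keeping the Zstd algorithm (a minimal sketch; the valid range of `level` is not documented here):

import icechunk

compression = icechunk.CompressionConfig.default()
compression.algorithm = icechunk.CompressionAlgorithm.Zstd
compression.level = 5

config = icechunk.RepositoryConfig.default()
config.compression = compression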

Conflict

A conflict detected between snapshots

Source code in icechunk/_icechunk_python.pyi
class Conflict:
    """A conflict detected between snapshots"""

    @property
    def conflict_type(self) -> ConflictType:
        """The type of conflict detected"""
        ...

    @property
    def path(self) -> str:
        """The path of the node that caused the conflict"""
        ...

    @property
    def conflicted_chunks(self) -> list[list[int]] | None:
        """If the conflict is a chunk conflict, this will return the list of chunk indices that are in conflict"""
        ...

conflict_type: ConflictType property

The type of conflict detected

conflicted_chunks: list[list[int]] | None property

If the conflict is a chunk conflict, this will return the list of chunk indices that are in conflict

path: str property

The path of the node that caused the conflict

ConflictDetector

Bases: ConflictSolver

A conflict solver that can be used to detect conflicts between two stores, but does not resolve them

Where the BasicConflictSolver will attempt to resolve conflicts, the ConflictDetector will only detect them. This means that during a rebase operation the ConflictDetector will raise a RebaseFailed error if any conflicts are detected, and allow the rebase operation to be retried with a different conflict resolution strategy. Otherwise, if no conflicts are detected the rebase operation will succeed.

Source code in icechunk/_icechunk_python.pyi
class ConflictDetector(ConflictSolver):
    """A conflict solver that can be used to detect conflicts between two stores, but does not resolve them

    Where the `BasicConflictSolver` will attempt to resolve conflicts, the `ConflictDetector` will only detect them. This means
    that during a rebase operation the `ConflictDetector` will raise a `RebaseFailed` error if any conflicts are detected, and
    allow the rebase operation to be retried with a different conflict resolution strategy. Otherwise, if no conflicts are detected
    the rebase operation will succeed.
    """

    def __init__(self) -> None: ...
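
A minimal sketch of the detect-only behavior described above, assuming a writable `session` that needs rebasing:

import icechunk

try:
    session.rebase(icechunk.ConflictDetector())
except icechunk.RebaseFailedError as e:
    # No changes were resolved; report and choose a different strategy
    print(f"rebase failed at snapshot {e.snapshot_id} with {len(e.conflicts)} conflict(s)")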

ConflictError

Bases: Exception

Error raised when a commit operation fails due to a conflict.

Source code in icechunk/session.py
class ConflictError(Exception):
    """Error raised when a commit operation fails due to a conflict."""

    _error: ConflictErrorData

    def __init__(self, error: PyConflictError) -> None:
        self._error = error.args[0]

    def __str__(self) -> str:
        return str(self._error)

    @property
    def expected_parent(self) -> str:
        """
        The expected parent snapshot ID.

        This is the snapshot ID that the session was based on when the
        commit operation was called.
        """
        return self._error.expected_parent

    @property
    def actual_parent(self) -> str:
        """
        The actual parent snapshot ID of the branch that the session attempted to commit to.

        When the session is based on a branch, this is the snapshot ID of the branch tip. If this
        error is raised, it means the branch was modified and committed by another session after
        the session was created.
        """
        return self._error.actual_parent

actual_parent: str property

The actual parent snapshot ID of the branch that the session attempted to commit to.

When the session is based on a branch, this is the snapshot ID of the branch tip. If this error is raised, it means the branch was modified and committed by another session after the session was created.

expected_parent: str property

The expected parent snapshot ID.

This is the snapshot ID that the session was based on when the commit operation was called.
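
A minimal sketch of handling this error, assuming a writable `session` whose branch tip moved underneath it (`Session.commit` is assumed from the Session API below):

import icechunk

try:
    snapshot_id = session.commit("update temperature data")
except icechunk.ConflictError as e:
    print("expected parent:", e.expected_parent)
    print("actual parent:", e.actual_parent)
    # Typically followed by session.rebase(...) and a retried commit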

ConflictErrorData

Data class for conflict errors. This describes the snapshot conflict detected when committing a session

If this error is raised, it means the branch was modified and committed by another session after the session was created.

Source code in icechunk/_icechunk_python.pyi
class ConflictErrorData:
    """Data class for conflict errors. This describes the snapshot conflict detected when committing a session

    If this error is raised, it means the branch was modified and committed by another session after the session was created.
    """
    @property
    def expected_parent(self) -> str:
        """The expected parent snapshot ID.

        This is the snapshot ID that the session was based on when the
        commit operation was called.
        """
        ...
    @property
    def actual_parent(self) -> str:
        """
        The actual parent snapshot ID of the branch that the session attempted to commit to.

        When the session is based on a branch, this is the snapshot ID of the branch tip. If this
        error is raised, it means the branch was modified and committed by another session after
        the session was created.
        """
        ...

actual_parent: str property

The actual parent snapshot ID of the branch that the session attempted to commit to.

When the session is based on a branch, this is the snapshot ID of the branch tip. If this error is raised, it means the branch was modified and committed by another session after the session was created.

expected_parent: str property

The expected parent snapshot ID.

This is the snapshot ID that the session was based on when the commit operation was called.

ConflictSolver

An abstract conflict solver that can be used to detect or resolve conflicts between two stores

This should never be used directly, but should be subclassed to provide specific conflict resolution behavior

Source code in icechunk/_icechunk_python.pyi
class ConflictSolver:
    """An abstract conflict solver that can be used to detect or resolve conflicts between two stores

    This should never be used directly, but should be subclassed to provide specific conflict resolution behavior
    """

    ...

ConflictType

Bases: Enum

Type of conflict detected

Source code in icechunk/_icechunk_python.pyi
class ConflictType(Enum):
    """Type of conflict detected"""

    NewNodeConflictsWithExistingNode = 1
    NewNodeInInvalidGroup = 2
    ZarrMetadataDoubleUpdate = 3
    ZarrMetadataUpdateOfDeletedArray = 4
    UserAttributesDoubleUpdate = 5
    UserAttributesUpdateOfDeletedNode = 6
    ChunkDoubleUpdate = 7
    ChunksUpdatedInDeletedArray = 8
    ChunksUpdatedInUpdatedArray = 9
    DeleteOfUpdatedArray = 10
    DeleteOfUpdatedGroup = 11

IcechunkError

Bases: Exception

Base class for all Icechunk errors

Source code in icechunk/_icechunk_python.pyi
class IcechunkError(Exception):
    """Base class for all Icechunk errors"""

    ...

IcechunkStore

Bases: Store, SyncMixin

Source code in icechunk/store.py
class IcechunkStore(Store, SyncMixin):
    _store: PyStore

    def __init__(
        self,
        store: PyStore,
        *args: Any,
        **kwargs: Any,
    ):
        """Create a new IcechunkStore.

        This should not be called directly; instead use the `create`, `open_existing` or `open_or_create` class methods.
        """
        if store is None:
            raise ValueError(
                "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
            )
        super().__init__(read_only=store.read_only)
        self._store = store
        self._is_open = True

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, IcechunkStore):
            return False
        return self._store == value._store

    def __getstate__(self) -> object:
        # we serialize the Rust store as bytes
        d = self.__dict__.copy()
        d["_store"] = self._store.as_bytes()
        return d

    def __setstate__(self, state: Any) -> None:
        # we have to deserialize the bytes of the Rust store
        store_repr = state["_store"]
        state["_store"] = PyStore.from_bytes(store_repr)
        state["_read_only"] = state["_store"].read_only
        self.__dict__ = state

    @property
    def session(self) -> "Session":
        from icechunk import Session

        return Session(self._store.session)

    async def clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return await self._store.clear()

    def sync_clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return self._store.sync_clear()

    async def is_empty(self, prefix: str) -> bool:
        """
        Check if the directory is empty.

        Parameters
        ----------
        prefix : str
            Prefix of keys to check.

        Returns
        -------
        bool
            True if the store is empty, False otherwise.
        """
        return await self._store.is_empty(prefix)

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Retrieve the value associated with a given key.

        Parameters
        ----------
        key : str
        byte_range : ByteRequest, optional

            ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

            - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
            - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
            - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

        Returns
        -------
        Buffer
        """

        try:
            result = await self._store.get(key, _byte_request_to_tuple(byte_range))
        except KeyError as _e:
            # Zarr python expects None to be returned if the key does not exist
            # but an IcechunkStore returns an error if the key does not exist
            return None

        return prototype.buffer.from_bytes(result)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        """Retrieve possibly partial values from given key_ranges.

        Parameters
        ----------
        key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
            Ordered set of key, range pairs, a key may occur multiple times with different ranges

        Returns
        -------
        list of values, in the order of the key_ranges, may contain null/none for missing keys
        """
        # NOTE: pyo3 has no implicit conversion from an Iterable to a Rust iterable. So we convert it
        # to a list here first. Possible opportunity for optimization.
        ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
        result = await self._store.get_partial_values(list(ranges))
        return [prototype.buffer.from_bytes(r) for r in result]

    async def exists(self, key: str) -> bool:
        """Check if a key exists in the store.

        Parameters
        ----------
        key : str

        Returns
        -------
        bool
        """
        return await self._store.exists(key)

    @property
    def supports_writes(self) -> bool:
        """Does the store support writes?"""
        return self._store.supports_writes

    async def set(self, key: str, value: Buffer) -> None:
        """Store a (key, value) pair.

        Parameters
        ----------
        key : str
        value : Buffer
        """
        return await self._store.set(key, value.to_bytes())

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        """
        Store a key to ``value`` if the key is not already present.

        Parameters
        -----------
        key : str
        value : Buffer
        """
        return await self._store.set_if_not_exists(key, value.to_bytes())

    def set_virtual_ref(
        self,
        key: str,
        location: str,
        *,
        offset: int,
        length: int,
        checksum: str | datetime | None = None,
        validate_container: bool = False,
    ) -> None:
        """Store a virtual reference to a chunk.

        Parameters
        ----------
        key : str
            The chunk to store the reference under. This is the fully qualified zarr key eg: 'array/c/0/0/0'
        location : str
            The location of the chunk in storage. This is the absolute path to the chunk in storage eg: 's3://bucket/path/to/file.nc'
        offset : int
            The offset in bytes from the start of the file location in storage the chunk starts at
        length : int
            The length of the chunk in bytes, measured from the given offset
        checksum : str | datetime | None
            The etag or last_modified_at field of the object
        validate_container: bool
            If set to true, fail for locations that don't match any existing virtual chunk container
        """
        return self._store.set_virtual_ref(
            key, location, offset, length, checksum, validate_container
        )

    async def delete(self, key: str) -> None:
        """Remove a key from the store

        Parameters
        ----------
        key : str
        """
        return await self._store.delete(key)

    async def delete_dir(self, prefix: str) -> None:
        """Delete a prefix

        Parameters
        ----------
        key : str
        """
        return await self._store.delete_dir(prefix)

    @property
    def supports_partial_writes(self) -> bool:
        """Does the store support partial writes?"""
        return self._store.supports_partial_writes

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """Store values at a given key, starting at byte range_start.

        Parameters
        ----------
        key_start_values : list[tuple[str, int, BytesLike]]
            set of key, range_start, values triples, a key may occur multiple times with different
            range_starts, range_starts (considering the length of the respective values) must not
            specify overlapping ranges for the same key
        """
        # NOTE: pyo3 does not implicitly convert an Iterable to a Rust iterable. So we convert it
        # to a list here first. Possible opportunity for optimization.
        return await self._store.set_partial_values(list(key_start_values))

    @property
    def supports_listing(self) -> bool:
        """Does the store support listing?"""
        return self._store.supports_listing

    @property
    def supports_deletes(self) -> bool:
        return self._store.supports_deletes

    def list(self) -> AsyncIterator[str]:
        """Retrieve all keys in the store.

        Returns
        -------
        AsyncIterator[str]
        """
        # This method should be async, like overridden methods in child classes.
        # However, that's not straightforward:
        # https://stackoverflow.com/questions/68905848

        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list()

    def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
        to the root of the store.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_prefix(prefix)

    def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        Retrieve all keys and prefixes with a given prefix and which do not contain the character
        “/” after the given prefix.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_dir(prefix)

supports_listing: bool property

Does the store support listing?

supports_partial_writes: bool property

Does the store support partial writes?

supports_writes: bool property

Does the store support writes?

__init__(store, *args, **kwargs)

Create a new IcechunkStore.

This should not be called directly; instead use the create, open_existing or open_or_create class methods.

Source code in icechunk/store.py
def __init__(
    self,
    store: PyStore,
    *args: Any,
    **kwargs: Any,
):
    """Create a new IcechunkStore.

    This should not be called directly; instead use the `create`, `open_existing` or `open_or_create` class methods.
    """
    if store is None:
        raise ValueError(
            "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
        )
    super().__init__(read_only=store.read_only)
    self._store = store
    self._is_open = True

clear() async

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk/store.py
async def clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return await self._store.clear()

delete(key) async

Remove a key from the store

Parameters:

    key : str (required)
Source code in icechunk/store.py
async def delete(self, key: str) -> None:
    """Remove a key from the store

    Parameters
    ----------
    key : str
    """
    return await self._store.delete(key)

delete_dir(prefix) async

Delete a prefix

Parameters:

    prefix : str (required)
Source code in icechunk/store.py
async def delete_dir(self, prefix: str) -> None:
    """Delete a prefix

    Parameters
    ----------
    prefix : str
    """
    return await self._store.delete_dir(prefix)

exists(key) async

Check if a key exists in the store.

Parameters:

    key : str (required)

Returns:

    bool
Source code in icechunk/store.py
async def exists(self, key: str) -> bool:
    """Check if a key exists in the store.

    Parameters
    ----------
    key : str

    Returns
    -------
    bool
    """
    return await self._store.exists(key)

get(key, prototype, byte_range=None) async

Retrieve the value associated with a given key.

Parameters:

    key : str (required)
    byte_range : ByteRequest, optional (default: None)

        ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

    Buffer
Source code in icechunk/store.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Retrieve the value associated with a given key.

    Parameters
    ----------
    key : str
    byte_range : ByteRequest, optional

        ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

        - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
        - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
        - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

    Returns
    -------
    Buffer
    """

    try:
        result = await self._store.get(key, _byte_request_to_tuple(byte_range))
    except KeyError as _e:
        # Zarr python expects None to be returned if the key does not exist
        # but an IcechunkStore returns an error if the key does not exist
        return None

    return prototype.buffer.from_bytes(result)
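
A ranged read might look like the following (a minimal sketch to run inside a coroutine; `store` is an assumed open IcechunkStore, and the `ByteRequest` classes and buffer prototype are assumed to live at these zarr-python 3 paths):

from zarr.abc.store import RangeByteRequest
from zarr.core.buffer import default_buffer_prototype

# Fetch the first 100 bytes of a chunk; the end bound is exclusive.
buf = await store.get(
    "array/c/0/0/0",
    prototype=default_buffer_prototype(),
    byte_range=RangeByteRequest(0, 100),
)
data = buf.to_bytes() if buf is not None else None  # None means the key is missing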

get_partial_values(prototype, key_ranges) async

Retrieve possibly partial values from given key_ranges.

Parameters:

    key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]] (required)
        Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns:

    list of values, in the order of the key_ranges, may contain null/none for missing keys
Source code in icechunk/store.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    """Retrieve possibly partial values from given key_ranges.

    Parameters
    ----------
    key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
        Ordered set of key, range pairs, a key may occur multiple times with different ranges

    Returns
    -------
    list of values, in the order of the key_ranges, may contain null/none for missing keys
    """
    # NOTE: pyo3 has not implicit conversion from an Iterable to a rust iterable. So we convert it
    # to a list here first. Possible opportunity for optimization.
    ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
    result = await self._store.get_partial_values(list(ranges))
    return [prototype.buffer.from_bytes(r) for r in result]

is_empty(prefix) async

Check if the directory is empty.

Parameters:

    prefix : str (required)
        Prefix of keys to check.

Returns:

    bool
        True if the store is empty, False otherwise.

Source code in icechunk/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    return await self._store.is_empty(prefix)

list()

Retrieve all keys in the store.

Returns:

    AsyncIterator[str]
Source code in icechunk/store.py
def list(self) -> AsyncIterator[str]:
    """Retrieve all keys in the store.

    Returns
    -------
    AsyncIterator[str]
    """
    # This method should be async, like overridden methods in child classes.
    # However, that's not straightforward:
    # https://stackoverflow.com/questions/68905848

    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list()

list_dir(prefix)

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

    prefix : str (required)

Returns:

    AsyncIterator[str]
Source code in icechunk/store.py
def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """
    Retrieve all keys and prefixes with a given prefix and which do not contain the character
    “/” after the given prefix.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_dir(prefix)

list_prefix(prefix)

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

    prefix : str (required)

Returns:

    AsyncIterator[str]
Source code in icechunk/store.py
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
    to the root of the store.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_prefix(prefix)

set(key, value) async

Store a (key, value) pair.

Parameters:

    key : str (required)
    value : Buffer (required)
Source code in icechunk/store.py
async def set(self, key: str, value: Buffer) -> None:
    """Store a (key, value) pair.

    Parameters
    ----------
    key : str
    value : Buffer
    """
    return await self._store.set(key, value.to_bytes())

set_if_not_exists(key, value) async

Store a key to value if the key is not already present.

Parameters:

    key : str (required)
    value : Buffer (required)
Source code in icechunk/store.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    """
    Store a key to ``value`` if the key is not already present.

    Parameters
    -----------
    key : str
    value : Buffer
    """
    return await self._store.set_if_not_exists(key, value.to_bytes())

set_partial_values(key_start_values) async

Store values at a given key, starting at byte range_start.

Parameters:

    key_start_values : list[tuple[str, int, BytesLike]] (required)
        set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key
Source code in icechunk/store.py
async def set_partial_values(
    self, key_start_values: Iterable[tuple[str, int, BytesLike]]
) -> None:
    """Store values at a given key, starting at byte range_start.

    Parameters
    ----------
    key_start_values : list[tuple[str, int, BytesLike]]
        set of key, range_start, values triples, a key may occur multiple times with different
        range_starts, range_starts (considering the length of the respective values) must not
        specify overlapping ranges for the same key
    """
    # NOTE: pyo3 does not implicitly convert an Iterable to a Rust iterable. So we convert it
    # to a list here first. Possible opportunity for optimization.
    return await self._store.set_partial_values(list(key_start_values))

set_virtual_ref(key, location, *, offset, length, checksum=None, validate_container=False)

Store a virtual reference to a chunk.

Parameters:

Name Type Description Default
key str

The chunk to store the reference under. This is the fully qualified zarr key eg: 'array/c/0/0/0'

required
location str

The location of the chunk in storage. This is absolute path to the chunk in storage eg: 's3://bucket/path/to/file.nc'

required
offset int

The offset in bytes from the start of the file location in storage the chunk starts at

required
length int

The length of the chunk in bytes, measured from the given offset

required
checksum str | datetime | None

The etag or last_medified_at field of the object

None
validate_container bool

If set to true, fail for locations that don't match any existing virtual chunk container

False
Source code in icechunk/store.py
def set_virtual_ref(
    self,
    key: str,
    location: str,
    *,
    offset: int,
    length: int,
    checksum: str | datetime | None = None,
    validate_container: bool = False,
) -> None:
    """Store a virtual reference to a chunk.

    Parameters
    ----------
    key : str
        The chunk to store the reference under. This is the fully qualified zarr key eg: 'array/c/0/0/0'
    location : str
        The location of the chunk in storage. This is the absolute path to the chunk in storage eg: 's3://bucket/path/to/file.nc'
    offset : int
        The offset in bytes from the start of the file location in storage the chunk starts at
    length : int
        The length of the chunk in bytes, measured from the given offset
    checksum : str | datetime | None
        The etag or last_modified_at field of the object
    validate_container: bool
        If set to true, fail for locations that don't match any existing virtual chunk container
    """
    return self._store.set_virtual_ref(
        key, location, offset, length, checksum, validate_container
    )
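
For example, referencing 4096 bytes at offset 0 of an existing object as a chunk (a minimal sketch, assuming a writable `store` and that the referenced object already exists):

store.set_virtual_ref(
    "array/c/0/0/0",
    "s3://bucket/path/to/file.nc",
    offset=0,
    length=4096,
    validate_container=True,
)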

sync_clear()

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk/store.py
def sync_clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return self._store.sync_clear()

RebaseFailedData

Data class for rebase failed errors. This describes the error that occurred when rebasing a session

Source code in icechunk/_icechunk_python.pyi
class RebaseFailedData:
    """Data class for rebase failed errors. This describes the error that occurred when rebasing a session"""

    @property
    def snapshot(self) -> str:
        """The snapshot ID that the session was rebased to"""
        ...

    @property
    def conflicts(self) -> list[Conflict]:
        """The conflicts that occurred during the rebase operation"""
        ...

conflicts: list[Conflict] property

The conflicts that occurred during the rebase operation

snapshot: str property

The snapshot ID that the session was rebased to

RebaseFailedError

Bases: Exception

Error raised when a rebase operation fails.

Source code in icechunk/session.py
class RebaseFailedError(Exception):
    """Error raised when a rebase operation fails."""

    _error: RebaseFailedData

    def __init__(self, error: PyRebaseFailedError) -> None:
        self._error = error.args[0]

    def __str__(self) -> str:
        return str(self._error)

    @property
    def snapshot_id(self) -> str:
        """
        The snapshot ID that the rebase operation failed on.
        """
        return self._error.snapshot

    @property
    def conflicts(self) -> list[Conflict]:
        """
        List of conflicts that occurred during the rebase operation.
        """
        return self._error.conflicts

conflicts: list[Conflict] property

List of conflicts that occurred during the rebase operation.

snapshot_id: str property

The snapshot ID that the rebase operation failed on.
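
A minimal sketch of inspecting and then resolving a failed rebase, assuming a writable `session`:

import icechunk

try:
    session.rebase(icechunk.ConflictDetector())
except icechunk.RebaseFailedError as e:
    for c in e.conflicts:
        print(c.conflict_type, c.path, c.conflicted_chunks)
    # Retry, resolving chunk conflicts in favor of this session
    session.rebase(icechunk.BasicConflictSolver(
        on_chunk_conflict=icechunk.VersionSelection.UseOurs,
    ))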

Repository

An Icechunk repository.

Source code in icechunk/repository.py
class Repository:
    """An Icechunk repository."""

    _repository: PyRepository

    def __init__(self, repository: PyRepository):
        self._repository = repository

    @classmethod
    def create(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """Create a new Icechunk repository.

        If one already exists at the given store location, an error will be raised.

        Args:
            storage: The storage configuration for the repository.
            config: The repository configuration. If not provided, a default configuration will be used.
        """
        return cls(
            PyRepository.create(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @classmethod
    def open(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """Open an existing Icechunk repository.

        If no repository exists at the given storage location, an error will be raised.

        Args:
            storage: The storage configuration for the repository.
            config: The repository settings. If not provided, a default configuration will be
            loaded from the repository
        """
        return cls(
            PyRepository.open(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @classmethod
    def open_or_create(
        cls,
        storage: Storage,
        config: RepositoryConfig | None = None,
        virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
    ) -> Self:
        """Open an existing Icechunk repository or create a new one if it does not exist.

        Args:
            storage: The storage configuration for the repository.
            config: The repository settings. If not provided, a default configuration will be
            loaded from the repository
        """
        return cls(
            PyRepository.open_or_create(
                storage,
                config=config,
                virtual_chunk_credentials=virtual_chunk_credentials,
            )
        )

    @staticmethod
    def exists(storage: Storage) -> bool:
        """Check if a repository exists at the given storage location.

        Args:
            storage: The storage configuration for the repository.
        """
        return PyRepository.exists(storage)

    @staticmethod
    def fetch_config(storage: Storage) -> RepositoryConfig | None:
        """Fetch the configuration for the repository saved in storage"""
        return PyRepository.fetch_config(storage)

    def save_config(self) -> None:
        """Save the repository configuration to storage, this configuration will be used in future calls to Repository.open."""
        return self._repository.save_config()

    def ancestry(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        snapshot: str | None = None,
    ) -> list[SnapshotMetadata]:
        """Get the ancestry of a snapshot.

        Args:
            branch: The branch to get the ancestry of.
            tag: The tag to get the ancestry of.
            snapshot: The snapshot ID to get the ancestry of.

        Returns:
            list[SnapshotMetadata]: The ancestry of the snapshot, listing out the snapshots and their metadata

        Only one of the arguments can be specified.
        """
        return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)

    def create_branch(self, branch: str, snapshot_id: str) -> None:
        """Create a new branch at the given snapshot.

        Args:
            branch: The name of the branch to create.
            snapshot_id: The snapshot ID to create the branch at.
        """
        self._repository.create_branch(branch, snapshot_id)

    def list_branches(self) -> set[str]:
        """List the branches in the repository."""
        return self._repository.list_branches()

    def lookup_branch(self, branch: str) -> str:
        """Get the tip snapshot ID of a branch.

        Args:
            branch: The branch to get the tip of.

        Returns:
            str: The snapshot ID of the tip of the branch
        """
        return self._repository.lookup_branch(branch)

    def reset_branch(self, branch: str, snapshot_id: str) -> None:
        """Reset a branch to a specific snapshot.

        This will permanently alter the history of the branch such that the tip of
        the branch is the specified snapshot.

        Args:
            branch: The branch to reset.
            snapshot_id: The snapshot ID to reset the branch to.
        """
        self._repository.reset_branch(branch, snapshot_id)

    def delete_branch(self, branch: str) -> None:
        """Delete a branch.

        Args:
            branch: The branch to delete.
        """
        self._repository.delete_branch(branch)

    def create_tag(self, tag: str, snapshot_id: str) -> None:
        """Create a new tag at the given snapshot.

        Args:
            tag: The name of the tag to create.
            snapshot_id: The snapshot ID to create the tag at.
        """
        self._repository.create_tag(tag, snapshot_id)

    def list_tags(self) -> set[str]:
        """List the tags in the repository."""
        return self._repository.list_tags()

    def lookup_tag(self, tag: str) -> str:
        """Get the snapshot ID of a tag.

        Args:
            tag: The tag to get the snapshot ID of.

        Returns:
            str: The snapshot ID of the tag.
        """
        return self._repository.lookup_tag(tag)

    def readonly_session(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        snapshot: str | None = None,
    ) -> Session:
        """Create a read-only session.

        This can be thought of as a read-only checkout of the repository at a given snapshot.
        When branch or tag are provided, the session will be based on the tip of the branch or
        the snapshot ID of the tag.

        Args:
            branch: If provided, the branch to create the session on.
            tag: If provided, the tag to create the session on.
            snapshot: If provided, the snapshot ID to create the session on.

        Returns:
            Session: The read-only session, pointing to the specified snapshot, tag, or branch.

        Only one of the arguments can be specified.
        """
        return Session(
            self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot)
        )

    def writable_session(self, branch: str) -> Session:
        """Create a writable session on a branch

        Like the read-only session, this can be thought of as a checkout of the repository at the
        tip of the branch. However, this session is writable and can be used to make changes to the
        repository. When ready, the changes can be committed to the branch, after which the session will
        become a read-only session on the new snapshot.

        Args:
            branch: The branch to create the session on.

        Returns:
            Session: The writable session on the branch.

        """
        return Session(self._repository.writable_session(branch))

ancestry(*, branch=None, tag=None, snapshot=None)

Get the ancestry of a snapshot.

Args:
    branch: The branch to get the ancestry of.
    tag: The tag to get the ancestry of.
    snapshot: The snapshot ID to get the ancestry of.

Returns:
    list[SnapshotMetadata]: The ancestry of the snapshot, listing out the snapshots and their metadata

Only one of the arguments can be specified.

Source code in icechunk/repository.py
def ancestry(
    self,
    *,
    branch: str | None = None,
    tag: str | None = None,
    snapshot: str | None = None,
) -> list[SnapshotMetadata]:
    """Get the ancestry of a snapshot.

    Args:
        branch: The branch to get the ancestry of.
        tag: The tag to get the ancestry of.
        snapshot: The snapshot ID to get the ancestry of.

    Returns:
        list[SnapshotMetadata]: The ancestry of the snapshot, listing out the snapshots and their metadata

    Only one of the arguments can be specified.
    """
    return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)
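
For example, walking a branch's history (a minimal sketch, assuming an open Repository `repo`; the SnapshotMetadata field names are assumptions, as that class is not documented in this section):

import icechunk

for snap in repo.ancestry(branch="main"):
    print(snap.id, snap.message, snap.written_at)  # field names assumed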

create(storage, config=None, virtual_chunk_credentials=None) classmethod

Create a new Icechunk repository.

If one already exists at the given store location, an error will be raised.

Args:
    storage: The storage configuration for the repository.
    config: The repository configuration. If not provided, a default configuration will be used.

Source code in icechunk/repository.py
@classmethod
def create(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """Create a new Icechunk repository.

    If one already exists at the given store location, an error will be raised.

    Args:
        storage: The storage configuration for the repository.
        config: The repository configuration. If not provided, a default configuration will be used.
    """
    return cls(
        PyRepository.create(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )
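
A minimal creation sketch; `icechunk.local_filesystem_storage` is an assumption here, since the Storage constructors are not part of this section:

import icechunk

storage = icechunk.local_filesystem_storage("/tmp/example-repo")  # assumed helper
repo = icechunk.Repository.create(storage)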

create_branch(branch, snapshot_id)

Create a new branch at the given snapshot.

Args:
    branch: The name of the branch to create.
    snapshot_id: The snapshot ID to create the branch at.

Source code in icechunk/repository.py
def create_branch(self, branch: str, snapshot_id: str) -> None:
    """Create a new branch at the given snapshot.

    Args:
        branch: The name of the branch to create.
        snapshot_id: The snapshot ID to create the branch at.
    """
    self._repository.create_branch(branch, snapshot_id)

create_tag(tag, snapshot_id)

Create a new tag at the given snapshot.

Args:
    tag: The name of the tag to create.
    snapshot_id: The snapshot ID to create the tag at.

Source code in icechunk/repository.py
def create_tag(self, tag: str, snapshot_id: str) -> None:
    """Create a new tag at the given snapshot.

    Args:
        tag: The name of the tag to create.
        snapshot_id: The snapshot ID to create the tag at.
    """
    self._repository.create_tag(tag, snapshot_id)

delete_branch(branch)

Delete a branch.

Args:
    branch: The branch to delete.

Source code in icechunk/repository.py
def delete_branch(self, branch: str) -> None:
    """Delete a branch.

    Args:
        branch: The branch to delete.
    """
    self._repository.delete_branch(branch)

exists(storage) staticmethod

Check if a repository exists at the given storage location.

Args:
    storage: The storage configuration for the repository.

Source code in icechunk/repository.py
@staticmethod
def exists(storage: Storage) -> bool:
    """Check if a repository exists at the given storage location.

    Args:
        storage: The storage configuration for the repository.
    """
    return PyRepository.exists(storage)

fetch_config(storage) staticmethod

Fetch the configuration for the repository saved in storage

Source code in icechunk/repository.py
@staticmethod
def fetch_config(storage: Storage) -> RepositoryConfig | None:
    """Fetch the configuration for the repository saved in storage"""
    return PyRepository.fetch_config(storage)

list_branches()

List the branches in the repository.

Source code in icechunk/repository.py
def list_branches(self) -> set[str]:
    """List the branches in the repository."""
    return self._repository.list_branches()

list_tags()

List the tags in the repository.

Source code in icechunk/repository.py
def list_tags(self) -> set[str]:
    """List the tags in the repository."""
    return self._repository.list_tags()

lookup_branch(branch)

Get the tip snapshot ID of a branch.

Args:
    branch: The branch to get the tip of.

Returns:
    str: The snapshot ID of the tip of the branch

Source code in icechunk/repository.py
def lookup_branch(self, branch: str) -> str:
    """Get the tip snapshot ID of a branch.

    Args:
        branch: The branch to get the tip of.

    Returns:
        str: The snapshot ID of the tip of the branch
    """
    return self._repository.lookup_branch(branch)

lookup_tag(tag)

Get the snapshot ID of a tag.

Args:
    tag: The tag to get the snapshot ID of.

Returns:
    str: The snapshot ID of the tag.

Source code in icechunk/repository.py
def lookup_tag(self, tag: str) -> str:
    """Get the snapshot ID of a tag.

    Args:
        tag: The tag to get the snapshot ID of.

    Returns:
        str: The snapshot ID of the tag.
    """
    return self._repository.lookup_tag(tag)

open(storage, config=None, virtual_chunk_credentials=None) classmethod

Open an existing Icechunk repository.

If no repository exists at the given storage location, an error will be raised.

Args:
    storage: The storage configuration for the repository.
    config: The repository settings. If not provided, a default configuration will be loaded from the repository.

Source code in icechunk/repository.py
@classmethod
def open(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """Open an existing Icechunk repository.

    If no repository exists at the given storage location, an error will be raised.

    Args:
        storage: The storage configuration for the repository.
        config: The repository settings. If not provided, a default configuration will be
        loaded from the repository
    """
    return cls(
        PyRepository.open(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )

open_or_create(storage, config=None, virtual_chunk_credentials=None) classmethod

Open an existing Icechunk repository or create a new one if it does not exist.

Args:
    storage: The storage configuration for the repository.
    config: The repository settings. If not provided, a default configuration will be loaded from the repository.

Source code in icechunk/repository.py
@classmethod
def open_or_create(
    cls,
    storage: Storage,
    config: RepositoryConfig | None = None,
    virtual_chunk_credentials: dict[str, AnyCredential] | None = None,
) -> Self:
    """Open an existing Icechunk repository or create a new one if it does not exist.

    Args:
        storage: The storage configuration for the repository.
        config: The repository settings. If not provided, a default configuration will be
        loaded from the repository
    """
    return cls(
        PyRepository.open_or_create(
            storage,
            config=config,
            virtual_chunk_credentials=virtual_chunk_credentials,
        )
    )

readonly_session(*, branch=None, tag=None, snapshot=None)

Create a read-only session.

This can be thought of as a read-only checkout of the repository at a given snapshot. When branch or tag are provided, the session will be based on the tip of the branch or the snapshot ID of the tag.

Args:
    branch: If provided, the branch to create the session on.
    tag: If provided, the tag to create the session on.
    snapshot: If provided, the snapshot ID to create the session on.

Returns:
    Session: The read-only session, pointing to the specified snapshot, tag, or branch.

Only one of the arguments can be specified.

Source code in icechunk/repository.py
def readonly_session(
    self,
    *,
    branch: str | None = None,
    tag: str | None = None,
    snapshot: str | None = None,
) -> Session:
    """Create a read-only session.

    This can be thought of as a read-only checkout of the repository at a given snapshot.
    When branch or tag are provided, the session will be based on the tip of the branch or
    the snapshot ID of the tag.

    Args:
        branch: If provided, the branch to create the session on.
        tag: If provided, the tag to create the session on.
        snapshot: If provided, the snapshot ID to create the session on.

    Returns:
        Session: The read-only session, pointing to the specified snapshot, tag, or branch.

    Only one of the arguments can be specified.
    """
    return Session(
        self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot)
    )

reset_branch(branch, snapshot_id)

Reset a branch to a specific snapshot.

This will permanently alter the history of the branch such that the tip of the branch is the specified snapshot.

Args:
    branch: The branch to reset.
    snapshot_id: The snapshot ID to reset the branch to.

Source code in icechunk/repository.py
def reset_branch(self, branch: str, snapshot_id: str) -> None:
    """Reset a branch to a specific snapshot.

    This will permanently alter the history of the branch such that the tip of
    the branch is the specified snapshot.

    Args:
        branch: The branch to reset.
        snapshot_id: The snapshot ID to reset the branch to.
    """
    self._repository.reset_branch(branch, snapshot_id)

save_config()

Save the repository configuration to storage; this configuration will be used in future calls to Repository.open.

Source code in icechunk/repository.py
def save_config(self) -> None:
    """Save the repository configuration to storage, this configuration will be used in future calls to Repository.open."""
    return self._repository.save_config()

writable_session(branch)

Create a writable session on a branch

Like the read-only session, this can be thought of as a checkout of the repository at the tip of the branch. However, this session is writable and can be used to make changes to the repository. When ready, the changes can be committed to the branch, after which the session will become a read-only session on the new snapshot.

Args: branch: The branch to create the session on.

Returns: Session: The writable session on the branch.

Source code in icechunk/repository.py
def writable_session(self, branch: str) -> Session:
    """Create a writable session on a branch

    Like the read-only session, this can be thought of as a checkout of the repository at the
    tip of the branch. However, this session is writable and can be used to make changes to the
    repository. When ready, the changes can be committed to the branch, after which the session will
    become a read-only session on the new snapshot.

    Args:
        branch: The branch to create the session on.

    Returns:
        Session: The writable session on the branch.

    """
    return Session(self._repository.writable_session(branch))
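
A sketch of the full write cycle, assuming zarr-python v3 and a branch named "main":

```
import zarr

session = repo.writable_session("main")
root = zarr.group(store=session.store)     # session.store is a zarr-compatible store
root.attrs["title"] = "example"            # stage a change
snapshot_id = session.commit("set title")  # session is now read-only on the new snapshot
```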

RepositoryConfig

Configuration for an Icechunk repository

Source code in icechunk/_icechunk_python.pyi
class RepositoryConfig:
    """Configuration for an Icechunk repository"""

    @staticmethod
    def default() -> RepositoryConfig: ...
    @property
    def inline_chunk_threshold_bytes(self) -> int: ...
    @inline_chunk_threshold_bytes.setter
    def inline_chunk_threshold_bytes(self, value: int) -> None: ...
    @property
    def unsafe_overwrite_refs(self) -> bool: ...
    @unsafe_overwrite_refs.setter
    def unsafe_overwrite_refs(self, value: bool) -> None: ...
    @property
    def get_partial_values_concurrency(self) -> int: ...
    @get_partial_values_concurrency.setter
    def get_partial_values_concurrency(self, value: int) -> None: ...
    @property
    def compression(self) -> CompressionConfig: ...
    @compression.setter
    def compression(self, value: CompressionConfig) -> None: ...
    @property
    def caching(self) -> CachingConfig: ...
    @caching.setter
    def caching(self, value: CachingConfig) -> None: ...
    @property
    def storage(self) -> Storage: ...
    @storage.setter
    def storage(self, value: Storage) -> None: ...
    @property
    def virtual_chunk_containers(self) -> dict[str, VirtualChunkContainer]: ...
    @virtual_chunk_containers.setter
    def virtual_chunk_containers(
        self, value: dict[str, VirtualChunkContainer]
    ) -> None: ...
    def set_virtual_chunk_container(self, cont: VirtualChunkContainer) -> None: ...
    def clear_virtual_chunk_containers(self) -> None: ...
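
For instance, a hedged sketch of adjusting a couple of the documented options and persisting them with Repository.save_config (the values are arbitrary, and the storage object is assumed to exist):

```
import icechunk as ic

config = ic.RepositoryConfig.default()
config.inline_chunk_threshold_bytes = 1024   # inline chunks smaller than 1 KiB
config.get_partial_values_concurrency = 8

repo = ic.Repository.open_or_create(storage, config=config)
repo.save_config()  # future Repository.open calls will load this configuration
```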

Session

A session object that allows for reading and writing data from an Icechunk repository.

Source code in icechunk/session.py
class Session:
    """A session object that allows for reading and writing data from an Icechunk repository."""

    _session: PySession

    def __init__(self, session: PySession):
        self._session = session
        self._allow_distributed_write = False

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, Session):
            return False
        return self._session == value._session

    def __getstate__(self) -> object:
        state = {
            "_session": self._session.as_bytes(),
        }
        return state

    def __setstate__(self, state: object) -> None:
        if not isinstance(state, dict):
            raise ValueError("Invalid state")
        self._session = PySession.from_bytes(state["_session"])

    @property
    def read_only(self) -> bool:
        """Whether the session is read-only."""
        return self._session.read_only

    @property
    def snapshot_id(self) -> str:
        """The base snapshot ID of the session"""
        return self._session.snapshot_id

    @property
    def branch(self) -> str | None:
        """The branch that the session is based on. This is only set if the
        session is writable"""
        return self._session.branch

    @property
    def has_uncommitted_changes(self) -> bool:
        """Whether the session has uncommitted changes. This is only possibly
        true if the session is writable"""
        return self._session.has_uncommitted_changes

    def discard_changes(self) -> None:
        """When the session is writable, discard any uncommitted changes"""
        self._session.discard_changes()

    @property
    def store(self) -> IcechunkStore:
        """Get a zarr Store object for reading and writing data from the repository using zarr python"""
        return IcechunkStore(self._session.store)

    def all_virtual_chunk_locations(self) -> list[str]:
        """Return the location URLs of all virtual chunks"""
        return self._session.all_virtual_chunk_locations()

    def merge(self, other: Self) -> None:
        """Merge the changes for this session with the changes from another session"""
        self._session.merge(other._session)

    def commit(self, message: str) -> str:
        """Commit the changes in the session to the repository

        When successful, the writable session is completed and the session is now read-only
        and based on the new commit. The snapshot ID of the new commit is returned.

        If the session is out of date, this will raise a ConflictError exception depicting
        the conflict that occurred. The session will need to be rebased before committing.

        Args:
            message (str): The message to write with the commit

        Returns:
            str: The snapshot ID of the new commit
        """
        try:
            return self._session.commit(message)
        except PyConflictError as e:
            raise ConflictError(e) from None

    def rebase(self, solver: ConflictSolver) -> None:
        """Rebase the session to the latest ancestry of the branch.

        This method will iteratively crawl the ancestry of the branch and apply the changes
        from the branch to the session. If a conflict is detected, the conflict solver will
        be used to optionally resolve the conflict. When complete, the session will be based
        on the latest commit of the branch and the session will be ready to attempt another
        commit.

        When a conflict is detected and a resolution is not possible with the provided
        solver, a RebaseFailed exception will be raised. This exception will contain the
        snapshot ID that the rebase failed on and a list of conflicts that occurred.

        Args:
            solver (ConflictSolver): The conflict solver to use when a conflict is detected

        Raises:
            RebaseFailed: When a conflict is detected and the solver fails to resolve it

        """
        try:
            self._session.rebase(solver)
        except PyRebaseFailedError as e:
            raise RebaseFailedError(e) from None

branch: str | None property

The branch that the session is based on. This is only set if the session is writable

has_uncommitted_changes: bool property

Whether the session has uncommitted changes. This can only be true if the session is writable

read_only: bool property

Whether the session is read-only.

snapshot_id: str property

The base snapshot ID of the session

store: IcechunkStore property

Get a zarr Store object for reading and writing data from the repository using zarr python

all_virtual_chunk_locations()

Return the location URLs of all virtual chunks

Source code in icechunk/session.py
def all_virtual_chunk_locations(self) -> list[str]:
    """Return the location URLs of all virtual chunks"""
    return self._session.all_virtual_chunk_locations()

commit(message)

Commit the changes in the session to the repository

When successful, the writable session is completed and the session is now read-only and based on the new commit. The snapshot ID of the new commit is returned.

If the session is out of date, this will raise a ConflictError exception depicting the conflict that occurred. The session will need to be rebased before committing.

Args: message (str): The message to write with the commit

Returns: str: The snapshot ID of the new commit

Source code in icechunk/session.py
def commit(self, message: str) -> str:
    """Commit the changes in the session to the repository

    When successful, the writable session is completed and the session is now read-only
    and based on the new commit. The snapshot ID of the new commit is returned.

    If the session is out of date, this will raise a ConflictError exception depicting
    the conflict that occurred. The session will need to be rebased before committing.

    Args:
        message (str): The message to write with the commit

    Returns:
        str: The snapshot ID of the new commit
    """
    try:
        return self._session.commit(message)
    except PyConflictError as e:
        raise ConflictError(e) from None

discard_changes()

When the session is writable, discard any uncommitted changes

Source code in icechunk/session.py
def discard_changes(self) -> None:
    """When the session is writable, discard any uncommitted changes"""
    self._session.discard_changes()

merge(other)

Merge the changes for this session with the changes from another session

Source code in icechunk/session.py
def merge(self, other: Self) -> None:
    """Merge the changes for this session with the changes from another session"""
    self._session.merge(other._session)

rebase(solver)

Rebase the session to the latest ancestry of the branch.

This method will iteratively crawl the ancestry of the branch and apply the changes from the branch to the session. If a conflict is detected, the conflict solver will be used to optionally resolve the conflict. When complete, the session will be based on the latest commit of the branch and the session will be ready to attempt another commit.

When a conflict is detected and a resolution is not possible with the provided solver, a RebaseFailed exception will be raised. This exception will contain the snapshot ID that the rebase failed on and a list of conflicts that occurred.

Args: solver (ConflictSolver): The conflict solver to use when a conflict is detected

Raises: RebaseFailed: When a conflict is detected and the solver fails to resolve it

Source code in icechunk/session.py
def rebase(self, solver: ConflictSolver) -> None:
    """Rebase the session to the latest ancestry of the branch.

    This method will iteratively crawl the ancestry of the branch and apply the changes
    from the branch to the session. If a conflict is detected, the conflict solver will
    be used to optionally resolve the conflict. When complete, the session will be based
    on the latest commit of the branch and the session will be ready to attempt another
    commit.

    When a conflict is detected and a resolution is not possible with the provided
    solver, a RebaseFailed exception will be raised. This exception will contain the
    snapshot ID that the rebase failed on and a list of conflicts that occurred.

    Args:
        solver (ConflictSolver): The conflict solver to use when a conflict is detected

    Raises:
        RebaseFailed: When a conflict is detected and the solver fails to resolve it

    """
    try:
        self._session.rebase(solver)
    except PyRebaseFailedError as e:
        raise RebaseFailedError(e) from None
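
The two methods combine into the retry loop the docstrings describe. A hedged sketch, assuming a Repository object named repo and that ConflictError, BasicConflictSolver, and VersionSelection are exported at the package root, as the examples elsewhere in this reference suggest:

```
import icechunk as ic

session = repo.writable_session("main")
# ... stage changes through session.store ...
try:
    session.commit("update data")
except ic.ConflictError:
    # Another writer landed first: replay our changes on top of the branch tip,
    # keeping our version of any conflicting chunks, then commit again.
    solver = ic.BasicConflictSolver(on_chunk_conflict=ic.VersionSelection.UseOurs)
    session.rebase(solver)
    session.commit("update data")
```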

SnapshotMetadata

Metadata for a snapshot

Source code in icechunk/_icechunk_python.pyi
class SnapshotMetadata:
    """Metadata for a snapshot"""
    @property
    def id(self) -> str:
        """The snapshot ID"""
        ...
    @property
    def written_at(self) -> datetime.datetime:
        """
        The timestamp when the snapshot was written
        """
        ...
    @property
    def message(self) -> str:
        """
        The commit message of the snapshot
        """
        ...

id: str property

The snapshot ID

message: str property

The commit message of the snapshot

written_at: datetime.datetime property

The timestamp when the snapshot was written

Storage

Storage configuration for an IcechunkStore

Currently supports in-memory, local filesystem, S3, Google Cloud Storage, and Azure Blob Storage backends. Use the class methods to create a Storage object with the desired backend.

Ex:

storage = Storage.new_in_memory()
storage = Storage.new_local_filesystem("/path/to/root")
storage = Storage.new_s3(S3Options(region="us-east-1", endpoint_url=None, allow_http=False), "bucket", "prefix")
storage = Storage.new_gcs("bucket", "prefix")

Source code in icechunk/_icechunk_python.pyi
class Storage:
    """Storage configuration for an IcechunkStore

    Currently supports in-memory, local filesystem, S3, Google Cloud Storage, and Azure Blob Storage backends.
    Use the class methods to create a Storage object with the desired backend.

    Ex:
    ```
    storage = Storage.new_in_memory()
    storage = Storage.new_local_filesystem("/path/to/root")
    storage = Storage.new_s3(S3Options(region="us-east-1", endpoint_url=None, allow_http=False), "bucket", "prefix")
    storage = Storage.new_gcs("bucket", "prefix")
    ```
    """

    @classmethod
    def new_s3(
        cls,
        config: S3Options,
        bucket: str,
        prefix: str | None,
        credentials: AnyS3Credential | None = None,
    ) -> Storage: ...
    @classmethod
    def new_in_memory(cls) -> Storage: ...
    @classmethod
    def new_local_filesystem(cls, path: str) -> Storage: ...
    @classmethod
    def new_gcs(
        cls,
        bucket: str,
        prefix: str | None,
        credentials: AnyGcsCredential | None = None,
        *,
        config: dict[str, str] | None = None,
    ) -> Storage: ...
    @classmethod
    def new_azure_blob(
        cls,
        container: str,
        prefix: str,
        credentials: AnyAzureCredential | None = None,
        *,
        config: dict[str, str] | None = None,
    ) -> Storage: ...
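
As a quick sketch, the in-memory and local filesystem constructors take no credentials (the helper functions documented later on this page wrap these classmethods):

```
import icechunk as ic

mem_storage = ic.Storage.new_in_memory()
fs_storage = ic.Storage.new_local_filesystem("/tmp/my-icechunk-repo")  # placeholder path
```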

StorageConcurrencySettings

Configuration for how Icechunk uses its Storage instance

Source code in icechunk/_icechunk_python.pyi
class StorageConcurrencySettings:
    """Configuration for how Icechunk uses its Storage instance"""

    def __init__(
        self, max_concurrent_requests_for_object: int, ideal_concurrent_request_size: int
    ) -> None: ...
    @property
    def max_concurrent_requests_for_object(self) -> int: ...
    @max_concurrent_requests_for_object.setter
    def max_concurrent_requests_for_object(self, value: int) -> None: ...
    @property
    def ideal_concurrent_request_size(self) -> int: ...
    @ideal_concurrent_request_size.setter
    def ideal_concurrent_request_size(self, value: int) -> None: ...

StorageSettings

Configuration for how Icechunk uses its Storage instance

Source code in icechunk/_icechunk_python.pyi
class StorageSettings:
    """Configuration for how Icechunk uses its Storage instance"""

    def __init__(self, concurrency: StorageConcurrencySettings) -> None: ...
    @property
    def concurrency(self) -> StorageConcurrencySettings: ...
    @concurrency.setter
    def concurrency(self, value: StorageConcurrencySettings) -> None: ...
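
A minimal construction sketch, assuming both classes are exported at the package root; the numbers are illustrative, not recommendations:

```
import icechunk as ic

settings = ic.StorageSettings(
    concurrency=ic.StorageConcurrencySettings(
        max_concurrent_requests_for_object=16,
        ideal_concurrent_request_size=1_000_000,  # bytes
    )
)
```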

VersionSelection

Bases: Enum

Enum for selecting which version of a conflict to use

Source code in icechunk/_icechunk_python.pyi
class VersionSelection(Enum):
    """Enum for selecting the which version of a conflict"""

    Fail = 0
    UseOurs = 1
    UseTheirs = 2

azure_credentials(*, access_key=None, sas_token=None, bearer_token=None, from_env=None)

Create credentials for the Azure Blob Storage object store.

If all arguments are None, credentials are fetched from the operating system environment.

Source code in icechunk/credentials.py
def azure_credentials(
    *,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    from_env: bool | None = None,
) -> AnyAzureCredential:
    """Create credentials Azure Blob Storage object store.

    If all arguments are None, credentials are fetched from the operative system environment.
    """
    if (from_env is None or from_env) and (
        access_key is None and sas_token is None and bearer_token is None
    ):
        return azure_from_env_credentials()

    if (access_key is not None or sas_token is not None or bearer_token is not None) and (
        from_env is None or not from_env
    ):
        return AzureCredentials.Static(
            azure_static_credentials(
                access_key=access_key,
                sas_token=sas_token,
                bearer_token=bearer_token,
            )
        )

    raise ValueError("Conflicting arguments to azure_credentials function")

azure_from_env_credentials()

Instruct the Azure Blob Storage object store to fetch credentials from the operating system environment.

Source code in icechunk/credentials.py
def azure_from_env_credentials() -> AzureCredentials.FromEnv:
    """Instruct Azure Blob Storage object store to fetch credentials from the operative system environment."""
    return AzureCredentials.FromEnv()

azure_static_credentials(*, access_key=None, sas_token=None, bearer_token=None)

Create static credentials for the Azure Blob Storage object store.

Source code in icechunk/credentials.py
def azure_static_credentials(
    *,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
) -> AnyAzureStaticCredential:
    """Create static credentials Azure Blob Storage object store."""
    if [access_key, sas_token, bearer_token].count(None) != 2:
        raise ValueError("Conflicting arguments to azure_static_credentials function")
    if access_key is not None:
        return AzureStaticCredentials.AccessKey(access_key)
    if sas_token is not None:
        return AzureStaticCredentials.SasToken(sas_token)
    if bearer_token is not None:
        return AzureStaticCredentials.BearerToken(bearer_token)
    raise ValueError(
        "No valid static credential provided for Azure Blob Storage object store"
    )

azure_storage(*, container, prefix, access_key=None, sas_token=None, bearer_token=None, from_env=None, config=None)

Create a Storage instance that saves data in Azure Blob Storage object store.

Parameters:

- container (str, required): The container where the repository will store its data
- prefix (str, required): The prefix within the container that is the root directory of the repository
- access_key (str | None, default None): Azure Blob Storage credential access key
- sas_token (str | None, default None): Azure Blob Storage credential SAS token
- bearer_token (str | None, default None): Azure Blob Storage credential bearer token
- from_env (bool | None, default None): Fetch credentials from the operating system environment
Source code in icechunk/storage.py
def azure_storage(
    *,
    container: str,
    prefix: str,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    from_env: bool | None = None,
    config: dict[str, str] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in Azure Blob Storage object store.

    Parameters
    ----------
    container: str
        The container where the repository will store its data
    prefix: str
        The prefix within the container that is the root directory of the repository
    access_key: str | None
        Azure Blob Storage credential access key
    sas_token: str | None
        Azure Blob Storage credential SAS token
    bearer_token: str | None
        Azure Blob Storage credential bearer token
    from_env: bool | None
        Fetch credentials from the operating system environment
    """
    credentials = azure_credentials(
        access_key=access_key,
        sas_token=sas_token,
        bearer_token=bearer_token,
        from_env=from_env,
    )
    return Storage.new_azure_blob(
        container=container,
        prefix=prefix,
        credentials=credentials,
        config=config,
    )
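
For example, a hedged sketch (container and prefix are placeholders; with from_env=True the credentials come from the environment):

```
import icechunk as ic

storage = ic.azure_storage(
    container="my-container",
    prefix="path/to/repo",
    from_env=True,
)
```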

containers_credentials(m={}, **kwargs)

Build a map of credentials for virtual chunk containers.

Example usage:

import icechunk as ic

config = ic.RepositoryConfig.default()
config.inline_chunk_threshold_bytes = 512

virtual_store_config = ic.s3_store(
    region="us-east-1",
    endpoint_url="http://localhost:9000",
    allow_http=True,
    s3_compatible=True,
)
container = ic.VirtualChunkContainer("s3", "s3://", virtual_store_config)
config.set_virtual_chunk_container(container)
credentials = ic.containers_credentials(
    s3=ic.s3_credentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")
)

repo = ic.Repository.create(
    storage=ic.local_filesystem_storage(store_path),
    config=config,
    virtual_chunk_credentials=credentials,
)

Parameters:

- m (Mapping[str, AnyS3Credential], default {}): A mapping from container name to credentials.
Source code in icechunk/credentials.py
def containers_credentials(
    m: Mapping[str, AnyS3Credential] = {}, **kwargs: AnyS3Credential
) -> dict[str, Credentials.S3]:
    """Build a map of credentials for virtual chunk containers.

    Example usage:
    ```
    import icechunk as ic

    config = ic.RepositoryConfig.default()
    config.inline_chunk_threshold_bytes = 512

    virtual_store_config = ic.s3_store(
        region="us-east-1",
        endpoint_url="http://localhost:9000",
        allow_http=True,
        s3_compatible=True,
    )
    container = ic.VirtualChunkContainer("s3", "s3://", virtual_store_config)
    config.set_virtual_chunk_container(container)
    credentials = ic.containers_credentials(
        s3=ic.s3_credentials(access_key_id="ACCESS_KEY", secret_access_key="SECRET")
    )

    repo = ic.Repository.create(
        storage=ic.local_filesystem_storage(store_path),
        config=config,
        virtual_chunk_credentials=credentials,
    )
    ```

    Parameters
    ----------
    m: Mapping[str, AnyS3Credential]
        A mapping from container name to credentials.
    """
    res = {}
    for name, cred in {**m, **kwargs}.items():
        if isinstance(cred, AnyS3Credential):
            res[name] = Credentials.S3(cred)
        else:
            raise ValueError(f"Unknown credential type {type(cred)}")
    return res

gcs_credentials(*, service_account_file=None, service_account_key=None, application_credentials=None, from_env=None)

Create credentials for the Google Cloud Storage object store.

If all arguments are None, credentials are fetched from the operating system environment.

Source code in icechunk/credentials.py
def gcs_credentials(
    *,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
    from_env: bool | None = None,
) -> AnyGcsCredential:
    """Create credentials Google Cloud Storage object store.

    If all arguments are None, credentials are fetched from the operative system environment.
    """
    if (from_env is None or from_env) and (
        service_account_file is None
        and service_account_key is None
        and application_credentials is None
    ):
        return gcs_from_env_credentials()

    if (
        service_account_file is not None
        or service_account_key is not None
        or application_credentials is not None
    ) and (from_env is None or not from_env):
        return GcsCredentials.Static(
            gcs_static_credentials(
                service_account_file=service_account_file,
                service_account_key=service_account_key,
                application_credentials=application_credentials,
            )
        )

    raise ValueError("Conflicting arguments to gcs_credentials function")

gcs_from_env_credentials()

Instruct the Google Cloud Storage object store to fetch credentials from the operating system environment.

Source code in icechunk/credentials.py
def gcs_from_env_credentials() -> GcsCredentials.FromEnv:
    """Instruct Google Cloud Storage object store to fetch credentials from the operative system environment."""
    return GcsCredentials.FromEnv()

gcs_static_credentials(*, service_account_file=None, service_account_key=None, application_credentials=None)

Create static credentials for the Google Cloud Storage object store.

Source code in icechunk/credentials.py
def gcs_static_credentials(
    *,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
) -> AnyGcsStaticCredential:
    """Create static credentials Google Cloud Storage object store."""
    if service_account_file is not None:
        return GcsStaticCredentials.ServiceAccount(service_account_file)
    if service_account_key is not None:
        return GcsStaticCredentials.ServiceAccountKey(service_account_key)
    if application_credentials is not None:
        return GcsStaticCredentials.ApplicationCredentials(application_credentials)
    raise ValueError("Conflicting arguments to gcs_static_credentials function")

gcs_storage(*, bucket, prefix, service_account_file=None, service_account_key=None, application_credentials=None, from_env=None, config=None)

Create a Storage instance that saves data in Google Cloud Storage object store.

Parameters:

- bucket (str, required): The bucket where the repository will store its data
- prefix (str | None, required): The prefix within the bucket that is the root directory of the repository
- from_env (bool | None, default None): Fetch credentials from the operating system environment
Source code in icechunk/storage.py
def gcs_storage(
    *,
    bucket: str,
    prefix: str | None,
    service_account_file: str | None = None,
    service_account_key: str | None = None,
    application_credentials: str | None = None,
    from_env: bool | None = None,
    config: dict[str, str] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in Google Cloud Storage object store.

    Parameters
    ----------
    bucket: str
        The bucket where the repository will store its data
    prefix: str | None
        The prefix within the bucket that is the root directory of the repository
    from_env: bool | None
        Fetch credentials from the operating system environment
    """
    credentials = gcs_credentials(
        service_account_file=service_account_file,
        service_account_key=service_account_key,
        application_credentials=application_credentials,
        from_env=from_env,
    )
    return Storage.new_gcs(
        bucket=bucket,
        prefix=prefix,
        credentials=credentials,
        config=config,
    )
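
For example, a hedged sketch using a static service account (bucket, prefix, and the key file path are placeholders):

```
import icechunk as ic

storage = ic.gcs_storage(
    bucket="my-bucket",
    prefix="path/to/repo",
    service_account_file="/path/to/service-account.json",
)
```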

in_memory_storage()

Create a Storage instance that saves data in memory.

This Storage implementation is used for tests. Data will be lost after the process finishes, and can only be accessed through the Storage instance returned. Different instances don't share data.

Source code in icechunk/storage.py
def in_memory_storage() -> Storage:
    """Create a Storage instance that saves data in memory.

    This Storage implementation is used for tests. Data will be lost after the process finishes, and can only be accessed through the Storage instance returned. Different instances don't share data."""
    return Storage.new_in_memory()

local_filesystem_storage(path)

Create a Storage instance that saves data in the local file system.

This Storage instance is not recommended for production data

Source code in icechunk/storage.py
def local_filesystem_storage(path: str) -> Storage:
    """Create a Storage instance that saves data in the local file system.

    This Storage instance is not recommended for production data
    """
    return Storage.new_local_filesystem(path)

s3_anonymous_credentials()

Create no-signature credentials for S3 and S3 compatible object stores.

Source code in icechunk/credentials.py
def s3_anonymous_credentials() -> S3Credentials.Anonymous:
    """Create no-signature credentials for S3 and S3 compatible object stores."""
    return S3Credentials.Anonymous()

s3_credentials(*, access_key_id=None, secret_access_key=None, session_token=None, expires_after=None, anonymous=None, from_env=None, get_credentials=None)

Create credentials for S3 and S3 compatible object stores.

If all arguments are None, credentials are fetched from the environment.

Parameters:

- access_key_id (str | None, default None): S3 credential access key
- secret_access_key (str | None, default None): S3 credential secret access key
- session_token (str | None, default None): Optional S3 credential session token
- expires_after (datetime | None, default None): Optional expiration for the object store credentials
- anonymous (bool | None, default None): If set to True, requests to the object store will not be signed
- from_env (bool | None, default None): Fetch credentials from the operating system environment
- get_credentials (Callable[[], S3StaticCredentials] | None, default None): Use this function to get and refresh object store credentials
Source code in icechunk/credentials.py
def s3_credentials(
    *,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    session_token: str | None = None,
    expires_after: datetime | None = None,
    anonymous: bool | None = None,
    from_env: bool | None = None,
    get_credentials: Callable[[], S3StaticCredentials] | None = None,
) -> AnyS3Credential:
    """Create credentials for S3 and S3 compatible object stores.

    If all arguments are None, credentials are fetched from the environment.

    Parameters
    ----------
    access_key_id: str | None
        S3 credential access key
    secret_access_key: str | None
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    anonymous: bool | None
        If set to True requests to the object store will not be signed
    from_env: bool | None
        Fetch credentials from the operating system environment
    get_credentials: Callable[[], S3StaticCredentials] | None
        Use this function to get and refresh object store credentials
    """
    if (
        (from_env is None or from_env)
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not anonymous
        and get_credentials is None
    ):
        return s3_from_env_credentials()

    if (
        anonymous
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not from_env
        and get_credentials is None
    ):
        return s3_anonymous_credentials()

    if (
        get_credentials is not None
        and access_key_id is None
        and secret_access_key is None
        and session_token is None
        and expires_after is None
        and not from_env
        and not anonymous
    ):
        return s3_refreshable_credentials(get_credentials)

    if (
        access_key_id
        and secret_access_key
        and not from_env
        and not anonymous
        and get_credentials is None
    ):
        return s3_static_credentials(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expires_after=expires_after,
        )

    raise ValueError("Conflicting arguments to s3_credentials function")

s3_from_env_credentials()

Instruct S3 and S3 compatible object stores to gather credentials from the operating system environment.

Source code in icechunk/credentials.py
def s3_from_env_credentials() -> S3Credentials.FromEnv:
    """Instruct S3 and S3 compatible object stores to gather credentials from the operative system environment."""
    return S3Credentials.FromEnv()

s3_refreshable_credentials(get_credentials)

Create refreshable credentials for S3 and S3 compatible object stores.

Parameters:

- get_credentials (Callable[[], S3StaticCredentials], required): Use this function to get and refresh the credentials. The function must be picklable.
Source code in icechunk/credentials.py
def s3_refreshable_credentials(
    get_credentials: Callable[[], S3StaticCredentials],
) -> S3Credentials.Refreshable:
    """Create refreshable credentials for S3 and S3 compatible object stores.


    Parameters
    ----------
    get_credentials: Callable[[], S3StaticCredentials]
        Use this function to get and refresh the credentials. The function must be picklable.
    """
    return S3Credentials.Refreshable(pickle.dumps(get_credentials))
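
A hedged sketch of a refreshable setup, assuming S3StaticCredentials is exported at the package root; the credential values are placeholders, and a real function would call a credential service:

```
from datetime import datetime, timedelta, timezone

import icechunk as ic

def fetch_credentials() -> ic.S3StaticCredentials:
    # Placeholder: fetch short-lived credentials from your provider here.
    return ic.S3StaticCredentials(
        access_key_id="ACCESS_KEY",
        secret_access_key="SECRET",
        expires_after=datetime.now(timezone.utc) + timedelta(hours=1),
    )

creds = ic.s3_refreshable_credentials(fetch_credentials)  # must be picklable
```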

s3_static_credentials(*, access_key_id, secret_access_key, session_token=None, expires_after=None)

Create static credentials for S3 and S3 compatible object stores.

Parameters:

- access_key_id (str, required): S3 credential access key
- secret_access_key (str, required): S3 credential secret access key
- session_token (str | None, default None): Optional S3 credential session token
- expires_after (datetime | None, default None): Optional expiration for the object store credentials
Source code in icechunk/credentials.py
def s3_static_credentials(
    *,
    access_key_id: str,
    secret_access_key: str,
    session_token: str | None = None,
    expires_after: datetime | None = None,
) -> S3Credentials.Static:
    """Create static credentials for S3 and S3 compatible object stores.

    Parameters
    ----------
    access_key_id: str
        S3 credential access key
    secret_access_key: str
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    """
    return S3Credentials.Static(
        S3StaticCredentials(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expires_after=expires_after,
        )
    )

s3_storage(*, bucket, prefix, region=None, endpoint_url=None, allow_http=False, access_key_id=None, secret_access_key=None, session_token=None, expires_after=None, anonymous=None, from_env=None, get_credentials=None)

Create a Storage instance that saves data in S3 or S3 compatible object stores.

Parameters:

- bucket (str, required): The bucket where the repository will store its data
- prefix (str | None, required): The prefix within the bucket that is the root directory of the repository
- region (str | None, default None): The region to use in the object store; if None, a default region will be used
- endpoint_url (str | None, default None): Optional endpoint where the object store serves data, for example http://localhost:9000
- allow_http (bool, default False): Whether the object store can be accessed using the http protocol instead of https
- access_key_id (str | None, default None): S3 credential access key
- secret_access_key (str | None, default None): S3 credential secret access key
- session_token (str | None, default None): Optional S3 credential session token
- expires_after (datetime | None, default None): Optional expiration for the object store credentials
- anonymous (bool | None, default None): If set to True, requests to the object store will not be signed
- from_env (bool | None, default None): Fetch credentials from the operating system environment
- get_credentials (Callable[[], S3StaticCredentials] | None, default None): Use this function to get and refresh object store credentials
Source code in icechunk/storage.py
def s3_storage(
    *,
    bucket: str,
    prefix: str | None,
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    session_token: str | None = None,
    expires_after: datetime | None = None,
    anonymous: bool | None = None,
    from_env: bool | None = None,
    get_credentials: Callable[[], S3StaticCredentials] | None = None,
) -> Storage:
    """Create a Storage instance that saves data in S3 or S3 compatible object stores.

    Parameters
    ----------
    bucket: str
        The bucket where the repository will store its data
    prefix: str | None
        The prefix within the bucket that is the root directory of the repository
    region: str | None
        The region to use in the object store, if `None` a default region will be used
    endpoint_url: str | None
        Optional endpoint where the object store serves data, example: http://localhost:9000
    allow_http: bool
        If the object store can be accessed using http protocol instead of https
    access_key_id: str | None
        S3 credential access key
    secret_access_key: str | None
        S3 credential secret access key
    session_token: str | None
        Optional S3 credential session token
    expires_after: datetime | None
        Optional expiration for the object store credentials
    anonymous: bool | None
        If set to True requests to the object store will not be signed
    from_env: bool | None
        Fetch credentials from the operating system environment
    get_credentials: Callable[[], S3StaticCredentials] | None
        Use this function to get and refresh object store credentials
    """
    credentials = s3_credentials(
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        expires_after=expires_after,
        anonymous=anonymous,
        from_env=from_env,
        get_credentials=get_credentials,
    )
    options = S3Options(region=region, endpoint_url=endpoint_url, allow_http=allow_http)
    return Storage.new_s3(
        config=options,
        bucket=bucket,
        prefix=prefix,
        credentials=credentials,
    )
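
For instance, a hedged sketch against a MinIO-style endpoint (the bucket, prefix, keys, and endpoint are placeholders):

```
import icechunk as ic

storage = ic.s3_storage(
    bucket="my-bucket",
    prefix="path/to/repo",
    region="us-east-1",
    endpoint_url="http://localhost:9000",
    allow_http=True,
    access_key_id="ACCESS_KEY",
    secret_access_key="SECRET",
)
repo = ic.Repository.open_or_create(storage)
```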

s3_store(region=None, endpoint_url=None, allow_http=False, anonymous=False, s3_compatible=False)

Build an ObjectStoreConfig instance for S3 or S3 compatible object stores.

Source code in icechunk/storage.py
def s3_store(
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
    anonymous: bool = False,
    s3_compatible: bool = False,
) -> ObjectStoreConfig.S3Compatible | ObjectStoreConfig.S3:
    """Build an ObjectStoreConfig instance for S3 or S3 compatible object stores."""
    options = S3Options(region=region, endpoint_url=endpoint_url, allow_http=allow_http)
    return (
        ObjectStoreConfig.S3Compatible(options)
        if s3_compatible
        else ObjectStoreConfig.S3(options)
    )

XarrayDatasetWriter dataclass

Write Xarray Datasets to a group in an Icechunk store.

This class is private API. Please do not use it.

Source code in icechunk/xarray.py
@dataclass
class XarrayDatasetWriter:
    """
    Write Xarray Datasets to a group in an Icechunk store.

    This class is private API. Please do not use it.
    """

    dataset: Dataset = field(repr=False)
    store: IcechunkStore = field(kw_only=True)

    safe_chunks: bool = field(kw_only=True, default=True)
    # TODO: uncomment when Zarr has support
    # write_empty_chunks: bool = field(kw_only=True, default=True)

    _initialized: bool = field(default=False, repr=False)

    xarray_store: ZarrStore = field(init=False, repr=False)
    writer: LazyArrayWriter = field(init=False, repr=False)

    def __post_init__(self) -> None:
        if not isinstance(self.store, IcechunkStore):
            raise ValueError(
                f"Please pass in an icechunk.Session. Received {type(self.store)!r} instead."
            )

    def _open_group(
        self,
        *,
        group: str | None,
        mode: ZarrWriteModes | None,
        append_dim: Hashable | None,
        region: Region,
    ) -> None:
        concrete_mode: ZarrWriteModes = _choose_default_mode(
            mode=mode, append_dim=append_dim, region=region
        )

        self.xarray_store = ZarrStore.open_group(
            store=self.store,
            group=group,
            mode=concrete_mode,
            zarr_format=3,
            append_dim=append_dim,
            write_region=region,
            safe_chunks=self.safe_chunks,
            # TODO: uncomment when Zarr has support
            # write_empty=self.write_empty_chunks,
            synchronizer=None,
            consolidated=False,
            consolidate_on_close=False,
            zarr_version=None,
        )

    def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
        """
        This method creates new Zarr arrays when necessary, and writes
        attributes and any in-memory arrays.
        """
        from xarray.backends.api import _validate_dataset_names, dump_to_store

        # validate Dataset keys, DataArray names
        _validate_dataset_names(self.dataset)

        if encoding is None:
            encoding = {}
        self.xarray_store._validate_encoding(encoding)

        # This writes the metadata (zarr.json) for all arrays
        # This also will resize arrays for any appends
        self.writer = LazyArrayWriter()
        dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

        self._initialized = True

    def write_eager(self) -> None:
        """
        Write in-memory variables to store.

        Returns
        -------
        None
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")
        self.writer.write_eager()

    def write_lazy(
        self,
        chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
        split_every: int | None = None,
    ) -> None:
        """
        Write lazy arrays (e.g. dask) to store.
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")

        if not self.writer.sources:
            return

        chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
        chunkmanager_store_kwargs["load_stored"] = False
        chunkmanager_store_kwargs["return_stored"] = True

        # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
        # each of those zarr.Array.store contains the changesets we need
        stored_arrays = self.writer.sync(
            compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
        )  # type: ignore[no-untyped-call]

        # Now we tree-reduce all changesets
        merged_session = stateful_store_reduce(
            stored_arrays,
            prefix="ice-changeset",
            chunk=extract_session,
            aggregate=merge_sessions,
            split_every=split_every,
            compute=True,
            **chunkmanager_store_kwargs,
        )
        self.store.session.merge(merged_session)

write_eager()

Write in-memory variables to store.

Returns: None
Source code in icechunk/xarray.py
def write_eager(self) -> None:
    """
    Write in-memory variables to store.

    Returns
    -------
    None
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")
    self.writer.write_eager()

write_lazy(chunkmanager_store_kwargs=None, split_every=None)

Write lazy arrays (e.g. dask) to store.

Source code in icechunk/xarray.py
def write_lazy(
    self,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
) -> None:
    """
    Write lazy arrays (e.g. dask) to store.
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")

    if not self.writer.sources:
        return

    chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
    chunkmanager_store_kwargs["load_stored"] = False
    chunkmanager_store_kwargs["return_stored"] = True

    # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
    # each of those zarr.Array.store contains the changesets we need
    stored_arrays = self.writer.sync(
        compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
    )  # type: ignore[no-untyped-call]

    # Now we tree-reduce all changesets
    merged_session = stateful_store_reduce(
        stored_arrays,
        prefix="ice-changeset",
        chunk=extract_session,
        aggregate=merge_sessions,
        split_every=split_every,
        compute=True,
        **chunkmanager_store_kwargs,
    )
    self.store.session.merge(merged_session)

write_metadata(encoding=None)

This method creates new Zarr arrays when necessary, and writes attributes and any in-memory arrays.

Source code in icechunk/xarray.py
def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
    """
    This method creates new Zarr arrays when necessary, and writes
    attributes and any in-memory arrays.
    """
    from xarray.backends.api import _validate_dataset_names, dump_to_store

    # validate Dataset keys, DataArray names
    _validate_dataset_names(self.dataset)

    if encoding is None:
        encoding = {}
    self.xarray_store._validate_encoding(encoding)

    # This writes the metadata (zarr.json) for all arrays
    # This also will resize arrays for any appends
    self.writer = LazyArrayWriter()
    dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

    self._initialized = True

to_icechunk(dataset, store, *, group=None, mode=None, safe_chunks=True, append_dim=None, region=None, encoding=None, chunkmanager_store_kwargs=None, split_every=None, **kwargs)

Write an Xarray Dataset to a group of an icechunk store.

Parameters:

- store (MutableMapping, str or path-like, required): Store or path to directory in local or remote file system.
- mode ({"w", "w-", "a", "a-", "r+", None}, default None): Persistence mode: "w" means create (overwrite if exists); "w-" means create (fail if exists); "a" means override all existing variables including dimension coordinates (create if does not exist); "a-" means only append those variables that have append_dim; "r+" means modify existing array values only (raise an error if any metadata or shapes would change). The default mode is "a" if append_dim is set; otherwise it is "r+" if region is set and "w-" otherwise.
- group (str, default None): Group path. (a.k.a. path in zarr terminology.)
- encoding (dict, default None): Nested dictionary with variable names as keys and dictionaries of variable-specific encodings as values, e.g., {"my_variable": {"dtype": "int16", "scale_factor": 0.1}, ...}
- append_dim (hashable, default None): If set, the dimension along which the data will be appended. All other dimensions on overridden variables must remain the same size.
- region (dict or "auto", default None): Optional mapping from dimension names to either a) "auto", or b) integer slices, indicating the region of existing zarr array(s) in which to write this dataset's data. If "auto" is provided, the existing store will be opened and the region inferred by matching indexes. "auto" can be used as a single string, which will automatically infer the region for all dimensions, or as dictionary values for specific dimensions mixed together with explicit slices for other dimensions. Alternatively, integer slices can be provided; for example, {'x': slice(0, 1000), 'y': slice(10000, 11000)} would indicate that values should be written to the region 0:1000 along x and 10000:11000 along y. Users are expected to ensure that the specified region aligns with Zarr chunk boundaries, and that dask chunks are also aligned. Xarray makes limited checks that these multiple chunk boundaries line up. It is possible to write incomplete chunks and corrupt the data with this option if you are not careful.
- safe_chunks (bool, default True): If True, only allow writes when there is a many-to-one relationship between Zarr chunks (specified in encoding) and Dask chunks. Set False to override this restriction; however, data may become corrupted if Zarr arrays are written in parallel. In addition to the many-to-one relationship validation, it also detects partial chunk writes when using the region parameter; these partial chunks are considered unsafe in mode "r+" but safe in mode "a". Note: even with these validations it can still be unsafe to write two or more chunked arrays in the same location in parallel if they are not writing in independent regions.
- chunkmanager_store_kwargs (dict, default None): Additional keyword arguments passed on to the ChunkManager.store method used to store chunked arrays. For example, for a dask array, additional kwargs will be passed eventually to dask.array.store(). Experimental API that should not be relied upon.
- split_every (int | None, default None): Number of tasks to merge at every level of the tree reduction.

Returns: None
Notes

Two restrictions apply to the use of region:

  • If region is set, all variables in a dataset must have at least one dimension in common with the region. Other variables should be written in a separate single call to to_zarr().
  • Dimensions cannot be included in both region and append_dim at the same time. To create empty arrays to fill in with region, use the XarrayDatasetWriter directly.
Source code in icechunk/xarray.py
def to_icechunk(
    dataset: Dataset,
    store: IcechunkStore,
    *,
    group: str | None = None,
    mode: ZarrWriteModes | None = None,
    # TODO: uncomment when Zarr has support
    # write_empty_chunks: bool | None = None,
    safe_chunks: bool = True,
    append_dim: Hashable | None = None,
    region: Region = None,
    encoding: Mapping[Any, Any] | None = None,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
    **kwargs: Any,
) -> None:
    """
    Write an Xarray Dataset to a group of an icechunk store.

    Parameters
    ----------
    store : MutableMapping, str or path-like, optional
        Store or path to directory in local or remote file system.
    mode : {"w", "w-", "a", "a-", r+", None}, optional
        Persistence mode: "w" means create (overwrite if exists);
        "w-" means create (fail if exists);
        "a" means override all existing variables including dimension coordinates (create if does not exist);
        "a-" means only append those variables that have ``append_dim``.
        "r+" means modify existing array *values* only (raise an error if
        any metadata or shapes would change).
        The default mode is "a" if ``append_dim`` is set. Otherwise, it is
        "r+" if ``region`` is set and ``w-`` otherwise.
    group : str, optional
        Group path. (a.k.a. `path` in zarr terminology.)
    encoding : dict, optional
        Nested dictionary with variable names as keys and dictionaries of
        variable specific encodings as values, e.g.,
        ``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}``
    append_dim : hashable, optional
        If set, the dimension along which the data will be appended. All
        other dimensions on overridden variables must remain the same size.
    region : dict or "auto", optional
        Optional mapping from dimension names to either a) ``"auto"``, or b) integer
        slices, indicating the region of existing zarr array(s) in which to write
        this dataset's data.

        If ``"auto"`` is provided the existing store will be opened and the region
        inferred by matching indexes. ``"auto"`` can be used as a single string,
        which will automatically infer the region for all dimensions, or as
        dictionary values for specific dimensions mixed together with explicit
        slices for other dimensions.

        Alternatively integer slices can be provided; for example, ``{'x': slice(0,
        1000), 'y': slice(10000, 11000)}`` would indicate that values should be
        written to the region ``0:1000`` along ``x`` and ``10000:11000`` along
        ``y``.

        Users are expected to ensure that the specified region aligns with
        Zarr chunk boundaries, and that dask chunks are also aligned.
        Xarray makes limited checks that these multiple chunk boundaries line up.
        It is possible to write incomplete chunks and corrupt the data with this
        option if you are not careful.
    safe_chunks : bool, default: True
        If True, only allow writes to when there is a many-to-one relationship
        between Zarr chunks (specified in encoding) and Dask chunks.
        Set False to override this restriction; however, data may become corrupted
        if Zarr arrays are written in parallel.
        In addition to the many-to-one relationship validation, it also detects partial
        chunks writes when using the region parameter,
        these partial chunks are considered unsafe in the mode "r+" but safe in
        the mode "a".
        Note: Even with these validations it can still be unsafe to write
        two or more chunked arrays in the same location in parallel if they are
        not writing in independent regions.
    chunkmanager_store_kwargs : dict, optional
        Additional keyword arguments passed on to the `ChunkManager.store` method used to store
        chunked arrays. For example for a dask array additional kwargs will be passed eventually to
        `dask.array.store()`. Experimental API that should not be relied upon.
    split_every: int, optional
        Number of tasks to merge at every level of the tree reduction.

    Returns
    -------
    None

    Notes
    -----
    Two restrictions apply to the use of ``region``:

      - If ``region`` is set, _all_ variables in a dataset must have at
        least one dimension in common with the region. Other variables
        should be written in a separate single call to ``to_zarr()``.
      - Dimensions cannot be included in both ``region`` and
        ``append_dim`` at the same time. To create empty arrays to fill
        in with ``region``, use the `XarrayDatasetWriter` directly.
    """
    writer = XarrayDatasetWriter(dataset, store=store)

    writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region)

    # write metadata
    writer.write_metadata(encoding)
    # write in-memory arrays
    writer.write_eager()
    # eagerly write dask arrays
    writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs)
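
As a final hedged sketch, tying the writer to the session workflow documented above (assuming xarray is installed; the dataset contents and branch name are illustrative):

```
import icechunk as ic
import xarray as xr
from icechunk.xarray import to_icechunk

# A tiny in-memory dataset; real datasets may be dask-backed.
ds = xr.Dataset({"temperature": ("x", [1.0, 2.0, 3.0])})

repo = ic.Repository.open_or_create(ic.in_memory_storage())
session = repo.writable_session("main")
to_icechunk(ds, session.store, mode="w")  # writes metadata, eager, then lazy arrays
session.commit("write temperature")
```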