API Reference

IcechunkStore

Bases: Store, SyncMixin

Source code in icechunk/__init__.py
class IcechunkStore(Store, SyncMixin):
    _store: PyIcechunkStore
    _pickle_preserves_read_only: bool

    @classmethod
    async def open(cls, *args: Any, **kwargs: Any) -> Self:
        """This method is called by zarr-python, it's not intended for users.

        Use one of `IcechunkStore.open_existing`, `IcechunkStore.create` or `IcechunkStore.open_or_create` instead.
        """
        return cls.open_or_create(*args, **kwargs)

    @classmethod
    def open_or_create(cls, *args: Any, **kwargs: Any) -> Self:
        if "read_only" in kwargs:
            read_only = kwargs.pop("read_only")
        else:
            read_only = False

        if "storage" in kwargs:
            storage = kwargs.pop("storage")
        else:
            raise ValueError(
                "Storage configuration is required. Pass a Storage object to construct an IcechunkStore"
            )

        store = None
        if read_only:
            store = cls.open_existing(storage, read_only, *args, **kwargs)
        else:
            if pyicechunk_store_exists(storage):
                store = cls.open_existing(storage, read_only, *args, **kwargs)
            else:
                store = cls.create(storage, read_only, *args, **kwargs)

        assert store
        # We don't want to call _open() because icechunk handles the opening itself;
        # if we have gotten this far we can mark the store as open
        store._is_open = True

        return store

    def __init__(
        self,
        store: PyIcechunkStore,
        read_only: bool = False,
        *args: Any,
        **kwargs: Any,
    ):
        """Create a new IcechunkStore.

        This should not be called directly, instead use the `create`, `open_existing` or `open_or_create` class methods.
        """
        super().__init__(read_only=read_only)
        if store is None:
            raise ValueError(
                "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
            )
        self._store = store
        self._pickle_preserves_read_only = False

    @classmethod
    def open_existing(
        cls,
        storage: StorageConfig,
        read_only: bool = False,
        config: StoreConfig | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> Self:
        """Open an existing IcechunkStore from the given storage.

        If there is no store at the given location, an error will be raised.

        It is recommended to use the cached storage option for better performance. If cached=True,
        this will be configured automatically with the provided storage_config as the underlying
        storage backend.
        """
        config = config or StoreConfig()
        # We have delayed checking if the repository exists, to avoid the delay in the happy case
        # So we need to check now if open fails, to provide a nice error message
        try:
            store = pyicechunk_store_open_existing(
                storage, read_only=read_only, config=config
            )
        # TODO: we should have an exception type to catch here, for the case of non-existing repo
        except Exception as e:
            if pyicechunk_store_exists(storage):
                # if the repo exists, this is an actual error we need to raise
                raise e
            else:
                # if the repo doesn't exist, we want to point users to that issue instead
                raise ValueError(
                    "No Icechunk repository at the provided location, try opening in create mode or changing the location"
                ) from None
        return cls(store=store, read_only=read_only, args=args, kwargs=kwargs)

    @classmethod
    def create(
        cls,
        storage: StorageConfig,
        read_only: bool = False,
        config: StoreConfig | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> Self:
        """Create a new IcechunkStore with the given storage configuration.

        If a store already exists at the given location, an error will be raised.
        """
        config = config or StoreConfig()
        store = pyicechunk_store_create(storage, config=config)
        return cls(store=store, read_only=read_only, args=args, kwargs=kwargs)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, self.__class__):
            return False
        return self._store == value._store

    def __getstate__(self) -> object:
        # we serialize the Rust store as bytes
        d = self.__dict__.copy()
        d["_store"] = self._store.as_bytes()
        if not self._pickle_preserves_read_only:
            d["_read_only"] = True
        return d

    def __setstate__(self, state: Any) -> None:
        # we have to deserialize the bytes of the Rust store
        read_only = state["_read_only"]
        store_repr = state["_store"]
        state["_store"] = pyicechunk_store_from_bytes(store_repr, read_only)
        self.__dict__ = state

    @contextlib.contextmanager
    def preserve_read_only(self) -> Generator[None, None, None]:
        """
        Context manager to allow unpickling this store preserving `read_only` status.
        By default, stores are set to read-only after unpickling.
        """
        try:
            self._pickle_preserves_read_only = True
            yield
        finally:
            self._pickle_preserves_read_only = False

    def as_read_only(self) -> Self:
        """Return a read-only version of this store."""
        new_store = self._store.with_read_only(read_only=True)
        return self.__class__(store=new_store, read_only=True)

    def as_writeable(self) -> Self:
        """Return a writeable version of this store."""
        new_store = self._store.with_read_only(read_only=False)
        return self.__class__(store=new_store, read_only=False)

    def set_read_only(self) -> None:
        """Set the store to read-only mode."""
        self._store.set_read_only(read_only=True)
        self._read_only = True

    def set_writeable(self) -> None:
        """Set the store to writeable mode."""
        self._store.set_read_only(read_only=False)
        self._read_only = False

    @property
    def snapshot_id(self) -> str:
        """Return the current snapshot id."""
        return self._store.snapshot_id

    def change_set_bytes(self) -> bytes:
        """Get the complete list of changes applied in this session, serialized to bytes.

        This method is useful in combination with `IcechunkStore.distributed_commit`. When a
        write session is too large to execute in a single machine, it could be useful to
        distribute it across multiple workers. Each worker can write their changes independently
        (map) and then a single commit is executed by a coordinator (reduce).

        This method provides a way to gather a "description" of the changes applied
        by each worker. The resulting bytes, together with the `change_set_bytes` of
        other workers, can be fed to `distributed_commit`.

        This API is subject to change, it will be replaced by a merge operation at the Store level.
        """
        return self._store.change_set_bytes()

    @property
    def branch(self) -> str | None:
        """Return the current branch name."""
        return self._store.branch

    def checkout(
        self,
        snapshot_id: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
    ) -> None:
        """Checkout a branch, tag, or specific snapshot.

        If a branch is checked out, any following `commit` attempts will update that branch
        reference if successful. If a tag or snapshot_id are checked out, the repository
        won't allow commits.
        """
        if snapshot_id is not None:
            if branch is not None or tag is not None:
                raise ValueError(
                    "only one of snapshot_id, branch, or tag may be specified"
                )
            self._store.checkout_snapshot(snapshot_id)
            self._read_only = True
            return
        if branch is not None:
            if tag is not None:
                raise ValueError(
                    "only one of snapshot_id, branch, or tag may be specified"
                )
            self._store.checkout_branch(branch)
            self._read_only = True
            return
        if tag is not None:
            self._store.checkout_tag(tag)
            self._read_only = True
            return

        raise ValueError("a snapshot_id, branch, or tag must be specified")

    async def async_checkout(
        self,
        snapshot_id: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
    ) -> None:
        """Checkout a branch, tag, or specific snapshot.

        If a branch is checked out, any following `commit` attempts will update that branch
        reference if successful. If a tag or snapshot_id are checked out, the repository
        won't allow commits.
        """
        if snapshot_id is not None:
            if branch is not None or tag is not None:
                raise ValueError(
                    "only one of snapshot_id, branch, or tag may be specified"
                )
            await self._store.async_checkout_snapshot(snapshot_id)
            self._read_only = True
            return
        if branch is not None:
            if tag is not None:
                raise ValueError(
                    "only one of snapshot_id, branch, or tag may be specified"
                )
            await self._store.async_checkout_branch(branch)
            self._read_only = True
            return
        if tag is not None:
            await self._store.async_checkout_tag(tag)
            self._read_only = True
            return

        raise ValueError("a snapshot_id, branch, or tag must be specified")

    def commit(self, message: str) -> str:
        """Commit any uncommitted changes to the store.

        This will create a new snapshot on the current branch and return
        the new snapshot id.

        This method will fail if:

        * there is no currently checked out branch
        * some other writer updated the current branch since the repository was checked out
        """
        return self._store.commit(message)

    async def async_commit(self, message: str) -> str:
        """Commit any uncommitted changes to the store.

        This will create a new snapshot on the current branch and return
        the new snapshot id.

        This method will fail if:

        * there is no currently checked out branch
        * some other writer updated the current branch since the repository was checked out
        """
        return await self._store.async_commit(message)

    def merge(self, changes: bytes) -> None:
        """Merge the changes from another store into this store.

        This will create a new snapshot on the current branch and return
        the new snapshot id.

        This method will fail if:

        * there is no currently checked out branch
        * some other writer updated the current branch since the repository was checked out

        The behavior is undefined if the stores applied conflicting changes.
        """
        return self._store.merge(changes)

    async def async_merge(self, changes: bytes) -> None:
        """Merge the changes from another store into this store.

        This will create a new snapshot on the current branch and return
        the new snapshot id.

        This method will fail if:

        * there is no currently checked out branch
        * some other writer updated the current branch since the repository was checked out

        The behavior is undefined if the stores applied conflicting changes.
        """
        return await self._store.async_merge(changes)

    @property
    def has_uncommitted_changes(self) -> bool:
        """Return True if there are uncommitted changes to the store"""
        return self._store.has_uncommitted_changes

    async def async_reset(self) -> bytes:
        """Pop any uncommitted changes and reset to the previous snapshot state.

        Returns
        -------
        bytes : The changes that were taken from the working set
        """
        return await self._store.async_reset()

    def reset(self) -> bytes:
        """Pop any uncommitted changes and reset to the previous snapshot state.

        Returns
        -------
        bytes : The changes that were taken from the working set
        """
        return self._store.reset()

    async def async_new_branch(self, branch_name: str) -> str:
        """Create a new branch pointing to the current checked out snapshot.

        This requires having no uncommitted changes.
        """
        return await self._store.async_new_branch(branch_name)

    def new_branch(self, branch_name: str) -> str:
        """Create a new branch pointing to the current checked out snapshot.

        This requires having no uncommitted changes.
        """
        return self._store.new_branch(branch_name)

    async def async_reset_branch(self, to_snapshot: str) -> None:
        """Reset the currently checked out branch to point to a different snapshot.

        This requires having no uncommitted changes.

        The snapshot id can be obtained as the result of a commit operation or, more commonly,
        as the id of one of the SnapshotMetadata objects returned by `ancestry()`.

        This operation edits the repository history; it must be executed carefully.
        In particular, the current snapshot may end up being inaccessible from any
        other branches or tags.
        """
        return await self._store.async_reset_branch(to_snapshot)

    def reset_branch(self, to_snapshot: str) -> None:
        """Reset the currently checked out branch to point to a different snapshot.

        This requires having no uncommitted changes.

        The snapshot id can be obtained as the result of a commit operation or, more commonly,
        as the id of one of the SnapshotMetadata objects returned by `ancestry()`.

        This operation edits the repository history; it must be executed carefully.
        In particular, the current snapshot may end up being inaccessible from any
        other branches or tags.
        """
        return self._store.reset_branch(to_snapshot)

    def tag(self, tag_name: str, snapshot_id: str) -> None:
        """Create a tag pointing to the current checked out snapshot."""
        return self._store.tag(tag_name, snapshot_id=snapshot_id)

    async def async_tag(self, tag_name: str, snapshot_id: str) -> None:
        """Create a tag pointing to the current checked out snapshot."""
        return await self._store.async_tag(tag_name, snapshot_id=snapshot_id)

    def ancestry(self) -> list[SnapshotMetadata]:
        """Get the list of parents of the current version."""
        return self._store.ancestry()

    def async_ancestry(self) -> AsyncGenerator[SnapshotMetadata, None]:
        """Get the list of parents of the current version.

        Returns
        -------
        AsyncGenerator[SnapshotMetadata, None]
        """
        return self._store.async_ancestry()

    async def clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return await self._store.clear()

    def sync_clear(self) -> None:
        """Clear the store.

        This will remove all contents from the current session,
        including all groups and all arrays. But it will not modify the repository history.
        """
        return self._store.sync_clear()

    async def is_empty(self, prefix: str) -> bool:
        """
        Check if the directory is empty.

        Parameters
        ----------
        prefix : str
            Prefix of keys to check.

        Returns
        -------
        bool
            True if the store is empty, False otherwise.
        """
        return await self._store.is_empty(prefix)

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: tuple[int | None, int | None] | None = None,
    ) -> Buffer | None:
        """Retrieve the value associated with a given key.

        Parameters
        ----------
        key : str
        byte_range : tuple[int, Optional[int]], optional

        Returns
        -------
        Buffer
        """

        try:
            result = await self._store.get(key, byte_range)
        except KeyError as _e:
            # Zarr python expects None to be returned if the key does not exist
            # but an IcechunkStore returns an error if the key does not exist
            return None

        return prototype.buffer.from_bytes(result)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRangeRequest]],
    ) -> list[Buffer | None]:
        """Retrieve possibly partial values from given key_ranges.

        Parameters
        ----------
        key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
            Ordered set of key, range pairs, a key may occur multiple times with different ranges

        Returns
        -------
        list of values, in the order of the key_ranges, may contain null/none for missing keys
        """
        # NOTE: pyo3 does not implicitly convert an Iterable into a Rust iterable, so we convert it
        # to a list here first. Possible opportunity for optimization.
        result = await self._store.get_partial_values(list(key_ranges))
        return [prototype.buffer.from_bytes(r) for r in result]

    async def exists(self, key: str) -> bool:
        """Check if a key exists in the store.

        Parameters
        ----------
        key : str

        Returns
        -------
        bool
        """
        return await self._store.exists(key)

    @property
    def supports_writes(self) -> bool:
        """Does the store support writes?"""
        return self._store.supports_writes

    async def set(self, key: str, value: Buffer) -> None:
        """Store a (key, value) pair.

        Parameters
        ----------
        key : str
        value : Buffer
        """
        return await self._store.set(key, value.to_bytes())

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        """
        Store a key to ``value`` if the key is not already present.

        Parameters
        -----------
        key : str
        value : Buffer
        """
        return await self._store.set_if_not_exists(key, value.to_bytes())

    async def async_set_virtual_ref(
        self, key: str, location: str, *, offset: int, length: int
    ) -> None:
        """Store a virtual reference to a chunk.

        Parameters
        ----------
        key : str
            The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
        location : str
            The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
        offset : int
            The offset in bytes, from the start of the file at the given location, at which the chunk starts
        length : int
            The length of the chunk in bytes, measured from the given offset
        """
        return await self._store.async_set_virtual_ref(key, location, offset, length)

    def set_virtual_ref(
        self, key: str, location: str, *, offset: int, length: int
    ) -> None:
        """Store a virtual reference to a chunk.

        Parameters
        ----------
        key : str
            The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
        location : str
            The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
        offset : int
            The offset in bytes, from the start of the file at the given location, at which the chunk starts
        length : int
            The length of the chunk in bytes, measured from the given offset
        """
        return self._store.set_virtual_ref(key, location, offset, length)

    async def delete(self, key: str) -> None:
        """Remove a key from the store

        Parameters
        ----------
        key : str
        """
        return await self._store.delete(key)

    @property
    def supports_partial_writes(self) -> bool:
        """Does the store support partial writes?"""
        return self._store.supports_partial_writes

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """Store values at a given key, starting at byte range_start.

        Parameters
        ----------
        key_start_values : list[tuple[str, int, BytesLike]]
            Set of (key, range_start, value) triples. A key may occur multiple times with different
            range_starts; range_starts (considering the length of the respective values) must not
            specify overlapping ranges for the same key.
        """
        # NOTE: pyo3 does not implicitly convert an Iterable into a Rust iterable, so we convert it
        # to a list here first. Possible opportunity for optimization.
        return await self._store.set_partial_values(list(key_start_values))

    @property
    def supports_listing(self) -> bool:
        """Does the store support listing?"""
        return self._store.supports_listing

    @property
    def supports_deletes(self) -> bool:
        return self._store.supports_deletes

    def list(self) -> AsyncIterator[str]:
        """Retrieve all keys in the store.

        Returns
        -------
        AsyncIterator[str, None]
        """
        # This method should be async, like overridden methods in child classes.
        # However, that's not straightforward:
        # https://stackoverflow.com/questions/68905848

        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list()

    def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
        to the root of the store.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str, None]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_prefix(prefix)

    def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        Retrieve all keys and prefixes with a given prefix and which do not contain the character
        “/” after the given prefix.

        Parameters
        ----------
        prefix : str

        Returns
        -------
        AsyncIterator[str, None]
        """
        # The zarr spec specifies that this and other
        # listing methods should not be async, so we need to
        # wrap the async method in a sync method.
        return self._store.list_dir(prefix)
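
A minimal end-to-end sketch using only methods documented on this page; the filesystem path is a placeholder, and the array writes (normally done through zarr) are elided:

from icechunk import IcechunkStore, StorageConfig

# create a brand-new repository on local disk (raises if one already exists there)
storage = StorageConfig.filesystem("/path/to/repo")
store = IcechunkStore.create(storage)

# ... write groups and arrays through zarr using this store ...

# snapshot the session on the current branch and keep the new snapshot id
snapshot_id = store.commit("initial commit")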

branch: str | None property

Return the current branch name.

has_uncommitted_changes: bool property

Return True if there are uncommitted changes to the store

snapshot_id: str property

Return the current snapshot id.

supports_listing: bool property

Does the store support listing?

supports_partial_writes: bool property

Does the store support partial writes?

supports_writes: bool property

Does the store support writes?

__init__(store, read_only=False, *args, **kwargs)

Create a new IcechunkStore.

This should not be called directly, instead use the create, open_existing or open_or_create class methods.

Source code in icechunk/__init__.py
def __init__(
    self,
    store: PyIcechunkStore,
    read_only: bool = False,
    *args: Any,
    **kwargs: Any,
):
    """Create a new IcechunkStore.

    This should not be called directly, instead use the `create`, `open_existing` or `open_or_create` class methods.
    """
    super().__init__(read_only=read_only)
    if store is None:
        raise ValueError(
            "An IcechunkStore should not be created with the default constructor, instead use either the create or open_existing class methods."
        )
    self._store = store
    self._pickle_preserves_read_only = False

ancestry()

Get the list of parents of the current version.

Source code in icechunk/__init__.py
def ancestry(self) -> list[SnapshotMetadata]:
    """Get the list of parents of the current version."""
    return self._store.ancestry()
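
For illustration, a small sketch of inspecting the snapshot history; it only assumes that ancestry() returns the documented SnapshotMetadata objects:

# walk the parents of the currently checked out version
for snapshot in store.ancestry():
    print(snapshot)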

as_read_only()

Return a read-only version of this store.

Source code in icechunk/__init__.py
def as_read_only(self) -> Self:
    """Return a read-only version of this store."""
    new_store = self._store.with_read_only(read_only=True)
    return self.__class__(store=new_store, read_only=True)

as_writeable()

Return a writeable version of this store.

Source code in icechunk/__init__.py
def as_writeable(self) -> Self:
    """Return a writeable version of this store."""
    new_store = self._store.with_read_only(read_only=False)
    return self.__class__(store=new_store, read_only=False)

async_ancestry()

Get the list of parents of the current version.

Returns:

AsyncGenerator[SnapshotMetadata, None]
Source code in icechunk/__init__.py
def async_ancestry(self) -> AsyncGenerator[SnapshotMetadata, None]:
    """Get the list of parents of the current version.

    Returns
    -------
    AsyncGenerator[SnapshotMetadata, None]
    """
    return self._store.async_ancestry()

async_checkout(snapshot_id=None, branch=None, tag=None) async

Checkout a branch, tag, or specific snapshot.

If a branch is checked out, any following commit attempts will update that branch reference if successful. If a tag or snapshot_id are checked out, the repository won't allow commits.

Source code in icechunk/__init__.py
async def async_checkout(
    self,
    snapshot_id: str | None = None,
    branch: str | None = None,
    tag: str | None = None,
) -> None:
    """Checkout a branch, tag, or specific snapshot.

    If a branch is checked out, any following `commit` attempts will update that branch
    reference if successful. If a tag or snapshot_id are checked out, the repository
    won't allow commits.
    """
    if snapshot_id is not None:
        if branch is not None or tag is not None:
            raise ValueError(
                "only one of snapshot_id, branch, or tag may be specified"
            )
        await self._store.async_checkout_snapshot(snapshot_id)
        self._read_only = True
        return
    if branch is not None:
        if tag is not None:
            raise ValueError(
                "only one of snapshot_id, branch, or tag may be specified"
            )
        await self._store.async_checkout_branch(branch)
        self._read_only = True
        return
    if tag is not None:
        await self._store.async_checkout_tag(tag)
        self._read_only = True
        return

    raise ValueError("a snapshot_id, branch, or tag must be specified")

async_commit(message) async

Commit any uncommitted changes to the store.

This will create a new snapshot on the current branch and return the new snapshot id.

This method will fail if:

  • there is no currently checked out branch
  • some other writer updated the current branch since the repository was checked out
Source code in icechunk/__init__.py
async def async_commit(self, message: str) -> str:
    """Commit any uncommitted changes to the store.

    This will create a new snapshot on the current branch and return
    the new snapshot id.

    This method will fail if:

    * there is no currently checked out branch
    * some other writer updated the current branch since the repository was checked out
    """
    return await self._store.async_commit(message)

async_merge(changes) async

Merge the changes from another store into this store.

This will create a new snapshot on the current branch and return the new snapshot id.

This method will fail if:

  • there is no currently checked out branch
  • some other writer updated the current branch since the repository was checked out

The behavior is undefined if the stores applied conflicting changes.

Source code in icechunk/__init__.py
async def async_merge(self, changes: bytes) -> None:
    """Merge the changes from another store into this store.

    This will create a new snapshot on the current branch and return
    the new snapshot id.

    This method will fail if:

    * there is no currently checked out branch
    * some other writer updated the current branch since the repository was checked out

    The behavior is undefined if the stores applied conflicting changes.
    """
    return await self._store.async_merge(changes)

async_new_branch(branch_name) async

Create a new branch pointing to the current checked out snapshot.

This requires having no uncommitted changes.

Source code in icechunk/__init__.py
async def async_new_branch(self, branch_name: str) -> str:
    """Create a new branch pointing to the current checked out snapshot.

    This requires having no uncommitted changes.
    """
    return await self._store.async_new_branch(branch_name)

async_reset() async

Pop any uncommitted changes and reset to the previous snapshot state.

Returns:

bytes
    The changes that were taken from the working set
Source code in icechunk/__init__.py
async def async_reset(self) -> bytes:
    """Pop any uncommitted changes and reset to the previous snapshot state.

    Returns
    -------
    bytes : The changes that were taken from the working set
    """
    return await self._store.async_reset()

async_reset_branch(to_snapshot) async

Reset the currently checked out branch to point to a different snapshot.

This requires having no uncommitted changes.

The snapshot id can be obtained as the result of a commit operation or, more commonly, as the id of one of the SnapshotMetadata objects returned by ancestry().

This operation edits the repository history; it must be executed carefully. In particular, the current snapshot may end up being inaccessible from any other branches or tags.

Source code in icechunk/__init__.py
async def async_reset_branch(self, to_snapshot: str) -> None:
    """Reset the currently checked out branch to point to a different snapshot.

    This requires having no uncommitted changes.

    The snapshot id can be obtained as the result of a commit operation or, more commonly,
    as the id of one of the SnapshotMetadata objects returned by `ancestry()`.

    This operation edits the repository history; it must be executed carefully.
    In particular, the current snapshot may end up being inaccessible from any
    other branches or tags.
    """
    return await self._store.async_reset_branch(to_snapshot)

async_set_virtual_ref(key, location, *, offset, length) async

Store a virtual reference to a chunk.

Parameters:

key (str, required)
    The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
location (str, required)
    The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
offset (int, required)
    The offset in bytes, from the start of the file at the given location, at which the chunk starts
length (int, required)
    The length of the chunk in bytes, measured from the given offset
Source code in icechunk/__init__.py
async def async_set_virtual_ref(
    self, key: str, location: str, *, offset: int, length: int
) -> None:
    """Store a virtual reference to a chunk.

    Parameters
    ----------
    key : str
        The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
    location : str
        The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
    offset : int
        The offset in bytes, from the start of the file at the given location, at which the chunk starts
    length : int
        The length of the chunk in bytes, measured from the given offset
    """
    return await self._store.async_set_virtual_ref(key, location, offset, length)

async_tag(tag_name, snapshot_id) async

Create a tag pointing to the given snapshot.

Source code in icechunk/__init__.py
async def async_tag(self, tag_name: str, snapshot_id: str) -> None:
    """Create a tag pointing to the current checked out snapshot."""
    return await self._store.async_tag(tag_name, snapshot_id=snapshot_id)

change_set_bytes()

Get the complete list of changes applied in this session, serialized to bytes.

This method is useful in combination with IcechunkStore.distributed_commit. When a write session is too large to execute in a single machine, it could be useful to distribute it across multiple workers. Each worker can write their changes independently (map) and then a single commit is executed by a coordinator (reduce).

This method provides a way to gather a "description" of the changes applied by each worker. The resulting bytes, together with the change_set_bytes of other workers, can be fed to distributed_commit.

This API is subject to change, it will be replaced by a merge operation at the Store level.

Source code in icechunk/__init__.py
def change_set_bytes(self) -> bytes:
    """Get the complete list of changes applied in this session, serialized to bytes.

    This method is useful in combination with `IcechunkStore.distributed_commit`. When a
    write session is too large to execute in a single machine, it could be useful to
    distribute it across multiple workers. Each worker can write their changes independently
    (map) and then a single commit is executed by a coordinator (reduce).

    This method provides a way to gather a "description" of the changes applied
    by each worker. The resulting bytes, together with the `change_set_bytes` of
    other workers, can be fed to `distributed_commit`.

    This API is subject to change, it will be replaced by a merge operation at the Store level.
    """
    return self._store.change_set_bytes()

checkout(snapshot_id=None, branch=None, tag=None)

Checkout a branch, tag, or specific snapshot.

If a branch is checked out, any following commit attempts will update that branch reference if successful. If a tag or snapshot_id are checked out, the repository won't allow commits.

Source code in icechunk/__init__.py
def checkout(
    self,
    snapshot_id: str | None = None,
    branch: str | None = None,
    tag: str | None = None,
) -> None:
    """Checkout a branch, tag, or specific snapshot.

    If a branch is checked out, any following `commit` attempts will update that branch
    reference if successful. If a tag or snapshot_id are checked out, the repository
    won't allow commits.
    """
    if snapshot_id is not None:
        if branch is not None or tag is not None:
            raise ValueError(
                "only one of snapshot_id, branch, or tag may be specified"
            )
        self._store.checkout_snapshot(snapshot_id)
        self._read_only = True
        return
    if branch is not None:
        if tag is not None:
            raise ValueError(
                "only one of snapshot_id, branch, or tag may be specified"
            )
        self._store.checkout_branch(branch)
        self._read_only = True
        return
    if tag is not None:
        self._store.checkout_tag(tag)
        self._read_only = True
        return

    raise ValueError("a snapshot_id, branch, or tag must be specified")

clear() async

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk/__init__.py
async def clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return await self._store.clear()

commit(message)

Commit any uncommitted changes to the store.

This will create a new snapshot on the current branch and return the new snapshot id.

This method will fail if:

  • there is no currently checked out branch
  • some other writer updated the current branch since the repository was checked out
Source code in icechunk/__init__.py
def commit(self, message: str) -> str:
    """Commit any uncommitted changes to the store.

    This will create a new snapshot on the current branch and return
    the new snapshot id.

    This method will fail if:

    * there is no currently checked out branch
    * some other writer updated the current branch since the repository was checked out
    """
    return self._store.commit(message)
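
A small sketch of committing pending changes on the current branch; the message is arbitrary:

# create a new snapshot only if the session actually has pending changes
if store.has_uncommitted_changes:
    snapshot_id = store.commit("add temperature arrays")
    print("created snapshot", snapshot_id)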

create(storage, read_only=False, config=None, *args, **kwargs) classmethod

Create a new IcechunkStore with the given storage configuration.

If a store already exists at the given location, an error will be raised.

Source code in icechunk/__init__.py
@classmethod
def create(
    cls,
    storage: StorageConfig,
    read_only: bool = False,
    config: StoreConfig | None = None,
    *args: Any,
    **kwargs: Any,
) -> Self:
    """Create a new IcechunkStore with the given storage configuration.

    If a store already exists at the given location, an error will be raised.
    """
    config = config or StoreConfig()
    store = pyicechunk_store_create(storage, config=config)
    return cls(store=store, read_only=read_only, args=args, kwargs=kwargs)
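
For example, creating a fresh repository backed by in-memory storage (a sketch; the prefix is arbitrary and any other StorageConfig works the same way):

from icechunk import IcechunkStore, StorageConfig

# raises if a repository already exists at this location
store = IcechunkStore.create(StorageConfig.memory("example-prefix"))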

delete(key) async

Remove a key from the store

Parameters:

key (str, required)
Source code in icechunk/__init__.py
async def delete(self, key: str) -> None:
    """Remove a key from the store

    Parameters
    ----------
    key : str
    """
    return await self._store.delete(key)

exists(key) async

Check if a key exists in the store.

Parameters:

key (str, required)

Returns:

bool
Source code in icechunk/__init__.py
async def exists(self, key: str) -> bool:
    """Check if a key exists in the store.

    Parameters
    ----------
    key : str

    Returns
    -------
    bool
    """
    return await self._store.exists(key)

get(key, prototype, byte_range=None) async

Retrieve the value associated with a given key.

Parameters:

key (str, required)
byte_range (tuple[int, Optional[int]], optional; default None)

Returns:

Buffer
Source code in icechunk/__init__.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
    """Retrieve the value associated with a given key.

    Parameters
    ----------
    key : str
    byte_range : tuple[int, Optional[int]], optional

    Returns
    -------
    Buffer
    """

    try:
        result = await self._store.get(key, byte_range)
    except KeyError as _e:
        # Zarr python expects None to be returned if the key does not exist
        # but an IcechunkStore returns an error if the key does not exist
        return None

    return prototype.buffer.from_bytes(result)

get_partial_values(prototype, key_ranges) async

Retrieve possibly partial values from given key_ranges.

Parameters:

key_ranges (Iterable[tuple[str, tuple[int | None, int | None]]], required)
    Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns:

list of values, in the order of the key_ranges, may contain null/none for missing keys
Source code in icechunk/__init__.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRangeRequest]],
) -> list[Buffer | None]:
    """Retrieve possibly partial values from given key_ranges.

    Parameters
    ----------
    key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
        Ordered set of key, range pairs, a key may occur multiple times with different ranges

    Returns
    -------
    list of values, in the order of the key_ranges, may contain null/none for missing keys
    """
    # NOTE: pyo3 does not implicitly convert an Iterable into a Rust iterable, so we convert it
    # to a list here first. Possible opportunity for optimization.
    result = await self._store.get_partial_values(list(key_ranges))
    return [prototype.buffer.from_bytes(r) for r in result]

is_empty(prefix) async

Check if the directory is empty.

Parameters:

prefix (str, required)
    Prefix of keys to check.

Returns:

bool
    True if the store is empty, False otherwise.

Source code in icechunk/__init__.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    return await self._store.is_empty(prefix)

list()

Retrieve all keys in the store.

Returns:

AsyncIterator[str, None]
Source code in icechunk/__init__.py
def list(self) -> AsyncIterator[str]:
    """Retrieve all keys in the store.

    Returns
    -------
    AsyncIterator[str, None]
    """
    # This method should be async, like overridden methods in child classes.
    # However, that's not straightforward:
    # https://stackoverflow.com/questions/68905848

    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list()
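
A sketch of consuming the returned async iterator directly; zarr normally drives this for you, and the helper below is purely illustrative:

async def print_all_keys(store) -> None:
    # iterate over every key visible in the current session
    async for key in store.list():
        print(key)

# from synchronous code: asyncio.run(print_all_keys(store))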

list_dir(prefix)

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

prefix (str, required)

Returns:

AsyncIterator[str, None]
Source code in icechunk/__init__.py
def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """
    Retrieve all keys and prefixes with a given prefix and which do not contain the character
    “/” after the given prefix.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str, None]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_dir(prefix)

list_prefix(prefix)

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

prefix (str, required)

Returns:

AsyncIterator[str, None]
Source code in icechunk/__init__.py
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
    to the root of the store.

    Parameters
    ----------
    prefix : str

    Returns
    -------
    AsyncIterator[str, None]
    """
    # The zarr spec specifies that this and other
    # listing methods should not be async, so we need to
    # wrap the async method in a sync method.
    return self._store.list_prefix(prefix)

merge(changes)

Merge the changes from another store into this store.

This will create a new snapshot on the current branch and return the new snapshot id.

This method will fail if:

  • there is no currently checked out branch
  • some other writer updated the current branch since the repository was checked out

The behavior is undefined if the stores applied conflicting changes.

Source code in icechunk/__init__.py
def merge(self, changes: bytes) -> None:
    """Merge the changes from another store into this store.

    This will create a new snapshot on the current branch and return
    the new snapshot id.

    This method will fail if:

    * there is no currently checked out branch
    * some other writer updated the current branch since the repository was checked out

    The behavior is undefined if the stores applied conflicting changes.
    """
    return self._store.merge(changes)
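
A hedged sketch of the map/reduce pattern described under change_set_bytes: each worker serializes its uncommitted session, and a coordinator merges those serialized change sets into its own session before committing once. How the bytes travel between processes is left out and is an assumption of the example:

def worker_write(worker_store) -> bytes:
    # map step: write this worker's chunks, then serialize the session instead of committing
    # ... write chunks through zarr using worker_store ...
    return worker_store.change_set_bytes()

def coordinator_commit(store, worker_change_sets: list[bytes]) -> str:
    # reduce step: fold every worker's changes into this session, then commit once
    for changes in worker_change_sets:
        store.merge(changes)
    return store.commit("distributed write session")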

new_branch(branch_name)

Create a new branch pointing to the current checked out snapshot.

This requires having no uncommitted changes.

Source code in icechunk/__init__.py
def new_branch(self, branch_name: str) -> str:
    """Create a new branch pointing to the current checked out snapshot.

    This requires having no uncommitted changes.
    """
    return self._store.new_branch(branch_name)
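
For instance, starting a branch at the currently checked out snapshot (a sketch; the branch name is a placeholder and the session must have no uncommitted changes):

# the new branch starts at the currently checked out snapshot
store.new_branch("experiment-1")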

open(*args, **kwargs) async classmethod

This method is called by zarr-python, it's not intended for users.

Use one of IcechunkStore.open_existing, IcechunkStore.create or IcechunkStore.open_or_create instead.

Source code in icechunk/__init__.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """This method is called by zarr-python, it's not intended for users.

    Use one of `IcechunkStore.open_existing`, `IcechunkStore.create` or `IcechunkStore.open_or_create` instead.
    """
    return cls.open_or_create(*args, **kwargs)

open_existing(storage, read_only=False, config=None, *args, **kwargs) classmethod

Open an existing IcechunkStore from the given storage.

If there is no store at the given location, an error will be raised.

It is recommended to use the cached storage option for better performance. If cached=True, this will be configured automatically with the provided storage_config as the underlying storage backend.

Source code in icechunk/__init__.py
@classmethod
def open_existing(
    cls,
    storage: StorageConfig,
    read_only: bool = False,
    config: StoreConfig | None = None,
    *args: Any,
    **kwargs: Any,
) -> Self:
    """Open an existing IcechunkStore from the given storage.

    If there is no store at the given location, an error will be raised.

    It is recommended to use the cached storage option for better performance. If cached=True,
    this will be configured automatically with the provided storage_config as the underlying
    storage backend.
    """
    config = config or StoreConfig()
    # We have delayed checking if the repository exists, to avoid the delay in the happy case
    # So we need to check now if open fails, to provide a nice error message
    try:
        store = pyicechunk_store_open_existing(
            storage, read_only=read_only, config=config
        )
    # TODO: we should have an exception type to catch here, for the case of non-existing repo
    except Exception as e:
        if pyicechunk_store_exists(storage):
            # if the repo exists, this is an actual error we need to raise
            raise e
        else:
            # if the repo doesn't exist, we want to point users to that issue instead
            raise ValueError(
                "No Icechunk repository at the provided location, try opening in create mode or changing the location"
            ) from None
    return cls(store=store, read_only=read_only, args=args, kwargs=kwargs)
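
For example, opening an existing repository in read-only mode (a sketch; the path is a placeholder):

from icechunk import IcechunkStore, StorageConfig

# raises if no Icechunk repository exists at this location
store = IcechunkStore.open_existing(
    StorageConfig.filesystem("/path/to/repo"),
    read_only=True,
)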

preserve_read_only()

Context manager to allow unpickling this store preserving read_only status. By default, stores are set to read-only after unpickling.

Source code in icechunk/__init__.py
@contextlib.contextmanager
def preserve_read_only(self) -> Generator[None, None, None]:
    """
    Context manager to allow unpickling this store preserving `read_only` status.
    By default, stores are set to read-only after unpickling.
    """
    try:
        self._pickle_preserves_read_only = True
        yield
    finally:
        self._pickle_preserves_read_only = False
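
A sketch of pickling a writeable store without losing its writeable status (by default the unpickled copy comes back read-only):

import pickle

with store.preserve_read_only():
    # the round-tripped copy keeps the original read_only status
    restored = pickle.loads(pickle.dumps(store))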

reset()

Pop any uncommitted changes and reset to the previous snapshot state.

Returns:

bytes
    The changes that were taken from the working set
Source code in icechunk/__init__.py
def reset(self) -> bytes:
    """Pop any uncommitted changes and reset to the previous snapshot state.

    Returns
    -------
    bytes : The changes that were taken from the working set
    """
    return self._store.reset()

reset_branch(to_snapshot)

Reset the currently checked out branch to point to a different snapshot.

This requires having no uncommitted changes.

The snapshot id can be obtained as the result of a commit operation or, more commonly, as the id of one of the SnapshotMetadata objects returned by ancestry().

This operation edits the repository history; it must be executed carefully. In particular, the current snapshot may end up being inaccessible from any other branches or tags.

Source code in icechunk/__init__.py
def reset_branch(self, to_snapshot: str) -> None:
    """Reset the currently checked out branch to point to a different snapshot.

    This requires having no uncommitted changes.

    The snapshot id can be obtained as the result of a commit operation or, more commonly,
    as the id of one of the SnapshotMetadata objects returned by `ancestry()`.

    This operation edits the repository history; it must be executed carefully.
    In particular, the current snapshot may end up being inaccessible from any
    other branches or tags.
    """
    return self._store.reset_branch(to_snapshot)
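
A short sketch of rewinding the checked out branch; the snapshot id is a placeholder that would normally come from ancestry() or an earlier commit():

# move the currently checked out branch pointer to an older snapshot
store.reset_branch("some-older-snapshot-id")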

set(key, value) async

Store a (key, value) pair.

Parameters:

key (str, required)
value (Buffer, required)
Source code in icechunk/__init__.py
async def set(self, key: str, value: Buffer) -> None:
    """Store a (key, value) pair.

    Parameters
    ----------
    key : str
    value : Buffer
    """
    return await self._store.set(key, value.to_bytes())

set_if_not_exists(key, value) async

Store a key to value if the key is not already present.

Parameters:

key (str, required)
value (Buffer, required)
Source code in icechunk/__init__.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    """
    Store a key to ``value`` if the key is not already present.

    Parameters
    -----------
    key : str
    value : Buffer
    """
    return await self._store.set_if_not_exists(key, value.to_bytes())

set_partial_values(key_start_values) async

Store values at a given key, starting at byte range_start.

Parameters:

key_start_values (list[tuple[str, int, BytesLike]], required)
    Set of (key, range_start, value) triples. A key may occur multiple times with different
    range_starts; range_starts (considering the length of the respective values) must not
    specify overlapping ranges for the same key.
Source code in icechunk/__init__.py
async def set_partial_values(
    self, key_start_values: Iterable[tuple[str, int, BytesLike]]
) -> None:
    """Store values at a given key, starting at byte range_start.

    Parameters
    ----------
    key_start_values : list[tuple[str, int, BytesLike]]
        Set of (key, range_start, value) triples. A key may occur multiple times with different
        range_starts; range_starts (considering the length of the respective values) must not
        specify overlapping ranges for the same key.
    """
    # NOTE: pyo3 does not implicitly convert an Iterable into a Rust iterable, so we convert it
    # to a list here first. Possible opportunity for optimization.
    return await self._store.set_partial_values(list(key_start_values))

set_read_only()

Set the store to read-only mode.

Source code in icechunk/__init__.py
def set_read_only(self) -> None:
    """Set the store to read-only mode."""
    self._store.set_read_only(read_only=True)
    self._read_only = True

set_virtual_ref(key, location, *, offset, length)

Store a virtual reference to a chunk.

Parameters:

key (str, required)
    The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
location (str, required)
    The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
offset (int, required)
    The offset in bytes, from the start of the file at the given location, at which the chunk starts
length (int, required)
    The length of the chunk in bytes, measured from the given offset
Source code in icechunk/__init__.py
def set_virtual_ref(
    self, key: str, location: str, *, offset: int, length: int
) -> None:
    """Store a virtual reference to a chunk.

    Parameters
    ----------
    key : str
        The chunk to store the reference under. This is the fully qualified zarr key, e.g. 'array/c/0/0/0'
    location : str
        The location of the chunk in storage. This is the absolute path to the chunk in storage, e.g. 's3://bucket/path/to/file.nc'
    offset : int
        The offset in bytes, from the start of the file at the given location, at which the chunk starts
    length : int
        The length of the chunk in bytes, measured from the given offset
    """
    return self._store.set_virtual_ref(key, location, offset, length)
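
For example, pointing a chunk at a byte range that already exists in an external object (a sketch; the key, URL, offset, and length are placeholders):

# reads of 'array/c/0/0/0' will be served from the referenced byte range
store.set_virtual_ref(
    "array/c/0/0/0",
    "s3://bucket/path/to/file.nc",
    offset=1024,
    length=4096,
)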

set_writeable()

Set the store to writeable mode.

Source code in icechunk/__init__.py
def set_writeable(self) -> None:
    """Set the store to writeable mode."""
    self._store.set_read_only(read_only=False)
    self._read_only = False

sync_clear()

Clear the store.

This will remove all contents from the current session, including all groups and all arrays. But it will not modify the repository history.

Source code in icechunk/__init__.py
def sync_clear(self) -> None:
    """Clear the store.

    This will remove all contents from the current session,
    including all groups and all arrays. But it will not modify the repository history.
    """
    return self._store.sync_clear()

tag(tag_name, snapshot_id)

Create a tag pointing to the given snapshot.

Source code in icechunk/__init__.py
def tag(self, tag_name: str, snapshot_id: str) -> None:
    """Create a tag pointing to the current checked out snapshot."""
    return self._store.tag(tag_name, snapshot_id=snapshot_id)
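
Illustrative usage (a sketch; snapshot_id is assumed to be the id of an existing snapshot, for example one returned by an earlier commit of this store):

store.tag("v1.0.0", snapshot_id=snapshot_id)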

StorageConfig

Storage configuration for an IcechunkStore

Currently supports memory, filesystem, and S3 storage backends. Use the class methods to create a StorageConfig object with the desired backend.

Ex:

storage_config = StorageConfig.memory("prefix")
storage_config = StorageConfig.filesystem("/path/to/root")
storage_config = StorageConfig.s3_from_env("bucket", "prefix")
storage_config = StorageConfig.s3_from_config("bucket", "prefix", ...)

Source code in icechunk/_icechunk_python.pyi
class StorageConfig:
    """Storage configuration for an IcechunkStore

    Currently supports memory, filesystem, and S3 storage backends.
    Use the class methods to create a StorageConfig object with the desired backend.

    Ex:
    ```
    storage_config = StorageConfig.memory("prefix")
    storage_config = StorageConfig.filesystem("/path/to/root")
    storage_config = StorageConfig.s3_from_env("bucket", "prefix")
    storage_config = StorageConfig.s3_from_config("bucket", "prefix", ...)
    ```
    """

    @classmethod
    def memory(cls, prefix: str) -> StorageConfig:
        """Create a StorageConfig object for an in-memory storage backend with the given prefix"""
        ...

    @classmethod
    def filesystem(cls, root: str) -> StorageConfig:
        """Create a StorageConfig object for a local filesystem storage backend with the given root directory"""
        ...

    @classmethod
    def s3_from_env(
        cls,
        bucket: str,
        prefix: str,
        endpoint_url: str | None,
        allow_http: bool = False,
        region: str | None = None,
    ) -> StorageConfig:
        """Create a StorageConfig object for an S3 Object Storage compatible storage backend
        with the given bucket and prefix

        This assumes that the necessary credentials are available in the environment:
            AWS_REGION
            AWS_ACCESS_KEY_ID,
            AWS_SECRET_ACCESS_KEY,
            AWS_SESSION_TOKEN (optional)
            AWS_ENDPOINT_URL (optional)
            AWS_ALLOW_HTTP (optional)
        """
        ...

    @classmethod
    def s3_from_config(
        cls,
        bucket: str,
        prefix: str,
        credentials: S3Credentials,
        endpoint_url: str | None,
        allow_http: bool = False,
        region: str | None = None,
    ) -> StorageConfig:
        """Create a StorageConfig object for an S3 Object Storage compatible storage
        backend with the given bucket, prefix, and configuration

        This method will directly use the provided credentials to authenticate with the S3 service,
        ignoring any environment variables.
        """
        ...

    @classmethod
    def s3_anonymous(
        cls,
        bucket: str,
        prefix: str,
        endpoint_url: str | None,
        allow_http: bool = False,
        region: str | None = None,
    ) -> StorageConfig:
        """Create a StorageConfig object for an S3 Object Storage compatible storage
        using anonymous access
        """
        ...
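
Beyond the constructors shown in the docstring above, a StorageConfig is typically passed as the storage argument when the store is created. A minimal sketch (the repository path is a placeholder):

storage = StorageConfig.filesystem("/tmp/icechunk-repo")
store = IcechunkStore.open_or_create(storage=storage)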

filesystem(root) classmethod

Create a StorageConfig object for a local filesystem storage backend with the given root directory

Source code in icechunk/_icechunk_python.pyi
@classmethod
def filesystem(cls, root: str) -> StorageConfig:
    """Create a StorageConfig object for a local filesystem storage backend with the given root directory"""
    ...

memory(prefix) classmethod

Create a StorageConfig object for an in-memory storage backend with the given prefix

Source code in icechunk/_icechunk_python.pyi
@classmethod
def memory(cls, prefix: str) -> StorageConfig:
    """Create a StorageConfig object for an in-memory storage backend with the given prefix"""
    ...

s3_anonymous(bucket, prefix, endpoint_url, allow_http=False, region=None) classmethod

Create a StorageConfig object for an S3 Object Storage compatible storage using anonymous access

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_anonymous(
    cls,
    bucket: str,
    prefix: str,
    endpoint_url: str | None,
    allow_http: bool = False,
    region: str | None = None,
) -> StorageConfig:
    """Create a StorageConfig object for an S3 Object Storage compatible storage
    using anonymous access
    """
    ...

s3_from_config(bucket, prefix, credentials, endpoint_url, allow_http=False, region=None) classmethod

Create a StorageConfig object for an S3 Object Storage compatible storage backend with the given bucket, prefix, and configuration

This method will directly use the provided credentials to authenticate with the S3 service, ignoring any environment variables.

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_from_config(
    cls,
    bucket: str,
    prefix: str,
    credentials: S3Credentials,
    endpoint_url: str | None,
    allow_http: bool = False,
    region: str | None = None,
) -> StorageConfig:
    """Create a StorageConfig object for an S3 Object Storage compatible storage
    backend with the given bucket, prefix, and configuration

    This method will directly use the provided credentials to authenticate with the S3 service,
    ignoring any environment variables.
    """
    ...

s3_from_env(bucket, prefix, endpoint_url, allow_http=False, region=None) classmethod

Create a StorageConfig object for an S3 Object Storage compatible storage backend with the given bucket and prefix

This assumes that the necessary credentials are available in the environment:

  • AWS_REGION
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_SESSION_TOKEN (optional)
  • AWS_ENDPOINT_URL (optional)
  • AWS_ALLOW_HTTP (optional)

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_from_env(
    cls,
    bucket: str,
    prefix: str,
    endpoint_url: str | None,
    allow_http: bool = False,
    region: str | None = None,
) -> StorageConfig:
    """Create a StorageConfig object for an S3 Object Storage compatible storage backend
    with the given bucket and prefix

    This assumes that the necessary credentials are available in the environment:
        AWS_REGION
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        AWS_SESSION_TOKEN (optional)
        AWS_ENDPOINT_URL (optional)
        AWS_ALLOW_HTTP (optional)
    """
    ...
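
Illustrative usage (a sketch; the bucket and prefix are placeholders, and the AWS_* variables listed above must already be set in the environment):

storage = StorageConfig.s3_from_env(
    "my-bucket",
    "my/prefix",
    endpoint_url=None,     # fall back to the default AWS endpoint
    region="us-east-1",
)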

StoreConfig

Configuration for an IcechunkStore

Source code in icechunk/_icechunk_python.pyi
class StoreConfig:
    """Configuration for an IcechunkStore"""

    # The number of concurrent requests to make when fetching partial values
    get_partial_values_concurrency: int | None
    # The threshold at which to inline chunks in the store in bytes. When set,
    # chunks smaller than this threshold will be inlined in the store. Default is
    # 512 bytes.
    inline_chunk_threshold_bytes: int | None
    # Whether to allow overwriting refs in the store. Default is False. Experimental.
    unsafe_overwrite_refs: bool | None
    # Configurations for virtual references such as credentials and endpoints
    virtual_ref_config: VirtualRefConfig | None

    def __init__(
        self,
        get_partial_values_concurrency: int | None = None,
        inline_chunk_threshold_bytes: int | None = None,
        unsafe_overwrite_refs: bool | None = None,
        virtual_ref_config: VirtualRefConfig | None = None,
    ):
        """Create a StoreConfig object with the given configuration options

        Parameters
        ----------
        get_partial_values_concurrency: int | None
            The number of concurrent requests to make when fetching partial values
        inline_chunk_threshold_bytes: int | None
            The threshold at which to inline chunks in the store in bytes. When set,
            chunks smaller than this threshold will be inlined in the store. Default is
            512 bytes when not specified.
        unsafe_overwrite_refs: bool | None
            Whether to allow overwriting refs in the store. Default is False. Experimental.
        virtual_ref_config: VirtualRefConfig | None
            Configurations for virtual references such as credentials and endpoints

        Returns
        -------
        StoreConfig
            A StoreConfig object with the given configuration options
        """
        ...

__init__(get_partial_values_concurrency=None, inline_chunk_threshold_bytes=None, unsafe_overwrite_refs=None, virtual_ref_config=None)

Create a StoreConfig object with the given configuration options

Parameters:

Name Type Description Default
get_partial_values_concurrency int | None

The number of concurrent requests to make when fetching partial values

None
inline_chunk_threshold_bytes int | None

The threshold at which to inline chunks in the store in bytes. When set, chunks smaller than this threshold will be inlined in the store. Default is 512 bytes when not specified.

None
unsafe_overwrite_refs bool | None

Whether to allow overwriting refs in the store. Default is False. Experimental.

None
virtual_ref_config VirtualRefConfig | None

Configurations for virtual references such as credentials and endpoints

None

Returns:

Type Description
StoreConfig

A StoreConfig object with the given configuration options

Source code in icechunk/_icechunk_python.pyi
def __init__(
    self,
    get_partial_values_concurrency: int | None = None,
    inline_chunk_threshold_bytes: int | None = None,
    unsafe_overwrite_refs: bool | None = None,
    virtual_ref_config: VirtualRefConfig | None = None,
):
    """Create a StoreConfig object with the given configuration options

    Parameters
    ----------
    get_partial_values_concurrency: int | None
        The number of concurrent requests to make when fetching partial values
    inline_chunk_threshold_bytes: int | None
        The threshold at which to inline chunks in the store in bytes. When set,
        chunks smaller than this threshold will be inlined in the store. Default is
        512 bytes when not specified.
    unsafe_overwrite_refs: bool | None
        Whether to allow overwriting refs in the store. Default is False. Experimental.
    virtual_ref_config: VirtualRefConfig | None
        Configurations for virtual references such as credentials and endpoints

    Returns
    -------
    StoreConfig
        A StoreConfig object with the given configuration options
    """
    ...
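
Illustrative usage (a sketch; the values are placeholders chosen only to show the options documented above):

config = StoreConfig(
    get_partial_values_concurrency=16,      # up to 16 concurrent partial reads
    inline_chunk_threshold_bytes=1024,      # inline chunks smaller than 1 KiB
)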

VirtualRefConfig

Source code in icechunk/_icechunk_python.pyi
class VirtualRefConfig:
    class S3:
        """Config for an S3 Object Storage compatible storage backend"""

        credentials: S3Credentials | None
        endpoint_url: str | None
        allow_http: bool | None
        region: str | None

    @classmethod
    def s3_from_env(cls) -> VirtualRefConfig:
        """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage backend
        with the given bucket and prefix

        This assumes that the necessary credentials are available in the environment:
            AWS_REGION or AWS_DEFAULT_REGION
            AWS_ACCESS_KEY_ID,
            AWS_SECRET_ACCESS_KEY,
            AWS_SESSION_TOKEN (optional)
            AWS_ENDPOINT_URL (optional)
            AWS_ALLOW_HTTP (optional)
        """
        ...

    @classmethod
    def s3_from_config(
        cls,
        credentials: S3Credentials,
        *,
        endpoint_url: str | None = None,
        allow_http: bool | None = None,
        region: str | None = None,
    ) -> VirtualRefConfig:
        """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage
        backend with the given bucket, prefix, and configuration

        This method will directly use the provided credentials to authenticate with the S3 service,
        ignoring any environment variables.
        """
        ...

    @classmethod
    def s3_anonymous(
        cls,
        *,
        endpoint_url: str | None = None,
        allow_http: bool | None = None,
        region: str | None = None,
    ) -> VirtualRefConfig:
        """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage
        using anonymous access
        """
        ...

S3

Config for an S3 Object Storage compatible storage backend

Source code in icechunk/_icechunk_python.pyi
class S3:
    """Config for an S3 Object Storage compatible storage backend"""

    credentials: S3Credentials | None
    endpoint_url: str | None
    allow_http: bool | None
    region: str | None

s3_anonymous(*, endpoint_url=None, allow_http=None, region=None) classmethod

Create a VirtualRefConfig object for an S3 Object Storage compatible storage backend using anonymous access

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_anonymous(
    cls,
    *,
    endpoint_url: str | None = None,
    allow_http: bool | None = None,
    region: str | None = None,
) -> VirtualRefConfig:
    """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage
    using anonymous access
    """
    ...

s3_from_config(credentials, *, endpoint_url=None, allow_http=None, region=None) classmethod

Create a VirtualRefConfig object for an S3 Object Storage compatible storage backend with the given credentials and configuration

This method will directly use the provided credentials to authenticate with the S3 service, ignoring any environment variables.

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_from_config(
    cls,
    credentials: S3Credentials,
    *,
    endpoint_url: str | None = None,
    allow_http: bool | None = None,
    region: str | None = None,
) -> VirtualRefConfig:
    """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage
    backend with the given bucket, prefix, and configuration

    This method will directly use the provided credentials to authenticate with the S3 service,
    ignoring any environment variables.
    """
    ...

s3_from_env() classmethod

Create a VirtualRefConfig object for an S3 Object Storage compatible storage backend, using credentials from the environment

This assumes that the necessary credentials are available in the environment:

  • AWS_REGION or AWS_DEFAULT_REGION
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_SESSION_TOKEN (optional)
  • AWS_ENDPOINT_URL (optional)
  • AWS_ALLOW_HTTP (optional)

Source code in icechunk/_icechunk_python.pyi
@classmethod
def s3_from_env(cls) -> VirtualRefConfig:
    """Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage backend
    with the given bucket and prefix

    This assumes that the necessary credentials are available in the environment:
        AWS_REGION or AWS_DEFAULT_REGION
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        AWS_SESSION_TOKEN (optional)
        AWS_ENDPOINT_URL (optional)
        AWS_ALLOW_HTTP (optional)
    """
    ...
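
Illustrative usage (a sketch): a VirtualRefConfig built from environment credentials is wired into the StoreConfig documented above so that virtual chunk references can be resolved against S3.

virtual_ref_config = VirtualRefConfig.s3_from_env()
config = StoreConfig(virtual_ref_config=virtual_ref_config)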

XarrayDatasetWriter dataclass

Write Xarray Datasets to a group in an Icechunk store.

This class is private API. Please do not use it.

Source code in icechunk/xarray.py
@dataclass
class XarrayDatasetWriter:
    """
    Write Xarray Datasets to a group in an Icechunk store.

    This class is private API. Please do not use it.
    """

    dataset: Dataset = field(repr=False)
    store: IcechunkStore = field(kw_only=True)

    safe_chunks: bool = field(kw_only=True, default=True)
    # TODO: uncomment when Zarr has support
    # write_empty_chunks: bool = field(kw_only=True, default=True)

    _initialized: bool = field(default=False, repr=False)

    xarray_store: ZarrStore = field(init=False, repr=False)
    writer: LazyArrayWriter = field(init=False, repr=False)

    def __post_init__(self) -> None:
        if not isinstance(self.store, IcechunkStore):
            raise ValueError(
                f"Please pass in an IcechunkStore. Received {type(self.store)!r} instead."
            )

    def _open_group(
        self,
        *,
        group: str | None,
        mode: ZarrWriteModes | None,
        append_dim: Hashable | None,
        region: Region,
    ) -> None:
        concrete_mode: ZarrWriteModes = _choose_default_mode(
            mode=mode, append_dim=append_dim, region=region
        )

        self.xarray_store = ZarrStore.open_group(
            store=self.store,
            group=group,
            mode=concrete_mode,
            zarr_format=3,
            append_dim=append_dim,
            write_region=region,
            safe_chunks=self.safe_chunks,
            # TODO: uncomment when Zarr has support
            # write_empty=self.write_empty_chunks,
            synchronizer=None,
            consolidated=False,
            consolidate_on_close=False,
            zarr_version=None,
        )

    def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
        """
        This method creates new Zarr arrays when necessary, writes attributes,
        and any in-memory arrays.
        """
        from xarray.backends.api import _validate_dataset_names, dump_to_store

        # validate Dataset keys, DataArray names
        _validate_dataset_names(self.dataset)

        if encoding is None:
            encoding = {}
        self.xarray_store._validate_encoding(encoding)

        # This writes the metadata (zarr.json) for all arrays
        # This also will resize arrays for any appends
        self.writer = LazyArrayWriter()
        dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

        self._initialized = True

    def write_eager(self) -> None:
        """
        Write in-memory variables to store.

        Returns
        -------
        None
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")
        self.writer.write_eager()

    def write_lazy(
        self,
        chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
        split_every: int | None = None,
    ) -> None:
        """
        Write lazy arrays (e.g. dask) to store.
        """
        if not self._initialized:
            raise ValueError("Please call `write_metadata` first.")

        if not self.writer.sources:
            return

        chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
        chunkmanager_store_kwargs["load_stored"] = False
        chunkmanager_store_kwargs["return_stored"] = True

        # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
        # each of those zarr.Array.store contains the changesets we need
        stored_arrays = self.writer.sync(
            compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
        )  # type: ignore[no-untyped-call]

        # Now we tree-reduce all changesets
        merged_store = stateful_store_reduce(
            stored_arrays,
            prefix="ice-changeset",
            chunk=extract_store,
            aggregate=merge_stores,
            split_every=split_every,
            compute=True,
            **chunkmanager_store_kwargs,
        )
        self.store.merge(merged_store.change_set_bytes())

write_eager()

Write in-memory variables to store.

Returns:

Type Description
None
Source code in icechunk/xarray.py
def write_eager(self) -> None:
    """
    Write in-memory variables to store.

    Returns
    -------
    None
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")
    self.writer.write_eager()

write_lazy(chunkmanager_store_kwargs=None, split_every=None)

Write lazy arrays (e.g. dask) to store.

Source code in icechunk/xarray.py
def write_lazy(
    self,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
) -> None:
    """
    Write lazy arrays (e.g. dask) to store.
    """
    if not self._initialized:
        raise ValueError("Please call `write_metadata` first.")

    if not self.writer.sources:
        return

    chunkmanager_store_kwargs = chunkmanager_store_kwargs or {}
    chunkmanager_store_kwargs["load_stored"] = False
    chunkmanager_store_kwargs["return_stored"] = True

    # This calls dask.array.store, and we receive a dask array where each chunk is a Zarr array
    # each of those zarr.Array.store contains the changesets we need
    stored_arrays = self.writer.sync(
        compute=False, chunkmanager_store_kwargs=chunkmanager_store_kwargs
    )  # type: ignore[no-untyped-call]

    # Now we tree-reduce all changesets
    merged_store = stateful_store_reduce(
        stored_arrays,
        prefix="ice-changeset",
        chunk=extract_store,
        aggregate=merge_stores,
        split_every=split_every,
        compute=True,
        **chunkmanager_store_kwargs,
    )
    self.store.merge(merged_store.change_set_bytes())

write_metadata(encoding=None)

This method creates new Zarr arrays when necessary, and writes attributes and any in-memory arrays.

Source code in icechunk/xarray.py
def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
    """
    This method creates new Zarr arrays when necessary, writes attributes,
    and any in-memory arrays.
    """
    from xarray.backends.api import _validate_dataset_names, dump_to_store

    # validate Dataset keys, DataArray names
    _validate_dataset_names(self.dataset)

    if encoding is None:
        encoding = {}
    self.xarray_store._validate_encoding(encoding)

    # This writes the metadata (zarr.json) for all arrays
    # This also will resize arrays for any appends
    self.writer = LazyArrayWriter()
    dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding)  # type: ignore[no-untyped-call]

    self._initialized = True

to_icechunk(dataset, store, *, group=None, mode=None, safe_chunks=True, append_dim=None, region=None, encoding=None, chunkmanager_store_kwargs=None, split_every=None, **kwargs)

Write an Xarray Dataset to a group of an icechunk store.

Parameters:

Name Type Description Default
store (MutableMapping, str or path-like)

Store or path to directory in local or remote file system.

required
mode "w", "w-", "a", "a-", r+", None

Persistence mode: "w" means create (overwrite if exists); "w-" means create (fail if exists); "a" means override all existing variables including dimension coordinates (create if does not exist); "a-" means only append those variables that have append_dim; "r+" means modify existing array values only (raise an error if any metadata or shapes would change). The default mode is "a" if append_dim is set; otherwise it is "r+" if region is set and "w-" otherwise.

"w"
group str

Group path. (a.k.a. path in zarr terminology.)

None
encoding dict

Nested dictionary with variable names as keys and dictionaries of variable specific encodings as values, e.g., {"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}

None
append_dim hashable

If set, the dimension along which the data will be appended. All other dimensions on overridden variables must remain the same size.

None
region dict or auto

Optional mapping from dimension names to either a) "auto", or b) integer slices, indicating the region of existing zarr array(s) in which to write this dataset's data.

If "auto" is provided the existing store will be opened and the region inferred by matching indexes. "auto" can be used as a single string, which will automatically infer the region for all dimensions, or as dictionary values for specific dimensions mixed together with explicit slices for other dimensions.

Alternatively integer slices can be provided; for example, {'x': slice(0, 1000), 'y': slice(10000, 11000)} would indicate that values should be written to the region 0:1000 along x and 10000:11000 along y.

Users are expected to ensure that the specified region aligns with Zarr chunk boundaries, and that dask chunks are also aligned. Xarray makes limited checks that these multiple chunk boundaries line up. It is possible to write incomplete chunks and corrupt the data with this option if you are not careful.

None
safe_chunks bool

If True, only allow writes when there is a many-to-one relationship between Zarr chunks (specified in encoding) and Dask chunks. Set False to override this restriction; however, data may become corrupted if Zarr arrays are written in parallel. In addition to the many-to-one relationship validation, it also detects partial chunk writes when using the region parameter; these partial chunks are considered unsafe in mode "r+" but safe in mode "a". Note: even with these validations it can still be unsafe to write two or more chunked arrays to the same location in parallel if they are not writing to independent regions.

True
chunkmanager_store_kwargs dict

Additional keyword arguments passed on to the ChunkManager.store method used to store chunked arrays. For example, for a dask array, additional kwargs will eventually be passed to dask.array.store(). Experimental API that should not be relied upon.

None
split_every int | None

Number of tasks to merge at every level of the tree reduction.

None

Returns:

Type Description
None
Notes

Two restrictions apply to the use of region:

  • If region is set, all variables in a dataset must have at least one dimension in common with the region. Other variables should be written in a separate single call to to_zarr().
  • Dimensions cannot be included in both region and append_dim at the same time. To create empty arrays to fill in with region, use the XarrayDatasetWriter directly.
Source code in icechunk/xarray.py
def to_icechunk(
    dataset: Dataset,
    store: IcechunkStore,
    *,
    group: str | None = None,
    mode: ZarrWriteModes | None = None,
    # TODO: uncomment when Zarr has support
    # write_empty_chunks: bool | None = None,
    safe_chunks: bool = True,
    append_dim: Hashable | None = None,
    region: Region = None,
    encoding: Mapping[Any, Any] | None = None,
    chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
    split_every: int | None = None,
    **kwargs: Any,
) -> None:
    """
    Write an Xarray Dataset to a group of an icechunk store.

    Parameters
    ----------
    store : MutableMapping, str or path-like, optional
        Store or path to directory in local or remote file system.
    mode : {"w", "w-", "a", "a-", r+", None}, optional
        Persistence mode: "w" means create (overwrite if exists);
        "w-" means create (fail if exists);
        "a" means override all existing variables including dimension coordinates (create if does not exist);
        "a-" means only append those variables that have ``append_dim``.
        "r+" means modify existing array *values* only (raise an error if
        any metadata or shapes would change).
        The default mode is "a" if ``append_dim`` is set. Otherwise, it is
        "r+" if ``region`` is set and ``w-`` otherwise.
    group : str, optional
        Group path. (a.k.a. `path` in zarr terminology.)
    encoding : dict, optional
        Nested dictionary with variable names as keys and dictionaries of
        variable specific encodings as values, e.g.,
        ``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}``
    append_dim : hashable, optional
        If set, the dimension along which the data will be appended. All
        other dimensions on overridden variables must remain the same size.
    region : dict or "auto", optional
        Optional mapping from dimension names to either a) ``"auto"``, or b) integer
        slices, indicating the region of existing zarr array(s) in which to write
        this dataset's data.

        If ``"auto"`` is provided the existing store will be opened and the region
        inferred by matching indexes. ``"auto"`` can be used as a single string,
        which will automatically infer the region for all dimensions, or as
        dictionary values for specific dimensions mixed together with explicit
        slices for other dimensions.

        Alternatively integer slices can be provided; for example, ``{'x': slice(0,
        1000), 'y': slice(10000, 11000)}`` would indicate that values should be
        written to the region ``0:1000`` along ``x`` and ``10000:11000`` along
        ``y``.

        Users are expected to ensure that the specified region aligns with
        Zarr chunk boundaries, and that dask chunks are also aligned.
        Xarray makes limited checks that these multiple chunk boundaries line up.
        It is possible to write incomplete chunks and corrupt the data with this
        option if you are not careful.
    safe_chunks : bool, default: True
        If True, only allow writes to when there is a many-to-one relationship
        between Zarr chunks (specified in encoding) and Dask chunks.
        Set False to override this restriction; however, data may become corrupted
        if Zarr arrays are written in parallel.
        In addition to the many-to-one relationship validation, it also detects partial
        chunks writes when using the region parameter,
        these partial chunks are considered unsafe in the mode "r+" but safe in
        the mode "a".
        Note: Even with these validations it can still be unsafe to write
        two or more chunked arrays in the same location in parallel if they are
        not writing in independent regions.
    chunkmanager_store_kwargs : dict, optional
        Additional keyword arguments passed on to the `ChunkManager.store` method used to store
        chunked arrays. For example for a dask array additional kwargs will be passed eventually to
        `dask.array.store()`. Experimental API that should not be relied upon.
    split_every: int, optional
        Number of tasks to merge at every level of the tree reduction.

    Returns
    -------
    None

    Notes
    -----
    Two restrictions apply to the use of ``region``:

      - If ``region`` is set, _all_ variables in a dataset must have at
        least one dimension in common with the region. Other variables
        should be written in a separate single call to ``to_zarr()``.
      - Dimensions cannot be included in both ``region`` and
        ``append_dim`` at the same time. To create empty arrays to fill
        in with ``region``, use the `XarrayDatasetWriter` directly.
    """
    writer = XarrayDatasetWriter(dataset, store=store)

    writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region)

    # write metadata
    writer.write_metadata(encoding)
    # write in-memory arrays
    writer.write_eager()
    # eagerly write dask arrays
    writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs)
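
An end-to-end sketch (the import paths follow the source file locations shown above and, like the in-memory storage prefix, are assumptions; the dataset is synthetic):

import numpy as np
import xarray as xr

from icechunk import IcechunkStore, StorageConfig
from icechunk.xarray import to_icechunk

# Build a small in-memory dataset to write.
ds = xr.Dataset(
    {"temperature": (("time", "x"), np.random.rand(4, 3))},
    coords={"time": np.arange(4), "x": np.arange(3)},
)

# Create a store backed by in-memory storage and write the dataset to its root group.
store = IcechunkStore.open_or_create(storage=StorageConfig.memory("example"))
to_icechunk(ds, store, mode="w")
# Committing the session (not covered in this section) persists the snapshot.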