Parallel Writes#
A common pattern with large distributed write jobs is to first initialize the dataset on disk with all the appropriate metadata and any coordinate variables. Following this, a large write job is kicked off in a distributed setting, where each worker is responsible for an independent "region" of the output.
Why is Icechunk different from any other Zarr store?#
The reason is that, unlike other Zarr stores, Icechunk is a "stateful" store. The Session object keeps a record of all writes, which is then bundled together in a commit. Thus Session.commit must be executed on a Session object that knows about all writes, including those executed remotely in a multiprocessing or any other remote execution context.
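To make the statefulness concrete, here is a minimal single-process sketch of the write lifecycle (assuming an existing Repository repo and an Xarray dataset ds; the names are illustrative): all writes go through a writable Session, and the commit is made on that same Session.
# a minimal sketch: every write is recorded on the session that performed it
session = repo.writable_session("main")
ds.to_zarr(session.store, mode="w")  # changes are tracked by this session
print(session.commit("write data"))  # commit the tracked changes; returns a snapshot ID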
Example#
Here is how you can execute such writes with Icechunk, illustrated with a ThreadPoolExecutor. First, read some example data and create an Icechunk Repository.
import xarray as xr
import tempfile
from icechunk import Repository, local_filesystem_storage
ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
repo = Repository.create(local_filesystem_storage(tempfile.TemporaryDirectory().name))
session = repo.writable_session("main")
We will orchestrate the job so that each task writes one timestep. This is an arbitrary choice, but it determines what we set for the Zarr chunk size.
Initialize the dataset using Dataset.to_zarr with compute=False. This will NOT write any chunked array data, but it will write all array metadata and any in-memory arrays (only time in this case).
# one timestep per chunk, matching the per-task write pattern chosen above
chunks = (1, ds.sizes["y"], ds.sizes["x"])
ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
# this commit is optional, but may be useful in your workflow
print(session.commit("initialize store"))
21P4QN2J8V2WNTS4VVK0
Multi-threading#
First define a function that constitutes one "write task".
from icechunk import Session
def write_timestamp(*, itime: int, session: Session) -> None:
# pass a list to isel to preserve the time dimension
ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
# region="auto" tells Xarray to infer which "region" of the output arrays to write to.
ds.to_zarr(session.store, region="auto", consolidated=False)
Now execute the writes.
from concurrent.futures import ThreadPoolExecutor, wait
session = repo.writable_session("main")
with ThreadPoolExecutor() as executor:
# submit the writes
futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
wait(futures)
print(session.commit("finished writes"))
Verify that the writes worked as expected:
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
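You can also inspect the repository history to confirm that the commit landed on main. Here is a short sketch using repo.ancestry, which is shown in full in the uncooperative example below (the id and message attributes follow the SnapshotInfo output at the end of this page):
# list the snapshots on main, newest first
for snapshot in repo.ancestry(branch="main"):
    print(snapshot.id, snapshot.message)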
Distributed writes#
There are fundamentally two different modes for distributed writes in Icechunk:
- "Cooperative" distributed writes, in which all of the changes being written are part of the same transaction. The point of this is to allow large-scale, massively-parallel writing to the store as part of a single coordinated job. In this scenario, it's the user's job to align the writing process with the Zarr chunks and avoid inconsistent metadata updates.
- "Uncooperative" writes, in which multiple workers are attempting to write to the same store in an uncoordinated way. This path relies on the optimistic concurrency mechanism to detect and resolve conflicts.
Info
This code will not execute with a ProcessPoolExecutor without some changes. Specifically, it requires wrapping the code in an if __name__ == "__main__": block. See a full executable example here.
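A minimal sketch of that structure (the function name main is illustrative; the guard keeps worker processes from re-running the submission code when they import the module):
from concurrent.futures import ProcessPoolExecutor

def main():
    # create the repository and session, then submit the write tasks here
    ...

if __name__ == "__main__":
    # required so processes spawned by ProcessPoolExecutor do not
    # re-execute the task submission code on import
    main()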
Cooperative distributed writes#
Any task execution framework (e.g. ProcessPoolExecutor, Joblib, Lithops, Dask Distributed, Ray, etc.) can be used instead of the ThreadPoolExecutor. However, such workloads should account for Icechunk being a "stateful" store that records changes executed in a write session.
There are three key points to keep in mind:
- The write_timestamp function must return the Session. It contains a record of the changes executed by this task. These changes must be manually communicated back to the coordinating process, since each of the distributed processes works with its own independent Session instance.
- Icechunk requires that users opt in to pickling a writable Session using the Session.allow_pickling() context manager, as a reminder that distributed writes with Icechunk require care.
- The user must manually merge the Session objects to create a meaningful commit.
First we modify write_timestamp to return the Session:
from icechunk import Session
def write_timestamp(*, itime: int, session: Session) -> Session:
# pass a list to isel to preserve the time dimension
ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
# region="auto" tells Xarray to infer which "region" of the output arrays to write to.
ds.to_zarr(session.store, region="auto", consolidated=False)
return session
Now we issue write tasks within the session.allow_pickling() context, gather the Sessions from individual tasks, merge them, and make a successful commit.
from concurrent.futures import ProcessPoolExecutor
from icechunk.distributed import merge_sessions
session = repo.writable_session("main")
with ProcessPoolExecutor() as executor:
# opt-in to successful pickling of a writable session
with session.allow_pickling():
# submit the writes
futures = [
executor.submit(write_timestamp, itime=i, session=session)
for i in range(ds.sizes["time"])
]
# grab the Session objects from each individual write task
sessions = [f.result() for f in futures]
# manually merge the remote sessions in to the local session
session = merge_sessions(session, *sessions)
print(session.commit("finished writes"))
Verify that the writes worked as expected:
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
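The same three rules apply with any of the other task execution frameworks mentioned above. As a sketch, here is the equivalent workflow with Joblib (assuming joblib is installed; Parallel and delayed replace the executor, everything else is unchanged):
from joblib import Parallel, delayed
from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with session.allow_pickling():
    # each task returns its own Session, exactly as in the ProcessPoolExecutor example
    sessions = Parallel(n_jobs=4)(
        delayed(write_timestamp)(itime=i, session=session)
        for i in range(ds.sizes["time"])
    )
# merge the remote sessions into the local session before committing
session = merge_sessions(session, *sessions)
print(session.commit("finished writes"))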
Uncooperative distributed writes#
Warning
Using the multiprocessing start method 'fork' will result in a deadlock when trying to open an existing repository. This happens because the files behind the repository need to be locked. The 'fork' start method copies not only the lock, but also the state of the lock. Thus all child processes inherit the file lock in an acquired state, leaving them hanging indefinitely while waiting for the file lock to be released, which never happens. Polars has a similar issue, which is described in their documentation about multiprocessing. Putting mp.set_start_method('forkserver') at the beginning of the script will solve this issue. This is only necessary on POSIX systems other than macOS, because macOS and Windows do not use the 'fork' method by default.
Here is an example of uncooperative distributed writes using multiprocessing, based on this discussion.
import multiprocessing as mp
import tempfile
import icechunk as ic
import zarr

def get_storage():
    # a single fixed path so every process opens the same repository; a fresh
    # TemporaryDirectory per call would point each process at a different store
    return ic.local_filesystem_storage(tempfile.gettempdir() + "/icechunk-uncooperative-example")
def worker(i):
print(f"Stated worker {i}")
storage = get_storage()
repo = ic.Repository.open(storage)
# keep trying until it succeeds
while True:
try:
session = repo.writable_session("main")
z = zarr.open(session.store, mode="r+")
print(f"Opened store for {i} | {dict(z.attrs)}")
a = z.attrs.get("done", [])
a.append(i)
z.attrs["done"] = a
session.commit(f"wrote from worker {i}")
break
except ic.ConflictError:
print(f"Conflict for {i}, retrying")
pass
def main():
    # This is necessary on Linux systems
mp.set_start_method('forkserver')
storage = get_storage()
repo = ic.Repository.create(storage)
session = repo.writable_session("main")
zarr.create(
shape=(10, 10),
chunks=(5, 5),
store=session.store,
overwrite=True,
)
session.commit("initialized dataset")
p1 = mp.Process(target=worker, args=(1,))
p2 = mp.Process(target=worker, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
session = repo.readonly_session(branch="main")
z = zarr.open(session.store, mode="r")
print(z.attrs["done"])
print(list(repo.ancestry(branch="main")))
if __name__ == "__main__":
main()
This should output something like the following. (Note that the order of the writes is not guaranteed.)
Started worker 1
Started worker 2
Opened store for 1 | {}
Opened store for 2 | {}
Conflict for 1, retrying
Opened store for 1 | {'done': [2]}
[2, 1]
[SnapshotInfo(id="MGPV1YE1SY0799AZFFB0", parent_id=YAN3D2N7ANCNKCFN3JSG, written_at=datetime.datetime(2025,3,4,21,40,57,19985, tzinfo=datetime.timezone.utc), message="wrote from..."), SnapshotInfo(id="YAN3D2N7ANCNKCFN3JSG", parent_id=0M5H3J6SC8MYBQYWACC0, written_at=datetime.datetime(2025,3,4,21,40,56,734126, tzinfo=datetime.timezone.utc), message="wrote from..."), SnapshotInfo(id="0M5H3J6SC8MYBQYWACC0", parent_id=WKKQ9K7ZFXZER26SES5G, written_at=datetime.datetime(2025,3,4,21,40,56,47192, tzinfo=datetime.timezone.utc), message="initialize..."), SnapshotInfo(id="WKKQ9K7ZFXZER26SES5G", parent_id=None, written_at=datetime.datetime(2025,3,4,21,40,55,868277, tzinfo=datetime.timezone.utc), message="Repository...")]