Skip to content

Commit 3d991f0

Browse files
committed
Use atomic writes in LocalStore
Fixes #3411. I use the standard strategy of writing to a temporary file in the same directory and then renaming it to the desired name. This ensures that Zarr writes are either complete or not written at all.
1 parent c9509ee commit 3d991f0

File tree

2 files changed

+86
-10
lines changed

2 files changed

+86
-10
lines changed

src/zarr/storage/_local.py

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from __future__ import annotations
22

33
import asyncio
4+
from collections.abc import Iterator
5+
import contextlib
46
import io
57
import os
68
import shutil
9+
import sys
710
from pathlib import Path
8-
from typing import TYPE_CHECKING, Self
11+
from typing import TYPE_CHECKING, Literal, BinaryIO, Self
12+
import uuid
913

1014
from zarr.abc.store import (
1115
ByteRequest,
@@ -41,27 +45,55 @@ def _get(path: Path, prototype: BufferPrototype, byte_range: ByteRequest | None)
4145
return prototype.buffer.from_bytes(f.read())
4246

4347

48+
if sys.platform == 'win32':
49+
# Per the os.rename docs:
50+
# On Windows, if dst exists a FileExistsError is always raised.
51+
_safe_move = os.rename
52+
else:
53+
# On Unix, os.rename silently replaces files, so instead we use os.link like
54+
# atomicwrites:
55+
# https://github.com/untitaker/python-atomicwrites/blob/1.4.1/atomicwrites/__init__.py#L59-L60
56+
# This also raises FileExistsError if dst exists.
57+
def _safe_move(src: Path, dst: Path) -> None:
58+
os.link(src, dst)
59+
os.unlink(src)
60+
61+
62+
@contextlib.contextmanager
def _atomic_write(
    path: Path,
    mode: Literal["r+b", "wb"],
    exclusive: bool = False,
) -> Iterator[BinaryIO]:
    """Write to ``path`` via a temporary sibling file that is moved into place.

    The caller writes to a uniquely named ``.partial`` file created next to
    ``path``. Only after the ``with`` body finishes and the temporary file is
    closed is it moved onto ``path``, so ``path`` never holds a partially
    written payload.

    Parameters
    ----------
    path : Path
        Final destination of the data.
    mode : {"r+b", "wb"}
        Mode used to open the temporary file.
    exclusive : bool, default False
        If True, the temporary file is moved with ``_safe_move``; otherwise it
        is moved with ``Path.replace``, which overwrites an existing ``path``.

    Yields
    ------
    BinaryIO
        The open temporary file to write into.

    Raises
    ------
    BaseException
        Anything raised by the ``with`` body or by the final move is re-raised
        after the temporary file has been removed.
    """
    # uuid4 keeps concurrent writers to the same destination from colliding
    # on the temporary name.
    tmp_path = path.with_suffix(f".{uuid.uuid4().hex}.partial")
    try:
        with tmp_path.open(mode) as f:
            yield f
        # The file is closed at this point; publish it under its final name.
        if exclusive:
            _safe_move(tmp_path, path)
        else:
            tmp_path.replace(path)
    except BaseException:
        # BaseException rather than Exception: KeyboardInterrupt and other
        # non-Exception errors must not leave a stray ``.partial`` file behind.
        tmp_path.unlink(missing_ok=True)
        raise
79+
80+
4481
def _put(
4582
path: Path,
4683
value: Buffer,
4784
start: int | None = None,
4885
exclusive: bool = False,
4986
) -> int | None:
5087
path.parent.mkdir(parents=True, exist_ok=True)
88+
# write takes any object supporting the buffer protocol
89+
view = value.as_buffer_like()
5190
if start is not None:
5291
with path.open("r+b") as f:
5392
f.seek(start)
54-
# write takes any object supporting the buffer protocol
55-
f.write(value.as_buffer_like())
93+
f.write(view)
5694
return None
5795
else:
58-
view = value.as_buffer_like()
59-
if exclusive:
60-
mode = "xb"
61-
else:
62-
mode = "wb"
63-
with path.open(mode=mode) as f:
64-
# write takes any object supporting the buffer protocol
96+
with _atomic_write(path, "wb", exclusive=exclusive) as f:
6597
return f.write(view)
6698

6799

tests/test_store/test_local.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from zarr import create_array
1111
from zarr.core.buffer import Buffer, cpu
1212
from zarr.storage import LocalStore
13+
from zarr.storage._local import _atomic_write
1314
from zarr.testing.store import StoreTests
1415
from zarr.testing.utils import assert_bytes_equal
1516

@@ -109,3 +110,46 @@ async def test_move(
109110
FileExistsError, match=re.escape(f"Destination root {destination} already exists")
110111
):
111112
await store2.move(destination)
113+
114+
115+
@pytest.mark.parametrize("exclusive", [True, False])
116+
def test_atomic_write_successful(tmp_path: pathlib.Path, exclusive: bool) -> None:
117+
path = pathlib.Path(tmp_path) / 'data'
118+
with _atomic_write(path, 'wb', exclusive=exclusive) as f:
119+
f.write(b'abc')
120+
assert path.read_bytes() == b'abc'
121+
assert list(path.parent.iterdir()) == [path] # no temp files
122+
123+
124+
@pytest.mark.parametrize("exclusive", [True, False])
125+
def test_atomic_write_incomplete(tmp_path: pathlib.Path, exclusive: bool) -> None:
126+
path = pathlib.Path(tmp_path) / 'data'
127+
with pytest.raises(RuntimeError):
128+
with _atomic_write(path, 'wb', exclusive=exclusive) as f:
129+
f.write(b'a')
130+
raise RuntimeError
131+
assert not path.exists()
132+
assert list(path.parent.iterdir()) == [] # no temp files
133+
134+
135+
def test_atomic_write_non_exclusive_preexisting(tmp_path: pathlib.Path) -> None:
136+
path = pathlib.Path(tmp_path) / 'data'
137+
with path.open('wb') as f:
138+
f.write(b'xyz')
139+
assert path.read_bytes() == b'xyz'
140+
with _atomic_write(path, 'wb', exclusive=False) as f:
141+
f.write(b'abc')
142+
assert path.read_bytes() == b'abc'
143+
assert list(path.parent.iterdir()) == [path] # no temp files
144+
145+
146+
def test_atomic_write_exclusive_preexisting(tmp_path: pathlib.Path) -> None:
147+
path = pathlib.Path(tmp_path) / 'data'
148+
with path.open('wb') as f:
149+
f.write(b'xyz')
150+
assert path.read_bytes() == b'xyz'
151+
with pytest.raises(FileExistsError):
152+
with _atomic_write(path, 'wb', exclusive=True) as f:
153+
f.write(b'abc')
154+
assert path.read_bytes() == b'xyz'
155+
assert list(path.parent.iterdir()) == [path] # no temp files

0 commit comments

Comments
 (0)