Skip to content

Commit 96a531b

Browse files
authored
Use atomic writes for new files in LocalStore (#3412)
* Use atomic writes in LocalStore Fixes #3411 I use the standard strategy of writing to a temporary file in the same directory, and then renaming it to the desired name. This ensure that Zarr writes are either complete or not written at all. * Lint fixes * import sort * actually fix import order * ruff format * Add release note
1 parent c9509ee commit 96a531b

File tree

3 files changed

+87
-11
lines changed

3 files changed

+87
-11
lines changed

changes/3411.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
LocalStore now uses atomic writes, which should prevent some cases of corrupted data.

src/zarr/storage/_local.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import contextlib
45
import io
56
import os
67
import shutil
8+
import sys
9+
import uuid
710
from pathlib import Path
8-
from typing import TYPE_CHECKING, Self
11+
from typing import TYPE_CHECKING, BinaryIO, Literal, Self
912

1013
from zarr.abc.store import (
1114
ByteRequest,
@@ -19,7 +22,7 @@
1922
from zarr.core.common import AccessModeLiteral, concurrent_map
2023

2124
if TYPE_CHECKING:
22-
from collections.abc import AsyncIterator, Iterable
25+
from collections.abc import AsyncIterator, Iterable, Iterator
2326

2427
from zarr.core.buffer import BufferPrototype
2528

@@ -41,27 +44,55 @@ def _get(path: Path, prototype: BufferPrototype, byte_range: ByteRequest | None)
4144
return prototype.buffer.from_bytes(f.read())
4245

4346

47+
if sys.platform == "win32":
48+
# Per the os.rename docs:
49+
# On Windows, if dst exists a FileExistsError is always raised.
50+
_safe_move = os.rename
51+
else:
52+
# On Unix, os.rename silently replace files, so instead we use os.link like
53+
# atomicwrites:
54+
# https://github.com/untitaker/python-atomicwrites/blob/1.4.1/atomicwrites/__init__.py#L59-L60
55+
# This also raises FileExistsError if dst exists.
56+
def _safe_move(src: Path, dst: Path) -> None:
57+
os.link(src, dst)
58+
os.unlink(src)
59+
60+
61+
@contextlib.contextmanager
62+
def _atomic_write(
63+
path: Path,
64+
mode: Literal["r+b", "wb"],
65+
exclusive: bool = False,
66+
) -> Iterator[BinaryIO]:
67+
tmp_path = path.with_suffix(f".{uuid.uuid4().hex}.partial")
68+
try:
69+
with tmp_path.open(mode) as f:
70+
yield f
71+
if exclusive:
72+
_safe_move(tmp_path, path)
73+
else:
74+
tmp_path.replace(path)
75+
except Exception:
76+
tmp_path.unlink(missing_ok=True)
77+
raise
78+
79+
4480
def _put(
4581
path: Path,
4682
value: Buffer,
4783
start: int | None = None,
4884
exclusive: bool = False,
4985
) -> int | None:
5086
path.parent.mkdir(parents=True, exist_ok=True)
87+
# write takes any object supporting the buffer protocol
88+
view = value.as_buffer_like()
5189
if start is not None:
5290
with path.open("r+b") as f:
5391
f.seek(start)
54-
# write takes any object supporting the buffer protocol
55-
f.write(value.as_buffer_like())
92+
f.write(view)
5693
return None
5794
else:
58-
view = value.as_buffer_like()
59-
if exclusive:
60-
mode = "xb"
61-
else:
62-
mode = "wb"
63-
with path.open(mode=mode) as f:
64-
# write takes any object supporting the buffer protocol
95+
with _atomic_write(path, "wb", exclusive=exclusive) as f:
6596
return f.write(view)
6697

6798

tests/test_store/test_local.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from zarr import create_array
1111
from zarr.core.buffer import Buffer, cpu
1212
from zarr.storage import LocalStore
13+
from zarr.storage._local import _atomic_write
1314
from zarr.testing.store import StoreTests
1415
from zarr.testing.utils import assert_bytes_equal
1516

@@ -109,3 +110,46 @@ async def test_move(
109110
FileExistsError, match=re.escape(f"Destination root {destination} already exists")
110111
):
111112
await store2.move(destination)
113+
114+
115+
@pytest.mark.parametrize("exclusive", [True, False])
116+
def test_atomic_write_successful(tmp_path: pathlib.Path, exclusive: bool) -> None:
117+
path = pathlib.Path(tmp_path) / "data"
118+
with _atomic_write(path, "wb", exclusive=exclusive) as f:
119+
f.write(b"abc")
120+
assert path.read_bytes() == b"abc"
121+
assert list(path.parent.iterdir()) == [path] # no temp files
122+
123+
124+
@pytest.mark.parametrize("exclusive", [True, False])
125+
def test_atomic_write_incomplete(tmp_path: pathlib.Path, exclusive: bool) -> None:
126+
path = pathlib.Path(tmp_path) / "data"
127+
with pytest.raises(RuntimeError): # noqa: PT012
128+
with _atomic_write(path, "wb", exclusive=exclusive) as f:
129+
f.write(b"a")
130+
raise RuntimeError
131+
assert not path.exists()
132+
assert list(path.parent.iterdir()) == [] # no temp files
133+
134+
135+
def test_atomic_write_non_exclusive_preexisting(tmp_path: pathlib.Path) -> None:
136+
path = pathlib.Path(tmp_path) / "data"
137+
with path.open("wb") as f:
138+
f.write(b"xyz")
139+
assert path.read_bytes() == b"xyz"
140+
with _atomic_write(path, "wb", exclusive=False) as f:
141+
f.write(b"abc")
142+
assert path.read_bytes() == b"abc"
143+
assert list(path.parent.iterdir()) == [path] # no temp files
144+
145+
146+
def test_atomic_write_exclusive_preexisting(tmp_path: pathlib.Path) -> None:
147+
path = pathlib.Path(tmp_path) / "data"
148+
with path.open("wb") as f:
149+
f.write(b"xyz")
150+
assert path.read_bytes() == b"xyz"
151+
with pytest.raises(FileExistsError):
152+
with _atomic_write(path, "wb", exclusive=True) as f:
153+
f.write(b"abc")
154+
assert path.read_bytes() == b"xyz"
155+
assert list(path.parent.iterdir()) == [path] # no temp files

0 commit comments

Comments
 (0)