Skip to content

Commit 06cf7bd

Browse files
authored
Wrap Tensorstore Calls with Retry (#1319)
* Wrap tensorstore calls with retry * Write changelog
1 parent 1202254 commit 06cf7bd

File tree

2 files changed

+91
-37
lines changed

2 files changed

+91
-37
lines changed

webknossos/Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ For upgrade instructions, please check the respective _Breaking Changes_ section
1818
- Added a feature to track attached files of a segmentation layer. Previously, these files were only auto-detected by WEBKNOSSOS based on the location in special folders. Attachments can be added, e.g. `dataset.get_segmentation_layer("segmentation").attachments.add_mesh('path/to/meshfile.hdf5')`. [#1312](https://github.com/scalableminds/webknossos-libs/pull/1312)
1919
- Added a `with_attachments` parameter to `Dataset.copy_dataset` to copy attachments. [#1312](https://github.com/scalableminds/webknossos-libs/pull/1312)
2020
- Added a `get_segmentation_layer` method to `Dataset` to get a correctly-typed segmentation layer by name. [#1312](https://github.com/scalableminds/webknossos-libs/pull/1312)
21+
- Retry asynchronous tensorstore calls on failure to make operations on object-storage datasets accessed via the network more robust. [#1319](https://github.com/scalableminds/webknossos-libs/pull/1319)
2122

2223
### Changed
2324
- When adding a layer, the used dtype is only valid if it is supported by webknossos. [#1316](https://github.com/scalableminds/webknossos-libs/pull/1316)

webknossos/webknossos/dataset/_array.py

Lines changed: 90 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import re
2+
import time
23
from abc import ABC, abstractmethod
3-
from collections.abc import Iterable, Iterator
4+
from collections.abc import Callable, Iterable, Iterator
45
from dataclasses import dataclass
56
from functools import lru_cache
7+
from logging import getLogger
68
from os.path import relpath
79
from pathlib import Path
810
from tempfile import mkdtemp
911
from typing import (
1012
Any,
1113
Literal,
14+
TypeVar,
1215
)
1316
from urllib.parse import urlparse
1417

@@ -22,8 +25,12 @@
2225
from ..utils import is_fs_path
2326
from .data_format import DataFormat
2427

28+
logger = getLogger(__name__)
29+
2530
TS_CONTEXT = tensorstore.Context()
2631

32+
DEFAULT_NUM_RETRIES = 5
33+
2734

2835
def _is_power_of_two(num: int) -> bool:
2936
return num & (num - 1) == 0
@@ -33,6 +40,35 @@ class ArrayException(Exception):
3340
pass
3441

3542

43+
ReturnType = TypeVar("ReturnType")


def call_with_retries(
    fn: Callable[[], ReturnType],
    num_retries: int = DEFAULT_NUM_RETRIES,
    description: str = "",
) -> ReturnType:
    """Call `fn`, retrying on any exception, up to `num_retries` total attempts.

    Between attempts, sleeps with exponential backoff plus random jitter.
    Useful for retrying requests or network IO (e.g. tensorstore calls
    against object storage).

    Args:
        fn: Zero-argument callable to invoke.
        num_retries: Total number of attempts before giving up.
        description: Human-readable label for log messages.

    Returns:
        The return value of the first successful `fn()` call.

    Raises:
        Exception: Re-raises the exception of the final failed attempt.
    """
    for attempt in range(num_retries):
        try:
            return fn()
        except Exception as e:  # noqa: PERF203 # allow try except in loop
            if attempt + 1 == num_retries:
                # Final attempt failed: log and re-raise so the failure is
                # never silent — and without sleeping a pointless backoff.
                logger.error(f"{description} failed after {num_retries} attempts.")
                raise
            logger.warning(
                f"{description} attempt {attempt + 1}/{num_retries} failed, retrying... "
                f"Error was: {e}"
            )
            # We introduce some randomness to avoid multiple processes
            # retrying at the same time.
            random_factor = np.random.uniform(0.5, 1.5)
            time.sleep(1 * (1.5**attempt) * random_factor)
    # Unreachable for num_retries >= 1: the loop either returns or re-raises.
    raise RuntimeError(f"{description}: num_retries must be at least 1")
70+
71+
3672
@dataclass
3773
class ArrayInfo:
3874
data_format: DataFormat
@@ -473,15 +509,18 @@ def open(cls, path: Path) -> "TensorStoreArray":
473509
def _open(cls, path: Path) -> Self:
474510
try:
475511
uri = cls._make_kvstore(path)
476-
_array = tensorstore.open(
477-
{
478-
"driver": str(cls.data_format),
479-
"kvstore": uri,
480-
},
481-
open=True,
482-
create=False,
483-
context=TS_CONTEXT,
484-
).result() # check that everything exists
512+
_array = call_with_retries(
513+
lambda: tensorstore.open(
514+
{
515+
"driver": str(cls.data_format),
516+
"kvstore": uri,
517+
},
518+
open=True,
519+
create=False,
520+
context=TS_CONTEXT,
521+
).result(),
522+
description="Opening tensorstore array",
523+
) # check that everything exists
485524
return cls(path, _array)
486525
except Exception as exc:
487526
raise ArrayException(f"Could not open array at {uri}.") from exc
@@ -506,18 +545,18 @@ def read(self, bbox: NDBoundingBox) -> np.ndarray:
506545
)
507546
available_domain = requested_domain.intersect(array.domain)
508547

548+
data = call_with_retries(
549+
lambda: array[available_domain].read(order="F").result(),
550+
description="Reading tensorstore array",
551+
)
509552
if available_domain != requested_domain:
510553
# needs padding
511554
out = np.zeros(
512555
requested_domain.shape, dtype=array.dtype.numpy_dtype, order="F"
513556
)
514-
data = array[available_domain].read(order="F").result()
515557
out[tuple(slice(0, data.shape[i]) for i in range(len(data.shape)))] = data
516-
if not has_channel_dimension:
517-
out = np.expand_dims(out, 0)
518-
return out
519-
520-
out = array[requested_domain].read(order="F").result()
558+
else:
559+
out = data
521560
if not has_channel_dimension:
522561
out = np.expand_dims(out, 0)
523562
return out
@@ -539,24 +578,30 @@ def resize(self, new_bbox: NDBoundingBox) -> None:
539578

540579
if new_domain != array.domain:
541580
# Check on-disk for changes to shape
542-
current_array = tensorstore.open(
543-
{
544-
"driver": str(self.data_format),
545-
"kvstore": self._make_kvstore(self._path),
546-
},
547-
context=TS_CONTEXT,
548-
).result()
581+
current_array = call_with_retries(
582+
lambda: tensorstore.open(
583+
{
584+
"driver": str(self.data_format),
585+
"kvstore": self._make_kvstore(self._path),
586+
},
587+
context=TS_CONTEXT,
588+
).result(),
589+
description="Opening tensorstore array for resizing",
590+
)
549591
if array.domain != current_array.domain:
550592
raise RuntimeError(
551593
f"While resizing the Zarr array at {self._path}, a differing shape ({array.domain} != {current_array.domain}) was found in the currently persisted metadata."
552594
+ "This is likely happening because multiple processes changed the metadata of this array."
553595
)
554596

555-
self._cached_array = array.resize(
556-
inclusive_min=None,
557-
exclusive_max=new_domain.exclusive_max,
558-
resize_metadata_only=True,
559-
).result()
597+
self._cached_array = call_with_retries(
598+
lambda: array.resize(
599+
inclusive_min=None,
600+
exclusive_max=new_domain.exclusive_max,
601+
resize_metadata_only=True,
602+
).result(),
603+
description="Resizing tensorstore array",
604+
)
560605

561606
def write(self, bbox: NDBoundingBox, data: np.ndarray) -> None:
562607
if data.ndim == len(bbox):
@@ -574,7 +619,10 @@ def write(self, bbox: NDBoundingBox, data: np.ndarray) -> None:
574619
inclusive_min=(0,) + bbox.topleft.to_tuple(),
575620
shape=(self.info.num_channels,) + bbox.size.to_tuple(),
576621
)
577-
array[requested_domain].write(data).result()
622+
call_with_retries(
623+
lambda: array[requested_domain].write(data).result(),
624+
description="Writing tensorstore array",
625+
)
578626

579627
def _chunk_key_encoding(self) -> tuple[Literal["default", "v2"], Literal["/", "."]]:
580628
raise NotImplementedError
@@ -593,7 +641,9 @@ def _try_parse_ints(vec: Iterable[Any]) -> list[int] | None:
593641
return None
594642
return output
595643

596-
keys = kvstore.list().result()
644+
keys = call_with_retries(
645+
lambda: kvstore.list().result(), description="Listing keys in kvstore"
646+
)
597647
for key in keys:
598648
key_parts = key.decode("utf-8").split(separator)
599649
if _type == "default":
@@ -640,13 +690,16 @@ def close(self) -> None:
640690
def _array(self) -> tensorstore.TensorStore:
641691
if self._cached_array is None:
642692
try:
643-
self._cached_array = tensorstore.open(
644-
{
645-
"driver": str(self.data_format),
646-
"kvstore": self._make_kvstore(self._path),
647-
},
648-
context=TS_CONTEXT,
649-
).result()
693+
self._cached_array = call_with_retries(
694+
lambda: tensorstore.open(
695+
{
696+
"driver": str(self.data_format),
697+
"kvstore": self._make_kvstore(self._path),
698+
},
699+
context=TS_CONTEXT,
700+
).result(),
701+
description="Creating tensorstore array",
702+
)
650703
except Exception as e:
651704
raise ArrayException(
652705
f"Exception while opening array for {self._make_kvstore(self._path)}"

0 commit comments

Comments
 (0)