Skip to content

Commit bd6677d

Browse files
Denis Barakhtanov0xE0F
authored andcommitted
Fix ftest and bubble up worker errors
Features: pytorch Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>
1 parent a57a352 commit bd6677d

File tree

2 files changed

+48
-38
lines changed

2 files changed

+48
-38
lines changed

src/client/pydaos/torch/torch_api.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import math
1717
import os
1818
import stat
19+
import sys
1920
from multiprocessing import Process, Queue
2021
from pathlib import Path
2122

@@ -374,15 +375,19 @@ def __init__(self, dfs, path, mode, open_flags, class_name,
374375
self._workers.append(worker)
375376

376377
def _worker_fn(self, queue):
377-
self._dfs.worker_init()
378-
while True:
379-
work = queue.get()
380-
if work is None:
381-
break
382-
383-
(offset, chunk) = work
384-
self._dfs.write(self._path, self._mode, self._oflags,
385-
self._class_name, self._file_chunk_size, offset, chunk)
378+
try:
379+
self._dfs.worker_init()
380+
while True:
381+
work = queue.get()
382+
if work is None:
383+
break
384+
385+
(offset, chunk) = work
386+
self._dfs.write(self._path, self._mode, self._oflags,
387+
self._class_name, self._file_chunk_size, offset, chunk)
388+
# pylint: disable=broad-exception-caught
389+
except Exception as e:
390+
sys.exit(getattr(e, 'errno', errno.EIO))
386391

387392
def write(self, data):
388393
""" Writes data to the buffer."""
@@ -433,6 +438,11 @@ def close(self):
433438
for worker in self._workers:
434439
worker.join()
435440

441+
# lets see if any worker exited abnormally and if so, raise an error
442+
for worker in self._workers:
443+
if worker.exitcode != 0:
444+
raise OSError(worker.exitcode, os.strerror(worker.exitcode))
445+
436446
super().close()
437447

438448
def _flush(self):

src/tests/ftest/pytorch/checkpoint.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -84,28 +84,30 @@ def test_checkpoint_nested_directories(self):
8484
:avocado: tags=PytorchCheckpointTest,test_checkpoint_nested_directories
8585
"""
8686

87-
pool = self.get_pool()
88-
container = self.get_container(pool)
89-
data = os.urandom(4096)
87+
pool = self.get_pool().identifier
88+
container = self.get_container(pool).identifier
9089

91-
files = ["/file.pt", "/one/file.pt", "/one/two/file.pt"]
90+
d1, d2 = str(uuid.uuid4()), str(uuid.uuid4())
91+
files = ["/file.pt", f"/{d1}/file.pt", f"/{d1}/{d2}/file.pt"]
9292

93+
# by default parent directories should be created
9394
with Checkpoint(pool, container) as pt:
94-
# By default parent should be created
9595
for name in files:
9696
with pt.writer(name) as w:
97-
w.write(data)
97+
w.write(os.urandom(4096))
9898

99-
try:
99+
# ensure that it fails with expected exception
100+
try:
101+
with Checkpoint(pool, container) as pt:
100102
fname = f"/{str(uuid.uuid4())}/file.pt"
101103
with pt.writer(fname, ensure_path=False) as w:
102-
w.write(data)
104+
w.write(os.urandom(4096))
103105
raise RuntimeError("expected OSError with errno.ENOENT")
104-
except OSError as e:
105-
if e.errno != errno.ENOENT:
106-
raise RuntimeError(f"expected errno.ENOENT, got {os.strerror(e.errno)}") from e
107-
except Exception as e:
108-
raise RuntimeError(f"unexpected error: {e}") from e
106+
except OSError as e:
107+
if e.errno != errno.ENOENT:
108+
raise RuntimeError(f"expected errno.ENOENT, got {os.strerror(e.errno)}") from e
109+
except Exception as e:
110+
raise RuntimeError(f"unexpected error: {e}") from e
109111

110112
def _test_checkpoint(self, pool, cont, writes, chunk_size=0, chunks_limit=0, workers=0):
111113
"""Creates a checkpoint with the given parameters, writes the given data to it,
@@ -114,19 +116,17 @@ def _test_checkpoint(self, pool, cont, writes, chunk_size=0, chunks_limit=0, wor
114116

115117
self.log.info("Checkpoint test: writes=%s, chunk_size=%s, chunks_limit=%s, workers=%s",
116118
len(writes), chunk_size, chunks_limit, workers)
117-
chkp = Checkpoint(pool, cont, transfer_chunk_size=chunk_size, chunks_limit=chunks_limit,
118-
workers=workers)
119-
120-
expected = bytearray()
121-
fname = str(uuid.uuid4())
122-
with chkp.writer(fname) as w:
123-
for chunk in writes:
124-
w.write(chunk)
125-
expected.extend(chunk)
126-
127-
actual = chkp.reader(fname)
128-
if expected != actual.getvalue():
129-
self.fail(
130-
f"checkpoint did not read back the expected content for {len(writes)} writes,"
131-
f"chunk_size={chunk_size}, chunks_limit={chunks_limit}, workers={workers}")
132-
del chkp
119+
with Checkpoint(pool, cont, transfer_chunk_size=chunk_size, chunks_limit=chunks_limit,
120+
workers=workers) as chkp:
121+
expected = bytearray()
122+
fname = str(uuid.uuid4())
123+
with chkp.writer(fname) as w:
124+
for chunk in writes:
125+
w.write(chunk)
126+
expected.extend(chunk)
127+
128+
actual = chkp.reader(fname)
129+
if expected != actual.getvalue():
130+
self.fail(
131+
f"checkpoint did not read back the expected content for {len(writes)} writes,"
132+
f"chunk_size={chunk_size}, chunks_limit={chunks_limit}, workers={workers}")

0 commit comments

Comments
 (0)