Skip to content

Commit 4ab8dec

Browse files
Add tests for ZarrEncoder
1 parent 9193834 commit 4ab8dec

File tree

2 files changed

+102
-7
lines changed

2 files changed

+102
-7
lines changed

bio2zarr/core.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def swap_buffers(self):
3131
def async_flush(self, executor, offset, buff_stop=None):
    """Schedule a flush of the leading rows of this buffer.

    Slices ``self.buff`` up to ``buff_stop`` (the whole buffer when
    ``buff_stop`` is None) and passes that slice, together with
    ``self.array`` and ``offset``, to ``async_flush_array`` using the
    supplied ``executor``.

    Returns whatever ``async_flush_array`` returns; the encoder extends
    its list of futures with it, so it is presumably an iterable of
    futures (``async_flush_array`` is defined outside this view).
    """
    rows_to_write = self.buff[:buff_stop]
    return async_flush_array(executor, rows_to_write, self.array, offset)
3333

34+
# TODO: factor these functions into the BufferedArray class
3435

3536
def sync_flush_array(np_buffer, zarr_array, offset):
3637
zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer
@@ -72,7 +73,9 @@ def flush_chunk(start, stop):
7273

7374

7475
class ThreadedZarrEncoder(contextlib.AbstractContextManager):
75-
def __init__(self, buffered_arrays, encoder_threads):
76+
# TODO (maybe) add option with encoder_threads=None to run synchronously for
77+
# debugging using a mock Executor
78+
def __init__(self, buffered_arrays, encoder_threads=1):
7679
self.buffered_arrays = buffered_arrays
7780
self.executor = cf.ThreadPoolExecutor(max_workers=encoder_threads)
7881
self.chunk_length = buffered_arrays[0].chunk_length
@@ -99,8 +102,6 @@ def swap_buffers(self):
99102
self.wait_on_futures()
100103
self.futures = []
101104
for ba in self.buffered_arrays:
102-
# TODO add debug log
103-
# print("Scheduling", ba.array, offset, buff_stop)
104105
self.futures.extend(
105106
ba.async_flush(self.executor, self.array_offset, self.next_row)
106107
)
@@ -112,9 +113,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
112113
self.next_row += 1
113114
self.swap_buffers()
114115
self.wait_on_futures()
115-
# TODO add arguments to wait and cancel_futures appropriate
116-
# for an error condition occurring here. Generally need
117-
# to think about the error exit condition here (like running
118-
# out of disk space) to see what the right behaviour is.
116+
else:
117+
for future in self.futures:
118+
future.cancel()
119119
self.executor.shutdown()
120120
return False

tests/test_core.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import numpy as np
2+
import numpy.testing as nt
3+
import pytest
4+
import zarr
5+
6+
from bio2zarr import core
7+
8+
9+
def encode_arrays(arrays, data, encoder_threads=1):
    """Copy ``data`` into ``arrays`` row by row via ``ThreadedZarrEncoder``.

    Every target array is wrapped in a ``core.BufferedArray``. For each row
    index along the first dimension, a buffer row is requested from the
    encoder and the corresponding row of each source array is stored in the
    matching buffer.

    Args:
        arrays: Target arrays to be filled. All must have the same length
            along the first dimension.
        data: Source arrays, one per target, each with the same shape as
            its target.
        encoder_threads: Thread count passed to ``core.ThreadedZarrEncoder``.
    """
    buffered_arrays = [core.BufferedArray(a) for a in arrays]
    # Sanity-check the fixture itself before exercising the encoder.
    assert len(arrays) == len(data)
    for a, d in zip(arrays, data):
        assert a.shape == d.shape
        assert a.shape[0] == arrays[0].shape[0]
    with core.ThreadedZarrEncoder(buffered_arrays, encoder_threads) as tze:
        for data_row in range(len(data[0])):
            # The encoder decides which buffer row the next record goes in.
            j = tze.next_buffer_row()
            for ba, data_array in zip(buffered_arrays, data):
                ba.buff[j] = data_array[data_row]
21+
22+
23+
class TestZarrEncoder:
    """Round-trip and error-path tests for ``core.ThreadedZarrEncoder``."""

    @pytest.mark.parametrize(
        ["data", "chunk_size"],
        [
            (np.arange(10), (1,)),
            (np.arange(10), (3,)),
            (np.arange(10), (5,)),
            (np.arange(10), (10,)),
            (np.arange(10, dtype=np.int8), (3,)),
            (np.arange(10, dtype=np.int32), (3,)),
            (np.arange(10, dtype=np.float32), (3,)),
            (np.arange(10, dtype=np.float64), (3,)),
            (-1 * np.arange(100, dtype=np.int32)[::-1], (7,)),
            # 2D arrays
            (np.arange(16).reshape((4, 4)), (1, 4)),
            (np.arange(16).reshape((4, 4)), (3, 3)),
            (np.arange(16).reshape((4, 4)), (16, 1)),
            # 3D arrays
            (np.arange(32).reshape((8, 2, 2)), (1, 4, 2)),
        ],
    )
    def test_single_array(self, data, chunk_size):
        """A single array is reproduced exactly for various chunkings."""
        a = zarr.empty_like(data, chunks=chunk_size)
        encode_arrays([a], [data])
        nt.assert_array_equal(a[:], data)

    @pytest.mark.parametrize("chunk_size", range(1, 6))
    def test_multi_array(self, chunk_size):
        """Several arrays of differing dtype are encoded together."""
        n = 33
        data = [
            np.arange(n),
            np.arange(n, dtype=np.int32),
            np.arange(n, dtype=np.float64),
        ]
        arrays = [zarr.empty_like(d, chunks=(chunk_size,)) for d in data]
        encode_arrays(arrays, data)
        # Check every array round-trips; without this the test could only
        # fail if encoding raised an exception.
        for a, d in zip(arrays, data):
            nt.assert_array_equal(a[:], d)

    @pytest.mark.parametrize("threads", range(1, 6))
    def test_single_array_threads(self, threads):
        """The result does not depend on the number of encoder threads."""
        data = np.arange(10_333)
        a = zarr.empty_like(data, chunks=(100,))
        encode_arrays([a], [data], threads)
        nt.assert_array_equal(a[:], data)

    def test_error_in_user_code(self):
        """An exception raised inside the ``with`` body propagates."""
        data = list(range(10)) + ["string"]
        a = zarr.empty(len(data), chunks=(1,), dtype=int)
        ba = core.BufferedArray(a)

        # ``match`` is a regular expression, so the parentheses are escaped
        # to require the literal text "int()" in the message.
        with pytest.raises(ValueError, match=r"int\(\)"):
            with core.ThreadedZarrEncoder([ba]) as tze:
                for d in data:
                    j = tze.next_buffer_row()
                    # This raises an error when "string" inserted to buffer
                    ba.buff[j] = d

    def test_error_in_encode(self):
        """An exception raised while flushing propagates to the caller."""
        data = np.array([1])
        a = zarr.empty_like(data, chunks=(1,))
        ba = core.BufferedArray(a)

        # ``match`` is a regular expression, so the parentheses are escaped
        # to require the literal text "int()" in the message.
        with pytest.raises(ValueError, match=r"int\(\)"):
            with core.ThreadedZarrEncoder([ba]) as tze:
                for d in data:
                    j = tze.next_buffer_row()
                    # Valid integer data: this assignment does not raise.
                    ba.buff[j] = d
                # We only flush on exiting the context manager, so switch the
                # buffer for something nasty.
                # NB: this is the only reliable way I can think of raising
                # an error in the futures. In reality these will happen
                # when we run out of disk space, but this is hard to simulate
                ba.buff = np.array(["not an integer"])

0 commit comments

Comments
 (0)