Skip to content

Commit 8c3ec06

Browse files
Merge pull request #81 from jeromekelleher/refactor-zarr-encode
Refactor zarr encode
2 parents 8acf0d4 + 986f999 commit 8c3ec06

File tree

5 files changed

+228
-147
lines changed

5 files changed

+228
-147
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# 0.0.2 2024-03-xx
2+
3+
- Merged 1D and 2D encode steps into one, and changed rate reporting to bytes
4+
- Add --max-memory for encode

bio2zarr/cli.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,13 @@ def mkschema(if_path):
109109
"schema tuning."
110110
),
111111
)
112+
@click.option(
113+
"-M",
114+
"--max-memory",
115+
type=int,
116+
default=None,
117+
help="An approximate bound on overall memory usage in megabytes",
118+
)
112119
@worker_processes
113120
def encode(
114121
if_path,
@@ -118,6 +125,7 @@ def encode(
118125
chunk_length,
119126
chunk_width,
120127
max_variant_chunks,
128+
max_memory,
121129
worker_processes,
122130
):
123131
"""
@@ -132,6 +140,7 @@ def encode(
132140
chunk_width=chunk_width,
133141
max_v_chunks=max_variant_chunks,
134142
worker_processes=worker_processes,
143+
max_memory=max_memory,
135144
show_progress=True,
136145
)
137146

bio2zarr/core.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def flush(self):
118118

119119
def sync_flush_1d_array(np_buffer, zarr_array, offset):
120120
zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer
121-
update_progress(1)
121+
update_progress(np_buffer.nbytes)
122122

123123

124124
def sync_flush_2d_array(np_buffer, zarr_array, offset):
@@ -127,12 +127,15 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
127127
# encoder implementations.
128128
s = slice(offset, offset + np_buffer.shape[0])
129129
chunk_width = zarr_array.chunks[1]
130+
# TODO use zarr chunks here to support non-uniform chunking later
131+
# and for simplicity
130132
zarr_array_width = zarr_array.shape[1]
131133
start = 0
132134
while start < zarr_array_width:
133135
stop = min(start + chunk_width, zarr_array_width)
134-
zarr_array[s, start:stop] = np_buffer[:, start:stop]
135-
update_progress(1)
136+
chunk_buffer = np_buffer[:, start:stop]
137+
zarr_array[s, start:stop] = chunk_buffer
138+
update_progress(chunk_buffer.nbytes)
136139
start = stop
137140

138141

@@ -177,15 +180,15 @@ def __init__(self, worker_processes=1, progress_config=None):
177180
self.executor = cf.ProcessPoolExecutor(
178181
max_workers=worker_processes,
179182
)
180-
self.futures = []
183+
self.futures = set()
181184

182185
set_progress(0)
183186
if progress_config is None:
184187
progress_config = ProgressConfig()
185188
self.progress_config = progress_config
186189
self.progress_bar = tqdm.tqdm(
187190
total=progress_config.total,
188-
desc=f"{progress_config.title:>9}",
191+
desc=f"{progress_config.title:>7}",
189192
unit_scale=True,
190193
unit=progress_config.units,
191194
smoothing=0.1,
@@ -216,7 +219,19 @@ def _update_progress_worker(self):
216219
logger.debug("Exit progress thread")
217220

218221
def submit(self, *args, **kwargs):
219-
self.futures.append(self.executor.submit(*args, **kwargs))
222+
future = self.executor.submit(*args, **kwargs)
223+
self.futures.add(future)
224+
return future
225+
226+
def wait_for_completed(self, timeout=None):
227+
done, not_done = cf.wait(self.futures, timeout, cf.FIRST_COMPLETED)
228+
for future in done:
229+
exception = future.exception()
230+
# TODO do the check for BrokenProcessPool here
231+
if exception is not None:
232+
raise exception
233+
self.futures = not_done
234+
return done
220235

221236
def results_as_completed(self):
222237
for future in cf.as_completed(self.futures):

0 commit comments

Comments
 (0)