Skip to content

Commit 79f366a

Browse files
A few minor usability improvements
1 parent bee6a7b commit 79f366a

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

bio2zarr/core.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ def wait_on_futures(futures):
5353
for future in cf.as_completed(futures):
5454
exception = future.exception()
5555
if exception is not None:
56-
raise exception
56+
cancel_futures(futures)
57+
if isinstance(exception, cf.process.BrokenProcessPool):
58+
raise RuntimeError(
59+
"Worker process died: you may have run out of memory") from exception
60+
else:
61+
raise exception
5762

5863

5964
def cancel_futures(futures):
@@ -74,7 +79,10 @@ def __init__(self, array, offset):
7479
assert offset % array.chunks[0] == 0
7580
dims = list(array.shape)
7681
dims[0] = min(array.chunks[0], array.shape[0])
77-
self.buff = np.zeros(dims, dtype=array.dtype)
82+
self.buff = np.empty(dims, dtype=array.dtype)
83+
# Explicitly fill with zeros here to make any out-of-memory errors happen
84+
# quickly.
85+
self.buff[:] = 0
7886
self.buffer_row = 0
7987

8088
@property

bio2zarr/vcf.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,13 +1529,27 @@ def encode(
15291529
)
15301530
with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
15311531
if "call_genotype" in conversion_spec.columns:
1532-
logger.info(f"Submit encode call_genotypes in {len(slices)} slices")
1532+
arrays = [
1533+
sgvcf.root["call_genotype"],
1534+
sgvcf.root["call_genotype_phased"],
1535+
sgvcf.root["call_genotype_mask"],
1536+
]
1537+
min_mem = sum(array.blocks[0].nbytes for array in arrays)
1538+
logger.info(
1539+
f"Submit encode call_genotypes in {len(slices)} slices. "
1540+
f"Min per-worker mem={display_size(min_mem)}"
1541+
)
15331542
for start, stop in slices:
15341543
pwm.submit(sgvcf.encode_genotypes_slice, pcvcf, start, stop)
15351544

15361545
for col in chunked_2d:
15371546
if col.vcf_field is not None:
1538-
logger.info(f"Submit encode {col.name} in {len(slices)} slices")
1547+
array = sgvcf.root[col.name]
1548+
min_mem = array.blocks[0].nbytes
1549+
logger.info(
1550+
f"Submit encode {col.name} in {len(slices)} slices. "
1551+
f"Min per-worker mem={display_size(min_mem)}"
1552+
)
15391553
for start, stop in slices:
15401554
pwm.submit(
15411555
sgvcf.encode_column_slice, pcvcf, col, start, stop

0 commit comments

Comments
 (0)