Skip to content

Commit 526cce1

Browse files
Add memory budget
1 parent bdabfdb commit 526cce1

File tree

4 files changed

+87
-7
lines changed

4 files changed

+87
-7
lines changed

bio2zarr/cli.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,13 @@ def mkschema(if_path):
109109
"schema tuning."
110110
),
111111
)
112+
@click.option(
113+
"-M",
114+
"--max-memory",
115+
type=int,
116+
default=None,
117+
help="An approximate bound on overall memory usage in megabytes",
118+
)
112119
@worker_processes
113120
def encode(
114121
if_path,
@@ -118,6 +125,7 @@ def encode(
118125
chunk_length,
119126
chunk_width,
120127
max_variant_chunks,
128+
max_memory,
121129
worker_processes,
122130
):
123131
"""
@@ -132,6 +140,7 @@ def encode(
132140
chunk_width=chunk_width,
133141
max_v_chunks=max_variant_chunks,
134142
worker_processes=worker_processes,
143+
max_memory=max_memory,
135144
show_progress=True,
136145
)
137146

bio2zarr/core.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def __init__(self, worker_processes=1, progress_config=None):
180180
self.executor = cf.ProcessPoolExecutor(
181181
max_workers=worker_processes,
182182
)
183-
self.futures = []
183+
self.futures = set()
184184

185185
set_progress(0)
186186
if progress_config is None:
@@ -219,7 +219,19 @@ def _update_progress_worker(self):
219219
logger.debug("Exit progress thread")
220220

221221
def submit(self, *args, **kwargs):
222-
self.futures.append(self.executor.submit(*args, **kwargs))
222+
future = self.executor.submit(*args, **kwargs)
223+
self.futures.add(future)
224+
return future
225+
226+
def wait_for_completed(self, timeout):
227+
done, not_done = cf.wait(self.futures, timeout, cf.FIRST_COMPLETED)
228+
for future in done:
229+
exception = future.exception()
230+
# TODO do the check for BrokenProcessPool here
231+
if exception is not None:
232+
raise exception
233+
self.futures = not_done
234+
return done
223235

224236
def results_as_completed(self):
225237
for future in cf.as_completed(self.futures):

bio2zarr/vcf.py

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def display_number(x):
4848

4949

5050
def display_size(n):
51-
return humanfriendly.format_size(n)
51+
return humanfriendly.format_size(n, binary=True)
5252

5353

5454
@dataclasses.dataclass
@@ -1289,6 +1289,7 @@ class EncodingWork:
12891289
func: callable
12901290
start: int
12911291
stop: int
1292+
memory: int = 0
12921293

12931294

12941295
class VcfZarrWriter:
@@ -1478,7 +1479,17 @@ def encode(
14781479
worker_processes=1,
14791480
max_v_chunks=None,
14801481
show_progress=False,
1482+
max_memory=None,
14811483
):
1484+
if max_memory is None:
1485+
# Unbounded
1486+
max_memory = 2**63
1487+
else:
1488+
# Value is specified in mebibytes (MiB)
1489+
max_memory *= 2**20
1490+
1491+
# TODO this will move into the setup logic later when we're making it possible
1492+
# to split the work by slice
14821493
num_slices = max(1, worker_processes * 4)
14831494
# Using POS arbitrarily to get the array slices
14841495
slices = core.chunk_aligned_slices(
@@ -1492,8 +1503,16 @@ def encode(
14921503
array.resize(shape)
14931504

14941505
total_bytes = 0
1506+
encoding_memory_requirements = {}
14951507
for col in self.schema.columns.values():
14961508
array = self.get_array(col.name)
1509+
# NOTE!! this is bad, we're potentially creating quite a large
1510+
# numpy array for basically nothing. We can compute this.
1511+
variant_chunk_size = array.blocks[0].nbytes
1512+
encoding_memory_requirements[col.name] = variant_chunk_size
1513+
logger.debug(
1514+
f"{col.name} requires at least {display_size(variant_chunk_size)} per worker"
1515+
)
14971516
total_bytes += array.nbytes
14981517

14991518
filter_id_map = self.encode_filter_id()
@@ -1504,7 +1523,11 @@ def encode(
15041523
for col in self.schema.columns.values():
15051524
if col.vcf_field is not None:
15061525
f = functools.partial(self.encode_array_slice, col)
1507-
work.append(EncodingWork(f, start, stop))
1526+
work.append(
1527+
EncodingWork(
1528+
f, start, stop, encoding_memory_requirements[col.name]
1529+
)
1530+
)
15081531
work.append(EncodingWork(self.encode_alleles_slice, start, stop))
15091532
work.append(EncodingWork(self.encode_id_slice, start, stop))
15101533
work.append(
@@ -1522,18 +1545,51 @@ def encode(
15221545
)
15231546
)
15241547
if "call_genotype" in self.schema.columns:
1525-
work.append(EncodingWork(self.encode_genotypes_slice, start, stop))
1548+
gt_memory = sum(
1549+
encoding_memory_requirements[name]
1550+
for name in [
1551+
"call_genotype",
1552+
"call_genotype_phased",
1553+
"call_genotype_mask",
1554+
]
1555+
)
1556+
work.append(
1557+
EncodingWork(self.encode_genotypes_slice, start, stop, gt_memory)
1558+
)
1559+
# Fail early if we can't fit a particular column into memory
1560+
for wp in work:
1561+
if wp.memory >= max_memory:
1562+
raise ValueError(f"Insufficient memory for {wp.func}: "
1563+
f"{display_size(wp.memory)} > {display_size(max_memory)}")
1564+
15261565

15271566
progress_config = core.ProgressConfig(
15281567
total=total_bytes,
15291568
title="Encode",
15301569
units="B",
15311570
show=show_progress,
15321571
)
1572+
used_memory = 0
15331573
with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
1534-
pwm.submit(self.encode_samples)
1574+
future = pwm.submit(self.encode_samples)
1575+
future_to_memory_use = {future: 0}
15351576
for wp in work:
1536-
pwm.submit(wp.func, wp.start, wp.stop)
1577+
while used_memory + wp.memory >= max_memory:
1578+
logger.info(
1579+
f"Memory budget {display_size(max_memory)} exceeded: "
1580+
f"used={display_size(used_memory)} needed={display_size(wp.memory)}"
1581+
)
1582+
futures = pwm.wait_for_completed(timeout=5)
1583+
released_mem = sum(
1584+
future_to_memory_use.pop(future) for future in futures
1585+
)
1586+
logger.info(
1587+
f"{len(futures)} completed, released {display_size(released_mem)}"
1588+
)
1589+
used_memory -= released_mem
1590+
future = pwm.submit(wp.func, wp.start, wp.stop)
1591+
used_memory += wp.memory
1592+
future_to_memory_use[future] = wp.memory
15371593

15381594

15391595
def mkschema(if_path, out):
@@ -1549,6 +1605,7 @@ def encode(
15491605
chunk_length=None,
15501606
chunk_width=None,
15511607
max_v_chunks=None,
1608+
max_memory=None,
15521609
worker_processes=1,
15531610
show_progress=False,
15541611
):
@@ -1574,6 +1631,7 @@ def encode(
15741631
vzw.encode(
15751632
max_v_chunks=max_v_chunks,
15761633
worker_processes=worker_processes,
1634+
max_memory=max_memory,
15771635
show_progress=show_progress,
15781636
)
15791637
vzw.finalise()

tests/test_cli.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def test_encode(self):
6868
chunk_width=None,
6969
max_v_chunks=None,
7070
worker_processes=1,
71+
max_memory=None,
7172
show_progress=True,
7273
)
7374

0 commit comments

Comments
 (0)