Skip to content

Commit 20e8c58

Browse files
Merge pull request #83 from jeromekelleher/finalise-per-array
Finalise each array as it finishes
2 parents 8c3ec06 + e5d61f6 commit 20e8c58

File tree

1 file changed

+49
-28
lines changed

1 file changed

+49
-28
lines changed

bio2zarr/vcf.py

Lines changed: 49 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1286,7 +1286,7 @@ def summary_table(self):
12861286

12871287
@dataclasses.dataclass
12881288
class EncodingWork:
1289-
func: callable
1289+
func: callable = dataclasses.field(repr=False)
12901290
start: int
12911291
stop: int
12921292
columns: list[str]
@@ -1319,12 +1319,12 @@ def init_array(self, variable):
13191319
def get_array(self, name):
13201320
return self.root["wip_" + name]
13211321

1322-
def finalise_array(self, variable):
1323-
source = self.path / ("wip_" + variable.name)
1324-
dest = self.path / variable.name
1322+
def finalise_array(self, variable_name):
1323+
source = self.path / ("wip_" + variable_name)
1324+
dest = self.path / variable_name
13251325
# Atomic swap
13261326
os.rename(source, dest)
1327-
logger.debug(f"Finalised {variable.name}")
1327+
logger.info(f"Finalised {variable_name}")
13281328

13291329
def encode_array_slice(self, column, start, stop):
13301330
source_col = self.pcvcf.columns[column.vcf_field]
@@ -1471,8 +1471,8 @@ def init(self):
14711471
self.init_array(column)
14721472

14731473
def finalise(self):
1474-
for column in self.schema.columns.values():
1475-
self.finalise_array(column)
1474+
# for column in self.schema.columns.values():
1475+
# self.finalise_array(column)
14761476
zarr.consolidate_metadata(self.path)
14771477

14781478
def encode(
@@ -1536,21 +1536,25 @@ def encode(
15361536
work.append(
15371537
EncodingWork(self.encode_alleles_slice, start, stop, ["variant_allele"])
15381538
)
1539-
work.append(EncodingWork(self.encode_id_slice, start, stop, ["variant_id"]))
1539+
work.append(
1540+
EncodingWork(
1541+
self.encode_id_slice, start, stop, ["variant_id", "variant_id_mask"]
1542+
)
1543+
)
15401544
work.append(
15411545
EncodingWork(
15421546
functools.partial(self.encode_filters_slice, filter_id_map),
15431547
start,
15441548
stop,
1545-
["variant_filters"],
1549+
["variant_filter"],
15461550
)
15471551
)
15481552
work.append(
15491553
EncodingWork(
15501554
functools.partial(self.encode_contig_slice, contig_id_map),
15511555
start,
15521556
stop,
1553-
["variant_contig_id"],
1557+
["variant_contig"],
15541558
)
15551559
)
15561560
if "call_genotype" in self.schema.columns:
@@ -1567,6 +1571,7 @@ def encode(
15671571
self.encode_genotypes_slice, start, stop, variables, gt_memory
15681572
)
15691573
)
1574+
15701575
# Fail early if we can't fit a particular column into memory
15711576
for wp in work:
15721577
if wp.memory >= max_memory:
@@ -1581,31 +1586,47 @@ def encode(
15811586
units="B",
15821587
show=show_progress,
15831588
)
1584-
# TODO add a map of slices completed to column here, so that we can
1585-
# finalise the arrays as they get completed. We'll have to service
1586-
# the futures more, though, not just when we exceed the memory budget
15871589

15881590
used_memory = 0
1591+
max_queued = 4 * max(1, worker_processes)
1592+
encoded_slices = collections.Counter()
1593+
15891594
with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
15901595
future = pwm.submit(self.encode_samples)
1591-
future_to_memory_use = {future: 0}
1592-
for wp in work:
1593-
while used_memory + wp.memory >= max_memory:
1594-
logger.info(
1595-
f"Memory budget {display_size(max_memory)} exceeded: "
1596-
f"used={display_size(used_memory)} needed={display_size(wp.memory)}"
1597-
)
1598-
futures = pwm.wait_for_completed()
1599-
released_mem = sum(
1600-
future_to_memory_use.pop(future) for future in futures
1601-
)
1602-
logger.info(
1603-
f"{len(futures)} completed, released {display_size(released_mem)}"
1596+
future_to_work = {future: EncodingWork(None, 0, 0, [])}
1597+
1598+
def service_completed_futures():
1599+
nonlocal used_memory
1600+
1601+
completed = pwm.wait_for_completed()
1602+
for future in completed:
1603+
wp_done = future_to_work.pop(future)
1604+
used_memory -= wp_done.memory
1605+
logger.debug(
1606+
f"Complete {wp_done}: used mem={display_size(used_memory)}"
16041607
)
1605-
used_memory -= released_mem
1608+
for column in wp_done.columns:
1609+
encoded_slices[column] += 1
1610+
if encoded_slices[column] == len(slices):
1611+
# Do this synchronously for simplicity. Should be
1612+
# fine as the workers will probably be busy with
1613+
# large encode tasks most of the time.
1614+
self.finalise_array(column)
1615+
1616+
for wp in work:
1617+
if (
1618+
used_memory + wp.memory > max_memory
1619+
or len(future_to_work) > max_queued
1620+
):
1621+
service_completed_futures()
16061622
future = pwm.submit(wp.func, wp.start, wp.stop)
16071623
used_memory += wp.memory
1608-
future_to_memory_use[future] = wp.memory
1624+
logger.debug(f"Submit {wp}: used mem={display_size(used_memory)}")
1625+
future_to_work[future] = wp
1626+
1627+
logger.debug("All work submitted")
1628+
while len(future_to_work) > 0:
1629+
service_completed_futures()
16091630

16101631

16111632
def mkschema(if_path, out):

0 commit comments

Comments
 (0)