Commit f77642e

Split all encoding steps by slice
Closes #62
1 parent bae5b96 commit f77642e
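
The commit replaces the remaining whole-column encoders (alleles, ID, FILTERS, CHROM) with per-slice methods, so every encoding step is now submitted to the worker pool one variant slice at a time, matching how genotypes and the other 1D columns were already handled. A minimal sketch of the pattern, using concurrent.futures instead of bio2zarr's ParallelWorkManager (encode_slice and the slice count are hypothetical stand-ins, not the real API):

from concurrent.futures import ProcessPoolExecutor

def encode_slice(start, stop):
    # Stand-in for the new encode_*_slice methods: each worker touches
    # only the records in [start, stop), so no single whole-column job
    # can stall the pool when worker_processes is small.
    return stop - start

if __name__ == "__main__":
    num_records, num_slices = 1_000_000, 8
    bounds = [i * num_records // num_slices for i in range(num_slices + 1)]
    with ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(encode_slice, start, stop)
            for start, stop in zip(bounds, bounds[1:])
        ]
        assert sum(f.result() for f in futures) == num_records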

File tree

2 files changed: +118 −130 lines

bio2zarr/core.py
bio2zarr/vcf.py

bio2zarr/core.py

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ def __init__(self, worker_processes=1, progress_config=None):
         self.progress_config = progress_config
         self.progress_bar = tqdm.tqdm(
             total=progress_config.total,
-            desc=f"{progress_config.title:>7}",
+            desc=f"{progress_config.title:>9}",
             unit_scale=True,
             unit=progress_config.units,
             smoothing=0.1,
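
The only change here widens the right-aligned progress title field from 7 to 9 characters; format-spec alignment pads but never truncates, so a 9-character title such as "Encode 2D" would previously have overflowed the field and misaligned the bars. For illustration (both titles appear elsewhere in this commit):

# Right-alignment pads short titles to the field width; longer ones overflow.
for title in ["Encode 2D", "Verify"]:
    print(f"|{title:>9}|")
# |Encode 2D|
# |   Verify|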

bio2zarr/vcf.py

Lines changed: 117 additions & 129 deletions
@@ -581,22 +581,7 @@ def chunk_files(self, partition_id, start=0):
     def read_chunk(self, path):
         with open(path, "rb") as f:
             pkl = self.compressor.decode(f.read())
-        return pickle.loads(pkl), len(pkl)
-
-    def iter_values_bytes(self):
-        num_records = 0
-        bytes_read = 0
-        for partition_id in range(self.num_partitions):
-            for chunk_path in self.chunk_files(partition_id):
-                chunk, chunk_bytes = self.read_chunk(chunk_path)
-                bytes_read += chunk_bytes
-                for record in chunk:
-                    yield record, bytes_read
-                    num_records += 1
-        if num_records != self.num_records:
-            raise ValueError(
-                f"Corruption detected: incorrect number of records in {str(self.path)}."
-            )
+        return pickle.loads(pkl)
 
     def iter_values(self, start=None, stop=None):
         start = 0 if start is None else start
@@ -620,7 +605,7 @@ def iter_values(self, start=None, stop=None):
             )
 
         for chunk_path in self.chunk_files(start_partition, start_chunk):
-            chunk, _ = self.read_chunk(chunk_path)
+            chunk = self.read_chunk(chunk_path)
             for record in chunk:
                 if record_id == stop:
                     return
@@ -630,7 +615,7 @@ def iter_values(self, start=None, stop=None):
         assert record_id > start
         for partition_id in range(start_partition + 1, self.num_partitions):
             for chunk_path in self.chunk_files(partition_id):
-                chunk, _ = self.read_chunk(chunk_path)
+                chunk = self.read_chunk(chunk_path)
                 for record in chunk:
                     if record_id == stop:
                         return
@@ -641,7 +626,19 @@ def iter_values(self, start=None, stop=None):
     # but making a property for consistency with xarray etc
     @property
     def values(self):
-        return [record for record, _ in self.iter_values_bytes()]
+        ret = [None] * self.num_records
+        j = 0
+        for partition_id in range(self.num_partitions):
+            for chunk_path in self.chunk_files(partition_id):
+                chunk = self.read_chunk(chunk_path)
+                for record in chunk:
+                    ret[j] = record
+                    j += 1
+        if j != self.num_records:
+            raise ValueError(
+                f"Corruption detected: incorrect number of records in {str(self.path)}."
+            )
+        return ret
 
     def sanitiser_factory(self, shape):
         """
@@ -1288,7 +1285,6 @@ def encode_genotypes_slice(self, pcvcf, start, stop):
         gt_phased = core.BufferedArray(self.root["call_genotype_phased"], start)
 
         for value in source_col.iter_values(start, stop):
-            # core.update_progress(sys.getsizeof(value))
             j = gt.next_buffer_row()
             sanitise_value_int_2d(gt.buff, j, value[:, :-1])
             j = gt_phased.next_buffer_row()
@@ -1300,26 +1296,70 @@ def encode_genotypes_slice(self, pcvcf, start, stop):
         gt.flush()
         gt_phased.flush()
         gt_mask.flush()
-        logger.debug(f"GT slice {start}:{stop} done")
+        logger.debug(f"Encoded GT slice {start}:{stop}")
 
-    def encode_alleles(self, pcvcf):
+    def encode_alleles_slice(self, pcvcf, start, stop):
         ref_col = pcvcf.columns["REF"]
         alt_col = pcvcf.columns["ALT"]
-        ref_values = ref_col.values
-        alt_values = alt_col.values
-        allele_array = self.root["variant_allele"]
-
-        # We could do this chunk-by-chunk, but it doesn't seem worth the bother.
-        alleles = np.full(allele_array.shape, "", dtype="O")
-        for j, (ref, alt) in enumerate(zip(ref_values, alt_values)):
-            alleles[j, 0] = ref[0]
-            alleles[j, 1 : 1 + len(alt)] = alt
-        allele_array[:] = alleles
-        size = sum(
-            col.vcf_field.summary.uncompressed_size for col in [ref_col, alt_col]
-        )
-        logger.debug("alleles done")
-        core.update_progress(allele_array.nchunks)
+        alleles = core.BufferedArray(self.root["variant_allele"], start)
+
+        for ref, alt in zip(
+            ref_col.iter_values(start, stop), alt_col.iter_values(start, stop)
+        ):
+            j = alleles.next_buffer_row()
+            alleles.buff[j, :] = STR_FILL
+            alleles.buff[j, 0] = ref[0]
+            alleles.buff[j, 1 : 1 + len(alt)] = alt
+        alleles.flush()
+        logger.debug(f"Encoded alleles slice {start}:{stop}")
+
+    def encode_id_slice(self, pcvcf, start, stop):
+        col = pcvcf.columns["ID"]
+        vid = core.BufferedArray(self.root["variant_id"], start)
+        vid_mask = core.BufferedArray(self.root["variant_id_mask"], start)
+
+        for value in col.iter_values(start, stop):
+            j = vid.next_buffer_row()
+            k = vid_mask.next_buffer_row()
+            assert j == k
+            if value is not None:
+                vid.buff[j] = value[0]
+                vid_mask.buff[j] = False
+            else:
+                vid.buff[j] = STR_MISSING
+                vid_mask.buff[j] = True
+        vid.flush()
+        vid_mask.flush()
+        logger.debug(f"Encoded ID slice {start}:{stop}")
+
+    def encode_filters_slice(self, pcvcf, lookup, start, stop):
+        col = pcvcf.columns["FILTERS"]
+        var_filter = core.BufferedArray(self.root["variant_filter"], start)
+
+        for value in col.iter_values(start, stop):
+            j = var_filter.next_buffer_row()
+            var_filter.buff[j] = False
+            try:
+                for f in value:
+                    var_filter.buff[j, lookup[f]] = True
+            except IndexError:
+                raise ValueError(f"Filter '{f}' was not defined in the header.")
+        var_filter.flush()
+        logger.debug(f"Encoded FILTERS slice {start}:{stop}")
+
+    def encode_contig_slice(self, pcvcf, lookup, start, stop):
+        col = pcvcf.columns["CHROM"]
+        contig = core.BufferedArray(self.root["variant_contig"], start)
+
+        for value in col.iter_values(start, stop):
+            j = contig.next_buffer_row()
+            try:
+                contig.buff[j] = lookup[value[0]]
+            except KeyError:
+                # TODO add advice about adding it to the spec
+                raise ValueError(f"Contig '{contig}' was not defined in the header.")
+        contig.flush()
+        logger.debug(f"Encoded CHROM slice {start}:{stop}")
 
     def encode_samples(self, pcvcf, sample_id, chunk_width):
         if not np.array_equal(sample_id, pcvcf.metadata.samples):
@@ -1333,90 +1373,33 @@ def encode_samples(self, pcvcf, sample_id, chunk_width):
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
         logger.debug("Samples done")
-        core.update_progress(array.nchunks)
 
-    def encode_contig(self, pcvcf, contig_names, contig_lengths):
-        logger.debug("Start encode contig")
+    def encode_contig_id(self, pcvcf, contig_names, contig_lengths):
         array = self.root.array(
             "contig_id",
             contig_names,
             dtype="str",
             compressor=core.default_compressor,
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
-        core.update_progress(array.nchunks)
-
         if contig_lengths is not None:
             array = self.root.array(
                 "contig_length",
                 contig_lengths,
                 dtype=np.int64,
             )
             array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
-            core.update_progress(array.nchunks)
-
-        col = pcvcf.columns["CHROM"]
-        array = self.root["variant_contig"]
-        buff = np.zeros_like(array)
-        lookup = {v: j for j, v in enumerate(contig_names)}
-        for j, contig in enumerate(col.values):
-            try:
-                buff[j] = lookup[contig[0]]
-            except KeyError:
-                # TODO add advice about adding it to the spec
-                raise ValueError(f"Contig '{contig}' was not defined in the header.")
+        return {v: j for j, v in enumerate(contig_names)}
 
-        logger.debug("Start contig write")
-        array[:] = buff
-        core.update_progress(array.nchunks)
-        logger.debug("Contig done")
-
-    def encode_filters(self, pcvcf, filter_names):
+    def encode_filter_id(self, pcvcf, filter_names):
         array = self.root.array(
             "filter_id",
             filter_names,
             dtype="str",
             compressor=core.default_compressor,
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]
-        core.update_progress(array.nchunks)
-
-        col = pcvcf.columns["FILTERS"]
-        array = self.root["variant_filter"]
-        buff = np.zeros_like(array)
-
-        lookup = {v: j for j, v in enumerate(filter_names)}
-        for j, filters in enumerate(col.values):
-            try:
-                for f in filters:
-                    buff[j, lookup[f]] = True
-            except IndexError:
-                raise ValueError(f"Filter '{f}' was not defined in the header.")
-
-        array[:] = buff
-
-        core.update_progress(array.nchunks)
-        logger.debug("Filters done")
-
-    def encode_id(self, pcvcf):
-        col = pcvcf.columns["ID"]
-        id_array = self.root["variant_id"]
-        id_mask_array = self.root["variant_id_mask"]
-        id_buff = np.full_like(id_array, "")
-        id_mask_buff = np.zeros_like(id_mask_array)
-
-        for j, value in enumerate(col.values):
-            if value is not None:
-                id_buff[j] = value[0]
-            else:
-                id_buff[j] = "."  # TODO is this correct??
-                id_mask_buff[j] = True
-
-        id_array[:] = id_buff
-        core.update_progress(id_array.nchunks)
-        id_mask_array[:] = id_mask_buff
-        core.update_progress(id_mask_array.nchunks)
-        logger.debug("ID done")
+        return {v: j for j, v in enumerate(filter_names)}
 
     @staticmethod
     def encode(
@@ -1453,49 +1436,54 @@ def encode(
             units="chunks",
             show=show_progress,
         )
+
+        # Do these syncronously for simplicity so we have the mapping
+        filter_id_map = sgvcf.encode_filter_id(pcvcf, conversion_spec.filter_id)
+        contig_id_map = sgvcf.encode_contig_id(
+            pcvcf, conversion_spec.contig_id, conversion_spec.contig_length
+        )
+
         with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-            # FIXME these need to be split up, as they are quite slow on large
-            # files, and if worker_processes < 8 we get a big pause
             pwm.submit(
                 sgvcf.encode_samples,
                 pcvcf,
                 conversion_spec.sample_id,
                 conversion_spec.chunk_width,
             )
-            pwm.submit(sgvcf.encode_alleles, pcvcf)
-            pwm.submit(sgvcf.encode_id, pcvcf)
-            pwm.submit(
-                sgvcf.encode_contig,
-                pcvcf,
-                conversion_spec.contig_id,
-                conversion_spec.contig_length,
-            )
-            pwm.submit(sgvcf.encode_filters, pcvcf, conversion_spec.filter_id)
-            for col in chunked_1d:
-                if col.vcf_field is not None:
-                    for start, stop in slices:
+            for start, stop in slices:
+                pwm.submit(sgvcf.encode_alleles_slice, pcvcf, start, stop)
+                pwm.submit(sgvcf.encode_id_slice, pcvcf, start, stop)
+                pwm.submit(
+                    sgvcf.encode_filters_slice, pcvcf, filter_id_map, start, stop
+                )
+                pwm.submit(sgvcf.encode_contig_slice, pcvcf, contig_id_map, start, stop)
+                for col in chunked_1d:
+                    if col.vcf_field is not None:
                         pwm.submit(sgvcf.encode_column_slice, pcvcf, col, start, stop)
 
         chunked_2d = [
             col for col in conversion_spec.columns.values() if len(col.chunks) >= 2
        ]
-        progress_config = core.ProgressConfig(
-            total=sum(sgvcf.root[col.name].nchunks for col in chunked_2d),
-            title="Encode 2D",
-            units="chunks",
-            show=show_progress,
-        )
-        with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-            if "call_genotype" in conversion_spec.columns:
-                logger.info(f"Submit encode call_genotypes in {len(slices)} slices")
-                for start, stop in slices:
-                    pwm.submit(sgvcf.encode_genotypes_slice, pcvcf, start, stop)
-
-            for col in chunked_2d:
-                if col.vcf_field is not None:
-                    logger.info(f"Submit encode {col.name} in {len(slices)} slices")
+        if len(chunked_2d) > 0:
+            progress_config = core.ProgressConfig(
+                total=sum(sgvcf.root[col.name].nchunks for col in chunked_2d),
+                title="Encode 2D",
+                units="chunks",
+                show=show_progress,
+            )
+            with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
+                if "call_genotype" in conversion_spec.columns:
+                    logger.info(f"Submit encode call_genotypes in {len(slices)} slices")
                    for start, stop in slices:
-                        pwm.submit(sgvcf.encode_column_slice, pcvcf, col, start, stop)
+                        pwm.submit(sgvcf.encode_genotypes_slice, pcvcf, start, stop)
+
+                for col in chunked_2d:
+                    if col.vcf_field is not None:
+                        logger.info(f"Submit encode {col.name} in {len(slices)} slices")
+                        for start, stop in slices:
+                            pwm.submit(
+                                sgvcf.encode_column_slice, pcvcf, col, start, stop
+                            )
 
         zarr.consolidate_metadata(write_path)
         # Atomic swap, now we've completely finished.
@@ -1746,7 +1734,7 @@ def validate(vcf_path, zarr_path, show_progress=False):
     assert pos[start_index] == first_pos
     vcf = cyvcf2.VCF(vcf_path)
     if show_progress:
-        iterator = tqdm.tqdm(vcf, desc=" Verify", total=vcf.num_records)
+        iterator = tqdm.tqdm(vcf, desc="   Verify", total=vcf.num_records)
     else:
         iterator = vcf
     for j, row in enumerate(iterator, start_index):
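
All of the new encode_*_slice methods share one shape: construct a core.BufferedArray anchored at start, pull rows from iter_values(start, stop), fill one buffer row per record, and flush() at the end. Because slices never overlap, each worker writes only to its own region of the Zarr array. A hedged sketch of that write pattern in plain zarr/numpy (BufferedArray's internal buffering is elided, the zarr-python v2 creation API is assumed, and slice boundaries are assumed to line up with Zarr chunk boundaries so concurrent workers never contend on a chunk):

import numpy as np
import zarr

def encode_int_slice(array, values, start, stop):
    # Simplified stand-in for the BufferedArray pattern: assemble the
    # slice's rows in memory, then write them in one aligned assignment.
    array[start:stop] = np.asarray(values[start:stop], dtype=array.dtype)

arr = zarr.zeros(10, chunks=5, dtype="i4")
encode_int_slice(arr, list(range(100, 110)), 0, 5)   # worker 1's slice
encode_int_slice(arr, list(range(100, 110)), 5, 10)  # worker 2's slice
print(arr[:])  # [100 101 ... 109]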
