Skip to content

Commit 524fcb2

Browse files
Various ground work for improving logging
1 parent 571cc21 commit 524fcb2

File tree

2 files changed

+53
-62
lines changed

2 files changed

+53
-62
lines changed

bio2zarr/core.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,16 @@ def cancel_futures(futures):
134134
class BufferedArray:
135135
array: zarr.Array
136136
array_offset: int
137+
name: str
137138
buff: np.ndarray
138139
buffer_row: int
140+
max_buff_size: int = 0
139141

140-
def __init__(self, array, offset):
142+
def __init__(self, array, offset, name="Unknown"):
141143
self.array = array
142144
self.array_offset = offset
143145
assert offset % array.chunks[0] == 0
146+
self.name = name
144147
dims = list(array.shape)
145148
dims[0] = min(array.chunks[0], array.shape[0])
146149
self.buff = np.empty(dims, dtype=array.dtype)
@@ -171,11 +174,12 @@ def flush(self):
171174
self.buff[: self.buffer_row], self.array, self.array_offset
172175
)
173176
logger.debug(
174-
f"Flushed <{self.array.name} {self.array.shape} "
177+
f"Flushed <{self.name} {self.array.shape} "
175178
f"{self.array.dtype}> "
176179
f"{self.array_offset}:{self.array_offset + self.buffer_row}"
177180
f"{self.buff.nbytes / 2**20: .2f}Mb"
178181
)
182+
self.max_buff_size = max(self.max_buff_size, sys.getsizeof(self.buff))
179183
self.array_offset += self.variants_chunk_size
180184
self.buffer_row = 0
181185

bio2zarr/vcf2zarr/vcz.py

Lines changed: 47 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -844,23 +844,36 @@ def encode_partition(self, partition_index):
844844
os.rename(partition_path, final_path)
845845

846846
def init_partition_array(self, partition_index, name):
847+
field_map = self.schema.field_map()
848+
array_spec = field_map[name]
847849
# Create an empty array like the definition
848-
src = self.arrays_path / name
850+
src = self.arrays_path / array_spec.name
849851
# Overwrite any existing WIP files
850-
wip_path = self.wip_partition_array_path(partition_index, name)
852+
wip_path = self.wip_partition_array_path(partition_index, array_spec.name)
851853
shutil.copytree(src, wip_path, dirs_exist_ok=True)
852854
array = zarr.open_array(store=wip_path, mode="a")
853-
logger.debug(f"Opened empty array {array.name} <{array.dtype}> @ {wip_path}")
854-
return array
855-
856-
def finalise_partition_array(self, partition_index, name):
857-
logger.debug(f"Encoded {name} partition {partition_index}")
855+
partition = self.metadata.partitions[partition_index]
856+
ba = core.BufferedArray(array, partition.start, name)
857+
logger.info(
858+
f"Start partition {partition_index} array {name} <{array.dtype}> "
859+
f"{array.shape} @ {wip_path}"
860+
)
861+
return ba
862+
863+
def finalise_partition_array(self, partition_index, buffered_array):
864+
buffered_array.flush()
865+
# field_map = self.schema.field_map()
866+
# array_spec = field_map[buffered_array.name]
867+
# ba = buffered_array
868+
# print(array_spec.name, "ba.max_buff_size", ba.max_buff_size,
869+
# array_spec.variant_chunk_nbytes)
870+
logger.info(
871+
f"Completed partition {partition_index} array {buffered_array.name}"
872+
)
858873

859874
def encode_array_partition(self, array_spec, partition_index):
860-
array = self.init_partition_array(partition_index, array_spec.name)
861-
862875
partition = self.metadata.partitions[partition_index]
863-
ba = core.BufferedArray(array, partition.start)
876+
ba = self.init_partition_array(partition_index, array_spec.name)
864877
source_field = self.icf.fields[array_spec.vcf_field]
865878
sanitiser = source_field.sanitiser_factory(ba.buff.shape)
866879

@@ -869,20 +882,16 @@ def encode_array_partition(self, array_spec, partition_index):
869882
# to make it easier to reason about dimension padding
870883
j = ba.next_buffer_row()
871884
sanitiser(ba.buff, j, value)
872-
ba.flush()
873-
self.finalise_partition_array(partition_index, array_spec.name)
885+
self.finalise_partition_array(partition_index, ba)
874886

875887
def encode_genotypes_partition(self, partition_index):
876-
gt_array = self.init_partition_array(partition_index, "call_genotype")
877-
gt_mask_array = self.init_partition_array(partition_index, "call_genotype_mask")
878-
gt_phased_array = self.init_partition_array(
879-
partition_index, "call_genotype_phased"
880-
)
888+
# FIXME we should be doing these one at a time, reading back in the genotypes
889+
# like we do for local alleles
890+
gt = self.init_partition_array(partition_index, "call_genotype")
891+
gt_mask = self.init_partition_array(partition_index, "call_genotype_mask")
892+
gt_phased = self.init_partition_array(partition_index, "call_genotype_phased")
881893

882894
partition = self.metadata.partitions[partition_index]
883-
gt = core.BufferedArray(gt_array, partition.start)
884-
gt_mask = core.BufferedArray(gt_mask_array, partition.start)
885-
gt_phased = core.BufferedArray(gt_phased_array, partition.start)
886895

887896
source_field = self.icf.fields["FORMAT/GT"]
888897
for value in source_field.iter_values(partition.start, partition.stop):
@@ -898,18 +907,14 @@ def encode_genotypes_partition(self, partition_index):
898907
# with mixed ploidies?
899908
j = gt_mask.next_buffer_row()
900909
gt_mask.buff[j] = gt.buff[j] < 0
901-
gt.flush()
902-
gt_phased.flush()
903-
gt_mask.flush()
904910

905-
self.finalise_partition_array(partition_index, "call_genotype")
906-
self.finalise_partition_array(partition_index, "call_genotype_mask")
907-
self.finalise_partition_array(partition_index, "call_genotype_phased")
911+
self.finalise_partition_array(partition_index, gt)
912+
self.finalise_partition_array(partition_index, gt_phased)
913+
self.finalise_partition_array(partition_index, gt_mask)
908914

909915
def encode_local_alleles_partition(self, partition_index):
910916
partition = self.metadata.partitions[partition_index]
911-
call_LA_array = self.init_partition_array(partition_index, "call_LA")
912-
call_LA = core.BufferedArray(call_LA_array, partition.start)
917+
call_LA = self.init_partition_array(partition_index, "call_LA")
913918

914919
gt_array = zarr.open_array(
915920
store=self.wip_partition_array_path(partition_index, "call_genotype"),
@@ -921,26 +926,23 @@ def encode_local_alleles_partition(self, partition_index):
921926
la = compute_la_field(genotypes)
922927
j = call_LA.next_buffer_row()
923928
call_LA.buff[j] = la
924-
925-
call_LA.flush()
926-
self.finalise_partition_array(partition_index, "call_LA")
929+
self.finalise_partition_array(partition_index, call_LA)
927930

928931
def encode_local_allele_fields_partition(self, partition_index):
929932
partition = self.metadata.partitions[partition_index]
930933
la_array = zarr.open_array(
931934
store=self.wip_partition_array_path(partition_index, "call_LA"),
932935
mode="r",
933936
)
934-
field_map = self.schema.field_map()
935937
# We go through the localisable fields one-by-one so that we don't need to
936938
# keep several large arrays in memory at once for each partition.
939+
field_map = self.schema.field_map()
937940
for descriptor in localisable_fields:
938941
if descriptor.array_name not in field_map:
939942
continue
940943
assert field_map[descriptor.array_name].vcf_field is None
941944

942-
array = self.init_partition_array(partition_index, descriptor.array_name)
943-
buff = core.BufferedArray(array, partition.start)
945+
buff = self.init_partition_array(partition_index, descriptor.array_name)
944946
source = self.icf.fields[descriptor.vcf_field].iter_values(
945947
partition.start, partition.stop
946948
)
@@ -951,14 +953,11 @@ def encode_local_allele_fields_partition(self, partition_index):
951953
value = descriptor.sanitise(raw_value, 2, raw_value.dtype)
952954
j = buff.next_buffer_row()
953955
buff.buff[j] = descriptor.convert(value, la)
954-
buff.flush()
955-
self.finalise_partition_array(partition_index, "array_name")
956+
self.finalise_partition_array(partition_index, buff)
956957

957958
def encode_alleles_partition(self, partition_index):
958-
array_name = "variant_allele"
959-
alleles_array = self.init_partition_array(partition_index, array_name)
959+
alleles = self.init_partition_array(partition_index, "variant_allele")
960960
partition = self.metadata.partitions[partition_index]
961-
alleles = core.BufferedArray(alleles_array, partition.start)
962961
ref_field = self.icf.fields["REF"]
963962
alt_field = self.icf.fields["ALT"]
964963

@@ -970,16 +969,12 @@ def encode_alleles_partition(self, partition_index):
970969
alleles.buff[j, :] = constants.STR_FILL
971970
alleles.buff[j, 0] = ref[0]
972971
alleles.buff[j, 1 : 1 + len(alt)] = alt
973-
alleles.flush()
974-
975-
self.finalise_partition_array(partition_index, array_name)
972+
self.finalise_partition_array(partition_index, alleles)
976973

977974
def encode_id_partition(self, partition_index):
978-
vid_array = self.init_partition_array(partition_index, "variant_id")
979-
vid_mask_array = self.init_partition_array(partition_index, "variant_id_mask")
975+
vid = self.init_partition_array(partition_index, "variant_id")
976+
vid_mask = self.init_partition_array(partition_index, "variant_id_mask")
980977
partition = self.metadata.partitions[partition_index]
981-
vid = core.BufferedArray(vid_array, partition.start)
982-
vid_mask = core.BufferedArray(vid_mask_array, partition.start)
983978
field = self.icf.fields["ID"]
984979

985980
for value in field.iter_values(partition.start, partition.stop):
@@ -992,18 +987,14 @@ def encode_id_partition(self, partition_index):
992987
else:
993988
vid.buff[j] = constants.STR_MISSING
994989
vid_mask.buff[j] = True
995-
vid.flush()
996-
vid_mask.flush()
997990

998-
self.finalise_partition_array(partition_index, "variant_id")
999-
self.finalise_partition_array(partition_index, "variant_id_mask")
991+
self.finalise_partition_array(partition_index, vid)
992+
self.finalise_partition_array(partition_index, vid_mask)
1000993

1001994
def encode_filters_partition(self, partition_index):
1002995
lookup = {filt.id: index for index, filt in enumerate(self.schema.filters)}
1003-
array_name = "variant_filter"
1004-
array = self.init_partition_array(partition_index, array_name)
996+
var_filter = self.init_partition_array(partition_index, "variant_filter")
1005997
partition = self.metadata.partitions[partition_index]
1006-
var_filter = core.BufferedArray(array, partition.start)
1007998

1008999
field = self.icf.fields["FILTERS"]
10091000
for value in field.iter_values(partition.start, partition.stop):
@@ -1016,16 +1007,13 @@ def encode_filters_partition(self, partition_index):
10161007
raise ValueError(
10171008
f"Filter '{f}' was not defined in the header."
10181009
) from None
1019-
var_filter.flush()
10201010

1021-
self.finalise_partition_array(partition_index, array_name)
1011+
self.finalise_partition_array(partition_index, var_filter)
10221012

10231013
def encode_contig_partition(self, partition_index):
10241014
lookup = {contig.id: index for index, contig in enumerate(self.schema.contigs)}
1025-
array_name = "variant_contig"
1026-
array = self.init_partition_array(partition_index, array_name)
1015+
contig = self.init_partition_array(partition_index, "variant_contig")
10271016
partition = self.metadata.partitions[partition_index]
1028-
contig = core.BufferedArray(array, partition.start)
10291017
field = self.icf.fields["CHROM"]
10301018

10311019
for value in field.iter_values(partition.start, partition.stop):
@@ -1035,9 +1023,8 @@ def encode_contig_partition(self, partition_index):
10351023
# will always succeed. However, if anyone ever does hit a KeyError
10361024
# here, please do open an issue with a reproducible example!
10371025
contig.buff[j] = lookup[value[0]]
1038-
contig.flush()
10391026

1040-
self.finalise_partition_array(partition_index, array_name)
1027+
self.finalise_partition_array(partition_index, contig)
10411028

10421029
#######################
10431030
# finalise

0 commit comments

Comments
 (0)