@@ -236,14 +236,14 @@ def scan_vcfs(paths, show_progress, target_num_partitions):
236
236
regions = indexed_vcf .partition_into_regions (num_parts = target_num_partitions )
237
237
for region in regions :
238
238
partitions .append (
239
- # Requires cyvcf2>=0.30.27
240
239
VcfPartition (
241
240
vcf_path = str (path ),
242
241
region = region ,
243
242
)
244
243
)
245
- # TODO figure out if this is safe when we have multiple chrs
246
- # in the file
244
+ # TODO figure out if this is correct. It should be fine because of VCF
245
+ # sorting, but need to verify there isn't some loophole with BCF, where
246
+ # contigs are sorted by their index in the list rather than string value.
247
247
partitions .sort (key = lambda x : (x .region .contig , x .region .start ))
248
248
vcf_metadata .partitions = partitions
249
249
return vcf_metadata , header
@@ -791,7 +791,7 @@ def convert_partition(
791
791
partition = vcf_metadata .partitions [partition_index ]
792
792
vcf = cyvcf2 .VCF (partition .vcf_path )
793
793
logger .info (
794
- f"Start partition { partition_index } { partition .vcf_path } : { partition .region } "
794
+ f"Start p { partition_index } { partition .vcf_path } __ { partition .region } "
795
795
)
796
796
797
797
info_fields = []
@@ -835,6 +835,7 @@ def variants():
835
835
encoder_threads = 0 ,
836
836
chunk_size = column_chunk_size ,
837
837
) as tcw :
838
+ num_records = 0
838
839
for variant in variants ():
839
840
tcw .append ("CHROM" , variant .CHROM )
840
841
tcw .append ("POS" , variant .POS )
@@ -859,8 +860,13 @@ def variants():
859
860
# is that we get a significant pause at the end of the counter while
860
861
# all the "small" fields get flushed. Possibly not much to be done about it.
861
862
core .update_progress (1 )
863
+ num_records += 1
862
864
863
- return tcw .field_summaries
865
+ logger .info (
866
+ f"Finish p{ partition_index } { partition .vcf_path } __{ partition .region } ="
867
+ f"{ num_records } records" )
868
+
869
+ return tcw .field_summaries , num_records
864
870
865
871
@staticmethod
866
872
def convert (
@@ -896,7 +902,13 @@ def convert(
896
902
out_path ,
897
903
column_chunk_size = column_chunk_size ,
898
904
)
899
- partition_summaries = list (pwm .results_as_completed ())
905
+ num_records = 0
906
+ partition_summaries = []
907
+ for partition_summary , partition_num_records in pwm .results_as_completed ():
908
+ num_records += partition_num_records
909
+ partition_summaries .append (partition_summary )
910
+
911
+ assert num_records == pcvcf .num_records
900
912
901
913
for field in vcf_metadata .fields :
902
914
# Clear the summary to avoid problems when running in debug
0 commit comments