1
1
import concurrent .futures as cf
2
- import warnings
3
2
import collections
4
3
import dataclasses
5
4
import functools
@@ -183,6 +182,7 @@ def make_field_def(name, vcf_type, vcf_number):
183
182
]
184
183
return fields
185
184
185
+
186
186
def scan_vcf (path , target_num_partitions ):
187
187
logger .debug (f"Scanning { path } " )
188
188
with vcf_utils .IndexedVcf (path ) as indexed_vcf :
@@ -269,7 +269,9 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
269
269
# Sort by contig (in the order they appear in the header) first,
270
270
# then by start coordinate
271
271
contig_index_map = {contig : j for j , contig in enumerate (metadata .contig_names )}
272
- all_partitions .sort (key = lambda x : (contig_index_map [x .region .contig ], x .region .start ))
272
+ all_partitions .sort (
273
+ key = lambda x : (contig_index_map [x .region .contig ], x .region .start )
274
+ )
273
275
vcf_metadata .partitions = all_partitions
274
276
return vcf_metadata , header
275
277
@@ -814,7 +816,6 @@ def convert_partition(
814
816
column_chunk_size = 16 ,
815
817
):
816
818
partition = vcf_metadata .partitions [partition_index ]
817
- vcf = cyvcf2 .VCF (partition .vcf_path )
818
819
logger .info (
819
820
f"Start p{ partition_index } { partition .vcf_path } __{ partition .region } "
820
821
)
@@ -831,23 +832,6 @@ def convert_partition(
831
832
else :
832
833
format_fields .append (field )
833
834
834
- def variants ():
835
- # with warnings.catch_warnings():
836
- # # TODO cyvcf2 emits a warning for empty regions; either make the
837
- # # warning more specific, or remove the need for querying empty
838
- # # regions.
839
- # # FIXME this also absorbs any warnings emitted within the loop,
840
- # # so definitely need to do this a different way.
841
- # warnings.simplefilter("ignore")
842
- # for var in region_filter(vcf(partition.region), partition.region):
843
- # yield var
844
-
845
- # TODO move this into the IndexedVCF class
846
- start = 1 if partition .region .start is None else partition .region .start
847
- for var in vcf (str (partition .region )):
848
- if var .POS >= start :
849
- yield var
850
-
851
835
# FIXME it looks like this is actually a bit pointless now that we
852
836
# can split up into multiple regions within the VCF. It's simpler
853
837
# and easier to explain and predict performance if we just do
@@ -860,38 +844,38 @@ def variants():
860
844
encoder_threads = 0 ,
861
845
chunk_size = column_chunk_size ,
862
846
) as tcw :
863
- num_records = 0
864
- for variant in variants ():
865
- tcw .append ("CHROM" , variant .CHROM )
866
- tcw .append ("POS" , variant .POS )
867
- tcw .append ("QUAL" , variant .QUAL )
868
- tcw .append ("ID" , variant .ID )
869
- tcw .append ("FILTERS" , variant .FILTERS )
870
- tcw .append ("REF" , variant .REF )
871
- tcw .append ("ALT" , variant .ALT )
872
- for field in info_fields :
873
- tcw .append (field .full_name , variant .INFO .get (field .name , None ))
874
- if has_gt :
875
- tcw .append ("FORMAT/GT" , variant .genotype .array ())
876
- for field in format_fields :
877
- val = None
878
- try :
879
- val = variant .format (field .name )
880
- except KeyError :
881
- pass
882
- tcw .append (field .full_name , val )
883
-
884
- # Note: an issue with updating the progress per variant here like this
885
- # is that we get a significant pause at the end of the counter while
886
- # all the "small" fields get flushed. Possibly not much to be done about it.
887
- core .update_progress (1 )
888
- num_records += 1
847
+ with vcf_utils .IndexedVcf (partition .vcf_path ) as ivcf :
848
+ num_records = 0
849
+ for variant in ivcf .variants (partition .region ):
850
+ tcw .append ("CHROM" , variant .CHROM )
851
+ tcw .append ("POS" , variant .POS )
852
+ tcw .append ("QUAL" , variant .QUAL )
853
+ tcw .append ("ID" , variant .ID )
854
+ tcw .append ("FILTERS" , variant .FILTERS )
855
+ tcw .append ("REF" , variant .REF )
856
+ tcw .append ("ALT" , variant .ALT )
857
+ for field in info_fields :
858
+ tcw .append (field .full_name , variant .INFO .get (field .name , None ))
859
+ if has_gt :
860
+ tcw .append ("FORMAT/GT" , variant .genotype .array ())
861
+ for field in format_fields :
862
+ val = None
863
+ try :
864
+ val = variant .format (field .name )
865
+ except KeyError :
866
+ pass
867
+ tcw .append (field .full_name , val )
868
+
869
+ # Note: an issue with updating the progress per variant here like this
870
+ # is that we get a significant pause at the end of the counter while
871
+ # all the "small" fields get flushed. Possibly not much to be done about it.
872
+ core .update_progress (1 )
873
+ num_records += 1
889
874
890
875
logger .info (
891
876
f"Finish p{ partition_index } { partition .vcf_path } __{ partition .region } ="
892
877
f"{ num_records } records"
893
878
)
894
-
895
879
return partition_index , tcw .field_summaries , num_records
896
880
897
881
@staticmethod
0 commit comments