@@ -273,7 +273,7 @@ def scan_vcf(path, target_num_partitions):
 
 
 def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
-    logger.info(f"Scanning {len(paths)} VCFs")
+    logger.info(f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions} partitions.")
     progress_config = core.ProgressConfig(
         total=len(paths),
         units="files",
@@ -311,6 +311,7 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
         key=lambda x: (contig_index_map[x.region.contig], x.region.start)
     )
     vcf_metadata.partitions = all_partitions
+    logger.info(f"Scan complete, resulting in {len(all_partitions)} partitions.")
     return vcf_metadata, header
 
 
@@ -875,6 +876,7 @@ def num_columns(self):
         return len(self.columns)
 
     def mkdirs(self):
+        logger.info(f"Creating {len(self.columns) * self.num_partitions} directories")
         self.path.mkdir()
         for col in self.columns.values():
             col.path.mkdir(parents=True)
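For orientation, the directory count in the new log line matches the layout `mkdirs` builds: one directory per column, each holding one subdirectory per partition (the `part_path.mkdir()` calls visible in the next hunk). A minimal sketch of that arithmetic; `root`, `column_names`, and the `p{j}` partition naming are illustrative stand-ins, not taken from this code:

```python
from pathlib import Path

# Illustrative only: one directory per (column, partition) pair, so the
# total is len(column_names) * num_partitions, as the new log line reports.
def planned_directories(root, column_names, num_partitions):
    return [
        Path(root) / name / f"p{j}"  # "p{j}" naming is hypothetical
        for name in column_names
        for j in range(num_partitions)
    ]
```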
@@ -883,14 +885,12 @@ def mkdirs(self):
             part_path.mkdir()
 
     def write_metadata(self):
+        logger.info(f"Writing metadata")
         with open(self.path / "metadata.json", "w") as f:
             json.dump(self.metadata.asdict(), f, indent=4)
-        # Write number of partitions in a convenience file for
-        # workflows
-        with open(self.path / "num_partitions.txt", "w") as f:
-            f.write(str(self.num_partitions))
 
     def write_header(self):
+        logger.info(f"Writing header")
         with open(self.path / "header.txt", "w") as f:
             f.write(self.vcf_header)
 
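Since `num_partitions.txt` is no longer written, a workflow that previously read that convenience file can recover the partition count from `metadata.json` instead. A minimal sketch, assuming the `metadata.asdict()` output keeps one entry per partition under a `"partitions"` key (consistent with `vcf_metadata.partitions` being set in `scan_vcfs` above); `read_num_partitions` is a hypothetical helper:

```python
import json
from pathlib import Path

def read_num_partitions(pcvcf_path):
    # Assumes metadata.asdict() serialises the partition list under
    # "partitions", mirroring vcf_metadata.partitions in scan_vcfs.
    with open(Path(pcvcf_path) / "metadata.json") as f:
        metadata = json.load(f)
    return len(metadata["partitions"])
```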
@@ -1012,35 +1012,52 @@ def convert_init(vcfs, out_path, *, num_partitions=1, worker_processes=1, show_p
         return pcvcf
 
     def convert_slice(self, start, stop, *, worker_processes=1, show_progress=False, column_chunk_size=16):
-        logger.info(
-            f"Exploding {self.num_columns} columns {self.metadata.num_records} variants "
-            f"{self.num_samples} samples"
-        )
         if start < 0:
             raise ValueError(f"start={start} must be non-negative")
         if stop > self.num_partitions:
             raise ValueError(f"stop={stop} must be less than the number of partitions")
-        num_records_to_progress = sum([partition.num_records for partition in self.metadata.partitions[start:stop]])
+        if start == 0 and stop == self.num_partitions:
+            num_records_to_process = self.num_records
+            logger.info(
+                f"Exploding {self.num_columns} columns {self.metadata.num_records} variants "
+                f"{self.num_samples} samples"
+            )
+        else:
+            num_records_to_process = None
+            logger.info(
+                f"Exploding {self.num_columns} columns {self.num_samples} samples"
+                f" from partitions {start} to {stop}"
+            )
+
         progress_config = core.ProgressConfig(
-            total=num_records_to_progress,
+            total=num_records_to_process,
             units="vars",
             title="Explode",
             show=show_progress,
         )
-        with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-            for j in range(start, stop):
-                pwm.submit(
-                    PickleChunkedVcf.convert_partition,
-                    self.metadata,
-                    j,
-                    self.path,
-                    column_chunk_size=column_chunk_size,
-                )
-            for _ in pwm.results_as_completed():
-                pass
+        if stop - start == 1:
+            PickleChunkedVcf.convert_partition(
+                self.metadata,
+                start,
+                self.path,
+                column_chunk_size=column_chunk_size,
+            )
+        else:
+            with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
+                for j in range(start, stop):
+                    pwm.submit(
+                        PickleChunkedVcf.convert_partition,
+                        self.metadata,
+                        j,
+                        self.path,
+                        column_chunk_size=column_chunk_size,
+                    )
+                for _ in pwm.results_as_completed():
+                    pass
 
     def convert_finalise(self):
-        assert not self.metadata.finalised
+        if self.metadata.finalised:
+            raise ValueError("Already finalised")
 
         partition_summaries = self.load_partition_summaries()
         for index, summary in enumerate(partition_summaries):
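Taken together, the sliced explode lets a batch workflow run each partition as its own task. A sketch of such a driver, under assumptions visible in this diff only: `convert_init` is exposed on `PickleChunkedVcf` and returns the instance, and `stop - start == 1` makes `convert_slice` run the partition inline; `vcf_paths` and `out_path` are placeholder names:

```python
# Hypothetical driver: initialise once, explode each partition as a
# separate single-partition slice (e.g. one cluster job each), finalise.
pcvcf = PickleChunkedVcf.convert_init(
    vcf_paths, out_path, num_partitions=64, worker_processes=8
)
for j in range(pcvcf.num_partitions):
    # stop - start == 1, so convert_slice calls convert_partition
    # directly in this process instead of via ParallelWorkManager.
    pcvcf.convert_slice(j, j + 1)
pcvcf.convert_finalise()
```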