@@ -983,12 +983,12 @@ def convert_partition(
             # is that we get a significant pause at the end of the counter while
             # all the "small" fields get flushed. Possibly not much to be done about it.
             core.update_progress(1)
-            partition_metadata = {
-                "num_records": num_records,
-                "field_summaries": {k: v.asdict() for k, v in tcw.field_summaries.items()}
-            }
-            with open(out_path / f"partition_{partition_index}_metadata.json", "w") as f:
-                json.dump(partition_metadata, f, indent=4)
+        partition_metadata = {
+            "num_records": num_records,
+            "field_summaries": {k: v.asdict() for k, v in tcw.field_summaries.items()}
+        }
+        with open(out_path / f"partition_{partition_index}_metadata.json", "w") as f:
+            json.dump(partition_metadata, f, indent=4)
         logger.info(
             f"Finish p{partition_index} {partition.vcf_path}__{partition.region}="
             f"{num_records} records"
@@ -1081,7 +1081,7 @@ def convert_finalise(self):
     def convert(
         vcfs, out_path, *, column_chunk_size=16, worker_processes=1, show_progress=False
     ):
-        pcvcf = PickleChunkedVcf.convert_init(vcfs, out_path, worker_processes=worker_processes, show_progress=show_progress)
+        pcvcf = PickleChunkedVcf.convert_init(vcfs, out_path, num_partitions=max(1, worker_processes * 40), worker_processes=worker_processes, show_progress=show_progress)
         pcvcf.convert_slice(0, len(pcvcf.metadata.partitions), worker_processes=worker_processes, show_progress=show_progress, column_chunk_size=column_chunk_size)
         pcvcf.convert_finalise()
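The second hunk now passes num_partitions=max(1, worker_processes * 40) to convert_init, which requests many more partitions than workers, presumably so uneven partitions can be balanced across the worker pool. A minimal sketch of how that default scales with worker count (illustrative only, not taken from this commit):

    # Illustrative: the new default partition count per worker_processes setting.
    for worker_processes in (1, 4, 16):
        num_partitions = max(1, worker_processes * 40)
        print(worker_processes, "->", num_partitions)  # 1 -> 40, 4 -> 160, 16 -> 640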