@@ -791,6 +791,8 @@ def __init__(
791
791
for vcf_field in icf_metadata .fields :
792
792
field_path = get_vcf_field_path (out_path , vcf_field )
793
793
field_partition_path = field_path / f"p{ partition_index } "
794
+ # exist_ok=True keeps this robust to running explode_partition twice.
795
+ field_partition_path .mkdir (exist_ok = True )
794
796
transformer = VcfValueTransformer .factory (vcf_field , num_samples )
795
797
self .field_writers [vcf_field .full_name ] = IcfFieldWriter (
796
798
vcf_field ,
@@ -890,14 +892,6 @@ def num_columns(self):
890
892
return len (self .columns )
891
893
892
894
893
- def mkdir_with_progress (path ):
894
- logger .debug (f"mkdir f{ path } " )
895
- # NOTE we may have race-conditions here, I'm not sure. Hopefully allowing
896
- # parents=True will take care of it.
897
- path .mkdir (parents = True )
898
- core .update_progress (1 )
899
-
900
-
901
895
class IntermediateColumnarFormatWriter :
902
896
def __init__ (self , path ):
903
897
self .path = pathlib .Path (path )
@@ -940,7 +934,7 @@ def init(
940
934
# dependencies as well.
941
935
self .metadata .provenance = {"source" : f"bio2zarr-{ provenance .__version__ } " }
942
936
943
- self .mkdirs (worker_processes , show_progress = show_progress )
937
+ self .mkdirs ()
944
938
945
939
# Note: this is needed for the current version of the vcfzarr spec, but it's
946
940
# probably going to be dropped.
@@ -955,30 +949,14 @@ def init(
955
949
json .dump (self .metadata .asdict (), f , indent = 4 )
956
950
return self .num_partitions
957
951
958
- def mkdirs (self , worker_processes = 1 , show_progress = False ):
959
- num_dirs = len (self .metadata .fields ) * self . num_partitions
960
- logger .info (f"Creating { num_dirs } directories" )
952
+ def mkdirs (self ):
953
+ num_dirs = len (self .metadata .fields )
954
+ logger .info (f"Creating { num_dirs } field directories" )
961
955
self .path .mkdir ()
962
956
self .wip_path .mkdir ()
963
- # Due to high latency batch system filesystems, we create all the directories in
964
- # parallel
965
- progress_config = core .ProgressConfig (
966
- total = num_dirs ,
967
- units = "dirs" ,
968
- title = "Mkdirs" ,
969
- show = show_progress ,
970
- )
971
- with core .ParallelWorkManager (
972
- worker_processes = worker_processes , progress_config = progress_config
973
- ) as manager :
974
- for field in self .metadata .fields :
975
- col_path = get_vcf_field_path (self .path , field )
976
- # Don't bother trying to count the intermediate directories towards
977
- # progress
978
- manager .submit (col_path .mkdir , parents = True )
979
- for j in range (self .num_partitions ):
980
- part_path = col_path / f"p{ j } "
981
- manager .submit (mkdir_with_progress , part_path )
957
+ for field in self .metadata .fields :
958
+ col_path = get_vcf_field_path (self .path , field )
959
+ col_path .mkdir (parents = True )
982
960
983
961
def load_partition_summaries (self ):
984
962
summaries = []
0 commit comments