@@ -1967,7 +1967,9 @@ def finalise_array(self, name):
1967
1967
chunk_files = [
1968
1968
path for path in src .iterdir () if not path .name .startswith ("." )
1969
1969
]
1970
- # TODO check for a count of then number of files
1970
+ # TODO check for a count of then number of files. If we require a
1971
+ # dimension_separator of "/" then we could make stronger assertions
1972
+ # here, as we'd always have num_variant_chunks
1971
1973
logger .debug (
1972
1974
f"Moving { len (chunk_files )} chunks for { name } partition { partition } "
1973
1975
)
@@ -1976,11 +1978,23 @@ def finalise_array(self, name):
1976
1978
# Finally, once all the chunks have moved into the arrays dir,
1977
1979
# we move it out of wip
1978
1980
os .rename (self .arrays_path / name , self .path / name )
1981
+ core .update_progress (1 )
1979
1982
1980
- def finalise (self ):
1983
+ def finalise (self , show_progress = False ):
1981
1984
self .load_metadata ()
1982
- for name in self .metadata .schema .columns :
1983
- self .finalise_array (name )
1985
+
1986
+ progress_config = core .ProgressConfig (
1987
+ total = len (self .metadata .schema .columns ),
1988
+ title = "Finalise" ,
1989
+ units = "array" ,
1990
+ show = show_progress ,
1991
+ )
1992
+ # NOTE: it's not clear that adding more workers will make this quicker,
1993
+ # as it's just going to be causing contention on the file system.
1994
+ # Something to check empirically in some deployments.
1995
+ with core .ParallelWorkManager (1 , progress_config ) as pwm :
1996
+ for name in self .metadata .schema .columns :
1997
+ pwm .submit (self .finalise_array , name )
1984
1998
zarr .consolidate_metadata (self .path )
1985
1999
1986
2000
######################
@@ -2074,7 +2088,7 @@ def encode(
2074
2088
show_progress = show_progress ,
2075
2089
max_memory = max_memory ,
2076
2090
)
2077
- encode_finalise ( zarr_path )
2091
+ vzw . finalise ( show_progress )
2078
2092
2079
2093
2080
2094
def encode_init (
@@ -2124,9 +2138,9 @@ def encode_partition(zarr_path, partition, *, show_progress=False, worker_proces
2124
2138
)
2125
2139
2126
2140
2127
- def encode_finalise (zarr_path ):
2141
+ def encode_finalise (zarr_path , show_progress = False ):
2128
2142
writer = VcfZarrWriter (zarr_path )
2129
- writer .finalise ()
2143
+ writer .finalise (show_progress = show_progress )
2130
2144
2131
2145
2132
2146
def convert (
@@ -2336,7 +2350,7 @@ def validate(vcf_path, zarr_path, show_progress=False):
2336
2350
assert pos [start_index ] == first_pos
2337
2351
vcf = cyvcf2 .VCF (vcf_path )
2338
2352
if show_progress :
2339
- iterator = tqdm .tqdm (vcf , desc = " Verify" , total = vcf .num_records ) # NEEDS TEST
2353
+ iterator = tqdm .tqdm (vcf , desc = " Verify" , total = vcf .num_records ) # NEEDS TEST
2340
2354
else :
2341
2355
iterator = vcf
2342
2356
for j , row in enumerate (iterator , start_index ):
0 commit comments