@@ -151,7 +151,7 @@ class VcfPartition:
 
 ICF_METADATA_FORMAT_VERSION = "0.2"
 ICF_DEFAULT_COMPRESSOR = numcodecs.Blosc(
-    cname="lz4", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE
+    cname="zstd", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE
 )
 
 
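For context, this hunk only swaps the Blosc backend used for the intermediate columnar format from lz4 to zstd; the compression level and shuffle setting are unchanged. A minimal standalone sketch of what that codec configuration does (the data and byte counts are illustrative, not from this PR):

```python
import numcodecs
import numpy as np

data = np.random.randint(0, 4, size=1_000_000, dtype=np.int8)

old = numcodecs.Blosc(cname="lz4", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE)
new = numcodecs.Blosc(cname="zstd", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE)

for codec in (old, new):
    compressed = codec.encode(data)
    # decode() returns a bytes-like buffer; check the round trip is lossless.
    roundtrip = np.frombuffer(codec.decode(compressed), dtype=np.int8)
    assert np.array_equal(roundtrip, data)
    print(codec.cname, len(compressed), "bytes")
```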
@@ -890,6 +890,15 @@ def num_columns(self):
         return len(self.columns)
 
 
+
+def mkdir_with_progress(path):
+    logger.debug(f"mkdir {path}")
+    # NOTE we may have race-conditions here, I'm not sure. Hopefully allowing
+    # parents=True will take care of it.
+    path.mkdir(parents=True)
+    core.update_progress(1)
+
+
 class IntermediateColumnarFormatWriter:
     def __init__(self, path):
         self.path = pathlib.Path(path)
@@ -932,7 +941,7 @@ def init(
         # dependencies as well.
         self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"}
 
-        self.mkdirs(worker_processes)
+        self.mkdirs(worker_processes, show_progress=show_progress)
 
         # Note: this is needed for the current version of the vcfzarr spec, but it's
         # probably going to be dropped.
@@ -947,30 +956,30 @@ def init(
             json.dump(self.metadata.asdict(), f, indent=4)
         return self.num_partitions
 
-    def mkdirs(self, worker_processes=1):
-        logger.info(
-            f"Creating {len(self.metadata.fields) * self.num_partitions} directories"
-        )
+    def mkdirs(self, worker_processes=1, show_progress=False):
+        num_dirs = len(self.metadata.fields) * self.num_partitions
+        logger.info(f"Creating {num_dirs} directories")
         self.path.mkdir()
         self.wip_path.mkdir()
         # Due to high latency batch system filesystems, we create all the directories in
         # parallel
         progress_config = core.ProgressConfig(
-            total=len(self.metadata.fields) * self.num_partitions,
-            units="dir",
-            title="Creating directories",
-            show=True
+            total=num_dirs,
+            units="dirs",
+            title="Mkdirs",
+            show=show_progress,
         )
         with core.ParallelWorkManager(
-            worker_processes=worker_processes,
-            progress_config=progress_config
+            worker_processes=worker_processes, progress_config=progress_config
         ) as manager:
             for field in self.metadata.fields:
                 col_path = get_vcf_field_path(self.path, field)
+                # Don't bother trying to count the intermediate directories towards
+                # progress
                 manager.submit(col_path.mkdir, parents=True)
                 for j in range(self.num_partitions):
                     part_path = col_path / f"p{j}"
-                    manager.submit(part_path.mkdir, parents=True)
+                    manager.submit(mkdir_with_progress, part_path)
 
     def load_partition_summaries(self):
         summaries = []
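The hunk above moves partition-directory creation into the parallel work manager and only counts the leaf directories towards progress. `core.ParallelWorkManager` and `core.update_progress` are bio2zarr internals, so here is a stdlib-only sketch of the same pattern; the `exist_ok=True` flag is the standard way to make concurrent `mkdir` calls safe, which the sketch adds as an assumption rather than something this diff does:

```python
import pathlib
from concurrent.futures import ThreadPoolExecutor


def make_partition_dir(path: pathlib.Path) -> None:
    # parents=True creates missing intermediate directories, and exist_ok=True
    # tolerates another worker having created them already.
    path.mkdir(parents=True, exist_ok=True)


def make_all_dirs(root, fields, num_partitions, workers=8):
    paths = [
        pathlib.Path(root) / field / f"p{j}"
        for field in fields
        for j in range(num_partitions)
    ]
    # Directory creation is I/O bound, so threads suffice for a sketch;
    # list() drains the iterator so any worker exception is re-raised here.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(make_partition_dir, paths))


make_all_dirs("icf_demo", ["POS", "REF", "ALT"], num_partitions=4)
```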
@@ -1499,15 +1508,17 @@ def parse_max_memory(max_memory):
 
 
 class VcfZarrWriter:
-    def __init__(self, path, icf, schema):
+    def __init__(self, path, icf, schema, dimension_separator=None):
         self.path = pathlib.Path(path)
         self.icf = icf
         self.schema = schema
+        # Default to using nested directories following the Zarr v3 default.
+        # This seems to require version 2.17+ to work properly
+        self.dimension_separator = "/" if dimension_separator is None else dimension_separator
         store = zarr.DirectoryStore(self.path)
         self.root = zarr.group(store=store)
 
     def init_array(self, variable):
-        # print("CREATE", variable)
         object_codec = None
         if variable.dtype == "O":
             object_codec = numcodecs.VLenUTF8()
@@ -1519,7 +1530,9 @@ def init_array(self, variable):
             compressor=numcodecs.get_codec(variable.compressor),
             filters=[numcodecs.get_codec(filt) for filt in variable.filters],
             object_codec=object_codec,
+            dimension_separator=self.dimension_separator,
         )
+        # Dimension names are part of the spec in Zarr v3
         a.attrs["_ARRAY_DIMENSIONS"] = variable.dimensions
 
     def get_array(self, name):
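The two hunks above thread `dimension_separator` through to array creation, defaulting to nested ("/"-separated) chunk keys. A small sketch, assuming zarr-python 2.x, of how the separator changes the keys a `DirectoryStore` ends up holding (the array name and store paths here are made up for illustration):

```python
import numpy as np
import zarr

for label, sep in [("flat", "."), ("nested", "/")]:
    store = zarr.DirectoryStore(f"separator_demo_{label}.zarr")
    root = zarr.group(store=store, overwrite=True)
    a = root.zeros(
        "x", shape=(4, 4), chunks=(2, 2), dtype="i4", dimension_separator=sep
    )
    a[:] = np.arange(16).reshape(4, 4)
    # With ".": chunk keys like "x/0.0"; with "/": nested keys like "x/0/0".
    print(label, sorted(store.keys()))
```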
@@ -1657,6 +1670,7 @@ def encode_contig_id(self):
             "contig_length",
             self.schema.contig_length,
             dtype=np.int64,
+            compressor=DEFAULT_ZARR_COMPRESSOR,
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
         return {v: j for j, v in enumerate(self.schema.contig_id)}
@@ -1849,6 +1863,7 @@ def encode(
     variants_chunk_size=None,
     samples_chunk_size=None,
     max_v_chunks=None,
+    dimension_separator=None,
     max_memory=None,
     worker_processes=1,
     show_progress=False,
@@ -1872,7 +1887,7 @@ def encode(
     if zarr_path.exists():
         logger.warning(f"Deleting existing {zarr_path}")
         shutil.rmtree(zarr_path)
-    vzw = VcfZarrWriter(zarr_path, icf, schema)
+    vzw = VcfZarrWriter(zarr_path, icf, schema, dimension_separator=dimension_separator)
     vzw.init()
     vzw.encode(
         max_v_chunks=max_v_chunks,
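Finally, a hedged usage sketch of the new keyword as it might be passed to `encode()`. Only the keyword arguments visible in this diff are confirmed; the module alias, the positional ICF/Zarr path arguments, and the file names are assumptions for illustration:

```python
import bio2zarr.vcf as vcf  # module path assumed, not shown in this diff

vcf.encode(
    "sample.icf",             # assumed: intermediate columnar format path
    "sample.vcz",             # assumed: output Zarr path
    dimension_separator="/",  # nested chunk directories (now the default)
    worker_processes=8,
    show_progress=True,
)
```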
0 commit comments