@@ -1311,6 +1311,7 @@ def __post_init__(self):
1311
1311
self .shape = tuple (self .shape )
1312
1312
self .chunks = tuple (self .chunks )
1313
1313
self .dimensions = tuple (self .dimensions )
1314
+ self .filters = tuple (self .filters )
1314
1315
1315
1316
@staticmethod
1316
1317
def new (** kwargs ):
@@ -1396,27 +1397,29 @@ def variant_chunk_nbytes(self):
1396
1397
for size in self .shape [1 :]:
1397
1398
chunk_items *= size
1398
1399
dt = np .dtype (self .dtype )
1399
- if dt .kind == "O" :
1400
+ if dt .kind == "O" and "samples" in self . dimensions :
1400
1401
logger .warning (
1401
1402
f"Field { self .name } is a string; max memory usage may "
1402
1403
"be a significant underestimate"
1403
1404
)
1404
1405
return chunk_items * dt .itemsize
1405
1406
1406
1407
1407
- ZARR_SCHEMA_FORMAT_VERSION = "0.3 "
1408
+ ZARR_SCHEMA_FORMAT_VERSION = "0.4 "
1408
1409
1409
1410
1410
1411
@dataclasses .dataclass
1411
1412
class VcfZarrSchema :
1412
1413
format_version : str
1413
1414
samples_chunk_size : int
1414
1415
variants_chunk_size : int
1415
- dimensions : list
1416
1416
samples : list
1417
1417
contigs : list
1418
1418
filters : list
1419
- fields : dict
1419
+ fields : list
1420
+
1421
+ def field_map (self ):
1422
+ return {field .name : field for field in self .fields }
1420
1423
1421
1424
def asdict (self ):
1422
1425
return dataclasses .asdict (self )
@@ -1435,9 +1438,7 @@ def fromdict(d):
1435
1438
ret .samples = [Sample (** sd ) for sd in d ["samples" ]]
1436
1439
ret .contigs = [Contig (** sd ) for sd in d ["contigs" ]]
1437
1440
ret .filters = [Filter (** sd ) for sd in d ["filters" ]]
1438
- ret .fields = {
1439
- key : ZarrColumnSpec (** value ) for key , value in d ["fields" ].items ()
1440
- }
1441
+ ret .fields = [ZarrColumnSpec (** sd ) for sd in d ["fields" ]]
1441
1442
return ret
1442
1443
1443
1444
@staticmethod
@@ -1572,8 +1573,7 @@ def fixed_field_spec(
1572
1573
format_version = ZARR_SCHEMA_FORMAT_VERSION ,
1573
1574
samples_chunk_size = samples_chunk_size ,
1574
1575
variants_chunk_size = variants_chunk_size ,
1575
- fields = {col .name : col for col in colspecs },
1576
- dimensions = ["variants" , "samples" , "ploidy" , "alleles" , "filters" ],
1576
+ fields = colspecs ,
1577
1577
samples = icf .metadata .samples ,
1578
1578
contigs = icf .metadata .contigs ,
1579
1579
filters = icf .metadata .filters ,
@@ -1701,6 +1701,12 @@ def schema(self):
1701
1701
def num_partitions (self ):
1702
1702
return len (self .metadata .partitions )
1703
1703
1704
+ def has_genotypes (self ):
1705
+ for field in self .schema .fields :
1706
+ if field .name == "call_genotype" :
1707
+ return True
1708
+ return False
1709
+
1704
1710
#######################
1705
1711
# init
1706
1712
#######################
@@ -1760,7 +1766,7 @@ def init(
1760
1766
root = zarr .group (store = store )
1761
1767
1762
1768
total_chunks = 0
1763
- for field in self .schema .fields . values () :
1769
+ for field in self .schema .fields :
1764
1770
a = self .init_array (root , field , partitions [- 1 ].stop )
1765
1771
total_chunks += a .nchunks
1766
1772
@@ -1778,9 +1784,7 @@ def init(
1778
1784
1779
1785
def encode_samples (self , root ):
1780
1786
if self .schema .samples != self .icf .metadata .samples :
1781
- raise ValueError (
1782
- "Subsetting or reordering samples not supported currently"
1783
- ) # NEEDS TEST
1787
+ raise ValueError ("Subsetting or reordering samples not supported currently" )
1784
1788
array = root .array (
1785
1789
"sample_id" ,
1786
1790
[sample .id for sample in self .schema .samples ],
@@ -1880,10 +1884,10 @@ def encode_partition(self, partition_index):
1880
1884
self .encode_filters_partition (partition_index )
1881
1885
self .encode_contig_partition (partition_index )
1882
1886
self .encode_alleles_partition (partition_index )
1883
- for col in self .schema .fields . values () :
1887
+ for col in self .schema .fields :
1884
1888
if col .vcf_field is not None :
1885
1889
self .encode_array_partition (col , partition_index )
1886
- if "call_genotype" in self .schema . fields :
1890
+ if self .has_genotypes () :
1887
1891
self .encode_genotypes_partition (partition_index )
1888
1892
1889
1893
final_path = self .partition_path (partition_index )
@@ -2100,8 +2104,8 @@ def finalise(self, show_progress=False):
2100
2104
# for multiple workers, or making a standard wrapper for tqdm
2101
2105
# that allows us to have a consistent look and feel.
2102
2106
with core .ParallelWorkManager (0 , progress_config ) as pwm :
2103
- for name in self .schema .fields :
2104
- pwm .submit (self .finalise_array , name )
2107
+ for field in self .schema .fields :
2108
+ pwm .submit (self .finalise_array , field . name )
2105
2109
logger .debug (f"Removing { self .wip_path } " )
2106
2110
shutil .rmtree (self .wip_path )
2107
2111
logger .info ("Consolidating Zarr metadata" )
@@ -2116,17 +2120,14 @@ def get_max_encoding_memory(self):
2116
2120
Return the approximate maximum memory used to encode a variant chunk.
2117
2121
"""
2118
2122
max_encoding_mem = 0
2119
- for col in self .schema .fields . values () :
2123
+ for col in self .schema .fields :
2120
2124
max_encoding_mem = max (max_encoding_mem , col .variant_chunk_nbytes )
2121
2125
gt_mem = 0
2122
- if "call_genotype" in self .schema .fields :
2123
- encoded_together = [
2124
- "call_genotype" ,
2125
- "call_genotype_phased" ,
2126
- "call_genotype_mask" ,
2127
- ]
2126
+ if self .has_genotypes :
2128
2127
gt_mem = sum (
2129
- self .schema .fields [col ].variant_chunk_nbytes for col in encoded_together
2128
+ field .variant_chunk_nbytes
2129
+ for field in self .schema .fields
2130
+ if field .name .startswith ("call_genotype" )
2130
2131
)
2131
2132
return max (max_encoding_mem , gt_mem )
2132
2133
@@ -2158,7 +2159,7 @@ def encode_all_partitions(
2158
2159
num_workers = min (max_num_workers , worker_processes )
2159
2160
2160
2161
total_bytes = 0
2161
- for col in self .schema .fields . values () :
2162
+ for col in self .schema .fields :
2162
2163
# Open the array definition to get the total size
2163
2164
total_bytes += zarr .open (self .arrays_path / col .name ).nbytes
2164
2165
@@ -2273,7 +2274,7 @@ def convert(
2273
2274
# TODO add arguments to control location of tmpdir
2274
2275
):
2275
2276
with tempfile .TemporaryDirectory (prefix = "vcf2zarr" ) as tmp :
2276
- if_dir = pathlib .Path (tmp ) / "if "
2277
+ if_dir = pathlib .Path (tmp ) / "icf "
2277
2278
explode (
2278
2279
if_dir ,
2279
2280
vcfs ,
0 commit comments