@@ -1323,12 +1323,9 @@ def variant_chunk_nbytes(self):
1323
1323
"""
1324
1324
Returns the nbytes for a single variant chunk of this array.
1325
1325
"""
1326
- chunk_items = 1
1327
- for dim , size in enumerate (self .shape ):
1328
- chunk_dim_size = size
1329
- if dim < len (self .chunks ):
1330
- chunk_dim_size = self .chunks [dim ]
1331
- chunk_items *= chunk_dim_size
1326
+ chunk_items = self .chunks [0 ]
1327
+ for size in self .shape [1 :]:
1328
+ chunk_items *= size
1332
1329
dt = np .dtype (self .dtype )
1333
1330
return chunk_items * dt .itemsize
1334
1331
@@ -1616,6 +1613,10 @@ def __init__(self, path):
1616
1613
def schema (self ):
1617
1614
return self .metadata .schema
1618
1615
1616
+ @property
1617
+ def num_partitions (self ):
1618
+ return len (self .metadata .partitions )
1619
+
1619
1620
#######################
1620
1621
# init
1621
1622
#######################
@@ -1778,10 +1779,10 @@ def encode_partition(self, partition_index):
1778
1779
self .encode_id_partition (partition_index )
1779
1780
self .encode_filters_partition (partition_index )
1780
1781
self .encode_contig_partition (partition_index )
1781
- for col in self .metadata . schema .columns .values ():
1782
+ for col in self .schema .columns .values ():
1782
1783
if col .vcf_field is not None :
1783
1784
self .encode_array_partition (col , partition_index )
1784
- if "call_genotype" in self .metadata . schema .columns :
1785
+ if "call_genotype" in self .schema .columns :
1785
1786
self .encode_genotypes_partition (partition_index )
1786
1787
1787
1788
def init_partition_array (self , partition_index , name ):
@@ -1954,6 +1955,7 @@ def finalise_array(self, name):
1954
1955
# Move all the files in partition dir to dest dir
1955
1956
src = self .partition_array_path (partition , name )
1956
1957
if not src .exists ():
1958
+ # Needs test
1957
1959
raise ValueError (f"Partition { partition } of { name } does not exist" )
1958
1960
dest = self .arrays_path / name
1959
1961
# This is Zarr v2 specific. Chunks in v3 with start with "c" prefix.
@@ -1977,7 +1979,7 @@ def finalise(self, show_progress=False):
1977
1979
self .load_metadata ()
1978
1980
1979
1981
progress_config = core .ProgressConfig (
1980
- total = len (self .metadata . schema .columns ),
1982
+ total = len (self .schema .columns ),
1981
1983
title = "Finalise" ,
1982
1984
units = "array" ,
1983
1985
show = show_progress ,
@@ -1991,7 +1993,7 @@ def finalise(self, show_progress=False):
1991
1993
# for multiple workers, or making a standard wrapper for tqdm
1992
1994
# that allows us to have a consistent look and feel.
1993
1995
with core .ParallelWorkManager (0 , progress_config ) as pwm :
1994
- for name in self .metadata . schema .columns :
1996
+ for name in self .schema .columns :
1995
1997
pwm .submit (self .finalise_array , name )
1996
1998
zarr .consolidate_metadata (self .path )
1997
1999
@@ -2003,16 +2005,28 @@ def get_max_encoding_memory(self):
2003
2005
"""
2004
2006
Return the approximate maximum memory used to encode a variant chunk.
2005
2007
"""
2006
- return max (
2007
- col .variant_chunk_nbytes for col in self .metadata . schema .columns .values ()
2008
+ max_encoding_mem = max (
2009
+ col .variant_chunk_nbytes for col in self .schema .columns .values ()
2008
2010
)
2011
+ gt_mem = 0
2012
+ if "call_genotype" in self .schema .columns :
2013
+ encoded_together = [
2014
+ "call_genotype" ,
2015
+ "call_genotype_phased" ,
2016
+ "call_genotype_mask" ,
2017
+ ]
2018
+ gt_mem = sum (
2019
+ self .schema .columns [col ].variant_chunk_nbytes
2020
+ for col in encoded_together
2021
+ )
2022
+ return max (max_encoding_mem , gt_mem )
2009
2023
2010
2024
def encode_all_partitions (
2011
2025
self , * , worker_processes = 1 , show_progress = False , max_memory = None
2012
2026
):
2013
2027
max_memory = parse_max_memory (max_memory )
2014
2028
self .load_metadata ()
2015
- num_partitions = len (self .metadata . partitions )
2029
+ num_partitions = self .num_partitions  # property already returns the partition count (an int)
2016
2030
per_worker_memory = self .get_max_encoding_memory ()
2017
2031
logger .info (
2018
2032
f"Encoding Zarr over { num_partitions } partitions with "
@@ -2120,13 +2134,14 @@ def encode_init(
2120
2134
schema = VcfZarrSchema .fromjson (f .read ())
2121
2135
zarr_path = pathlib .Path (zarr_path )
2122
2136
vzw = VcfZarrWriter (zarr_path )
2123
- return vzw .init (
2137
+ vzw .init (
2124
2138
icf ,
2125
2139
target_num_partitions = target_num_partitions ,
2126
2140
schema = schema ,
2127
2141
dimension_separator = dimension_separator ,
2128
2142
max_variant_chunks = max_variant_chunks ,
2129
2143
)
2144
+ return vzw .num_partitions , vzw .get_max_encoding_memory ()
2130
2145
2131
2146
2132
2147
def encode_partition (zarr_path , partition ):
0 commit comments