@@ -1540,15 +1540,6 @@ def summary_table(self):
1540
1540
return data
1541
1541
1542
1542
1543
- @dataclasses .dataclass
1544
- class EncodingWork :
1545
- func : callable = dataclasses .field (repr = False )
1546
- start : int
1547
- stop : int
1548
- columns : list [str ]
1549
- memory : int = 0
1550
-
1551
-
1552
1543
def parse_max_memory (max_memory ):
1553
1544
if max_memory is None :
1554
1545
# Effectively unbounded
@@ -1640,7 +1631,7 @@ def init(
1640
1631
):
1641
1632
self .icf = icf
1642
1633
if self .path .exists ():
1643
- raise ValueError ("Zarr path already exists" )
1634
+ raise ValueError ("Zarr path already exists" ) # NEEDS TEST
1644
1635
partitions = VcfZarrPartition .generate_partitions (
1645
1636
self .icf .num_records ,
1646
1637
schema .variants_chunk_size ,
@@ -1807,6 +1798,7 @@ def finalise_partition_array(self, partition_index, name):
1807
1798
wip_path = self .wip_partition_array_path (partition_index , name )
1808
1799
final_path = self .partition_array_path (partition_index , name )
1809
1800
if final_path .exists ():
1801
+ # NEEDS TEST
1810
1802
logger .warning (f"Removing existing { final_path } " )
1811
1803
shutil .rmtree (final_path )
1812
1804
# Atomic swap
@@ -1923,7 +1915,7 @@ def encode_filters_partition(self, partition_index):
1923
1915
var_filter .buff [j , lookup [f ]] = True
1924
1916
except KeyError :
1925
1917
raise ValueError (
1926
- f"Filter '{ f } ' was not defined " f" in the header."
1918
+ f"Filter '{ f } ' was not defined in the header."
1927
1919
) from None
1928
1920
var_filter .flush ()
1929
1921
@@ -1956,6 +1948,7 @@ def finalise_array(self, name):
1956
1948
logger .info (f"Finalising { name } " )
1957
1949
final_path = self .path / name
1958
1950
if final_path .exists ():
1951
+ # NEEDS TEST
1959
1952
raise ValueError (f"Array { name } already exists" )
1960
1953
for partition in range (len (self .metadata .partitions )):
1961
1954
# Move all the files in partition dir to dest dir
@@ -1992,7 +1985,12 @@ def finalise(self, show_progress=False):
1992
1985
# NOTE: it's not clear that adding more workers will make this quicker,
1993
1986
# as it's just going to be causing contention on the file system.
1994
1987
# Something to check empirically in some deployments.
1995
- with core .ParallelWorkManager (1 , progress_config ) as pwm :
1988
+ # FIXME we're just using worker_processes=0 here to hook into the
1989
+ # SynchronousExecutor which is intended for testing purposes so
1990
+ # that we get test coverage. Should fix this either by allowing
1991
+ # for multiple workers, or making a standard wrapper for tqdm
1992
+ # that allows us to have a consistent look and feel.
1993
+ with core .ParallelWorkManager (0 , progress_config ) as pwm :
1996
1994
for name in self .metadata .schema .columns :
1997
1995
pwm .submit (self .finalise_array , name )
1998
1996
zarr .consolidate_metadata (self .path )
@@ -2131,11 +2129,9 @@ def encode_init(
2131
2129
)
2132
2130
2133
2131
2134
- def encode_partition (zarr_path , partition , * , show_progress = False , worker_processes = 1 ):
2132
+ def encode_partition (zarr_path , partition ):
2135
2133
writer = VcfZarrWriter (zarr_path )
2136
- writer .encode_partition (
2137
- partition , show_progress = show_progress , worker_processes = worker_processes
2138
- )
2134
+ writer .encode_partition (partition )
2139
2135
2140
2136
2141
2137
def encode_finalise (zarr_path , show_progress = False ):
0 commit comments