Commit 3245833

Address comments

1 parent 5897755

2 files changed: +35, -37 lines changed

bio2zarr/cli.py

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@
     "--column-chunk-size",
     type=int,
     default=64,
-    help="Chunk size in the columns dimension",
+    help="Size of exploded column chunks",
 )
 
 # Note: -l and -w were chosen when these were called "width" and "length".
@@ -81,7 +81,7 @@ def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
 @click.command
 @click.argument("vcfs", nargs=-1, required=True)
 @click.argument("out_path", type=click.Path())
-@click.option("-n", "--target_num_partitions", type=int, required=True)
+@click.option("-n", "--target-num-partitions", type=int, required=True)
 @verbose
 @worker_processes
 def explode_init(vcfs, out_path, target_num_partitions, verbose, worker_processes):
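
The rename from `--target_num_partitions` to `--target-num-partitions` is safe for the function signature because Click normalizes hyphens in long option names to underscores when binding the value to a parameter. A minimal standalone sketch (toy command, not part of bio2zarr):

```python
import click

@click.command()
@click.option("-n", "--target-num-partitions", type=int, required=True)
def demo(target_num_partitions):
    # Click maps --target-num-partitions onto the target_num_partitions
    # parameter by replacing hyphens with underscores.
    click.echo(f"partitions = {target_num_partitions}")

if __name__ == "__main__":
    demo()
```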

bio2zarr/vcf.py

Lines changed: 33 additions & 35 deletions
@@ -159,7 +159,6 @@ class VcfMetadata:
     fields: list
     partitions: list = None
     contig_lengths: list = None
-    finalised: bool = False
 
     @property
     def info_fields(self):
@@ -884,10 +883,15 @@ def mkdirs(self):
             part_path = col.path / f"p{j}"
             part_path.mkdir()
 
-    def write_metadata(self):
-        logger.info(f"Writing metadata")
-        with open(self.path / "metadata.json", "w") as f:
+    def write_metadata(self, final=False):
+        logger.info(f"Writing metadata ({'final' if final else 'initial'})")
+        with open(self.path / f"metadata.{'wip.' if not final else ''}json", "w") as f:
             json.dump(self.metadata.asdict(), f, indent=4)
+        if final:
+            try:
+                os.remove(self.path / "metadata.wip.json")
+            except FileNotFoundError:
+                pass
 
     def write_header(self):
         logger.info(f"Writing header")
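
This hunk replaces the `finalised` boolean (removed from `VcfMetadata` above) with a filename convention: metadata lives in `metadata.wip.json` until the conversion is finalised, at which point it is written to `metadata.json` and the wip file is removed. A standalone sketch of the same pattern, with toy stand-ins for everything outside the diff:

```python
import json
import os
import pathlib

def write_metadata(path: pathlib.Path, metadata: dict, final: bool = False) -> None:
    # Initial writes target metadata.wip.json; the final write targets
    # metadata.json, so the presence of metadata.json alone marks a
    # finished conversion.
    name = "metadata.json" if final else "metadata.wip.json"
    with open(path / name, "w") as f:
        json.dump(metadata, f, indent=4)
    if final:
        # Clean up the wip file; tolerate it already being gone.
        try:
            os.remove(path / "metadata.wip.json")
        except FileNotFoundError:
            pass
```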
@@ -899,7 +903,7 @@ def load_partition_summaries(self):
         not_found = []
         for j in range(self.num_partitions):
             try:
-                with open(self.path / f"partition_{j}_metadata.json") as f:
+                with open(self.path / f"p{j}_metadata.json") as f:
                     summary = json.load(f)
                     for k, v in summary['field_summaries'].items():
                         summary['field_summaries'][k] = VcfFieldSummary(**v)
@@ -916,14 +920,20 @@ def load_partition_summaries(self):
     @staticmethod
     def load(path):
         path = pathlib.Path(path)
-        with open(path / "metadata.json") as f:
-            metadata = VcfMetadata.fromdict(json.load(f))
+        final = True
+        try:
+            with open(path / "metadata.json") as f:
+                metadata = VcfMetadata.fromdict(json.load(f))
+        except FileNotFoundError:
+            with open(path / "metadata.wip.json") as f:
+                metadata = VcfMetadata.fromdict(json.load(f))
+            final = False
         with open(path / "header.txt") as f:
             header = f.read()
         pcvcf = PickleChunkedVcf(path, metadata, header)
         logger.info(
             f"Loaded PickleChunkedVcf(partitions={pcvcf.num_partitions}, "
-            f"records={pcvcf.num_records}, columns={pcvcf.num_columns})"
+            f"records={pcvcf.num_records}, columns={pcvcf.num_columns}), final={final}"
         )
         return pcvcf
 
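`load` is the read side of the same convention: prefer the finalised `metadata.json` and fall back to `metadata.wip.json`, remembering which one was found. Isolated as a sketch (toy function, not bio2zarr code):

```python
import json
import pathlib

def load_metadata(path: pathlib.Path) -> tuple[dict, bool]:
    # Returns (metadata, final): final is False when only the
    # work-in-progress file exists.
    try:
        with open(path / "metadata.json") as f:
            return json.load(f), True
    except FileNotFoundError:
        with open(path / "metadata.wip.json") as f:
            return json.load(f), False
```
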
@@ -987,7 +997,7 @@ def convert_partition(
             "num_records": num_records,
             "field_summaries": {k:v.asdict() for k,v in tcw.field_summaries.items()}
         }
-        with open(out_path / f"partition_{partition_index}_metadata.json", "w") as f:
+        with open(out_path / f"p{partition_index}_metadata.json", "w") as f:
             json.dump(partition_metadata, f, indent=4)
         logger.info(
             f"Finish p{partition_index} {partition.vcf_path}__{partition.region}="
@@ -1007,7 +1017,7 @@ def convert_init(vcfs, out_path, *, num_partitions=1, worker_processes=1, show_p
         pcvcf = PickleChunkedVcf(out_path, vcf_metadata, header)
         pcvcf.mkdirs()
 
-        pcvcf.write_metadata()
+        pcvcf.write_metadata(final=False)
         pcvcf.write_header()
         return pcvcf
 
@@ -1035,30 +1045,19 @@ def convert_slice(self, start, stop, *, worker_processes=1, show_progress=False,
             title="Explode",
             show=show_progress,
         )
-        if stop-start == 1:
-            PickleChunkedVcf.convert_partition(
-                self.metadata,
-                start,
-                self.path,
-                column_chunk_size=column_chunk_size,
-            )
-        else:
-            with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-                for j in range(start, stop):
-                    pwm.submit(
-                        PickleChunkedVcf.convert_partition,
-                        self.metadata,
-                        j,
-                        self.path,
-                        column_chunk_size=column_chunk_size,
-                    )
-                for _ in pwm.results_as_completed():
-                    pass
+        with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
+            for j in range(start, stop):
+                pwm.submit(
+                    PickleChunkedVcf.convert_partition,
+                    self.metadata,
+                    j,
+                    self.path,
+                    column_chunk_size=column_chunk_size,
+                )
+            for _ in pwm.results_as_completed():
+                pass
 
     def convert_finalise(self):
-        if self.metadata.finalised:
-            raise ValueError("Already finalised")
-
         partition_summaries = self.load_partition_summaries()
         for index, summary in enumerate(partition_summaries):
             self.metadata.partitions[index].num_records = summary['num_records']
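
Dropping the `stop - start == 1` special case works because submitting a single task to a worker pool is already equivalent to running it directly, minus one process spawn. `core.ParallelWorkManager` is bio2zarr's own wrapper; assuming it behaves like a standard executor, the submit-then-drain pattern above maps onto `concurrent.futures` roughly like this sketch (not the actual core module):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def convert_slice_sketch(convert_partition, metadata, path, start, stop,
                         worker_processes=1, column_chunk_size=64):
    # Hypothetical analogue of the ParallelWorkManager usage in the diff.
    with ProcessPoolExecutor(max_workers=worker_processes) as executor:
        futures = [
            executor.submit(
                convert_partition, metadata, j, path,
                column_chunk_size=column_chunk_size,
            )
            for j in range(start, stop)
        ]
        # Drain results as they complete; a one-partition slice simply
        # runs a single task, so no special case is needed.
        for future in as_completed(futures):
            future.result()
```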
@@ -1074,14 +1073,13 @@ def convert_finalise(self):
             for summary in partition_summaries:
                 field.summary.update(summary["field_summaries"][field.full_name])
 
-        self.metadata.finalised = True
-        self.write_metadata()
+        self.write_metadata(final=True)
 
     @staticmethod
     def convert(
         vcfs, out_path, *, column_chunk_size=16, worker_processes=1, show_progress=False
     ):
-        pcvcf = PickleChunkedVcf.convert_init(vcfs, out_path, num_partitions=max(1, worker_processes * 40), worker_processes=worker_processes, show_progress=show_progress)
+        pcvcf = PickleChunkedVcf.convert_init(vcfs, out_path, num_partitions=max(1, worker_processes * 4), worker_processes=worker_processes, show_progress=show_progress)
         pcvcf.convert_slice(0, len(pcvcf.metadata.partitions), worker_processes=worker_processes, show_progress=show_progress, column_chunk_size=column_chunk_size)
         pcvcf.convert_finalise()
 
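Taken together, `convert` now drives a three-stage pipeline whose intermediate state is fully recoverable from disk: `convert_init` writes the wip metadata, `convert_slice` fills partitions, and `convert_finalise` promotes the metadata. Spelled out as a usage sketch (the import path, file names, and counts are illustrative; the method signatures come from the diff):

```python
from bio2zarr.vcf import PickleChunkedVcf

pcvcf = PickleChunkedVcf.convert_init(
    ["sample.vcf.gz"], "sample.exploded",
    num_partitions=16,  # convert itself uses max(1, worker_processes * 4)
    worker_processes=4,
)
# Partitions can be converted in one or more slices, possibly across
# separate invocations, before the final metadata is written.
pcvcf.convert_slice(0, len(pcvcf.metadata.partitions), worker_processes=4)
pcvcf.convert_finalise()
```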