Commit 384e92f

Merge pull request #79 from benjeffery/explode-slice-cli
Explode slice cli
2 parents 8874dea + c0ca12b, commit 384e92f

5 files changed: +297 -36 lines

bio2zarr/cli.py

Lines changed: 71 additions & 1 deletion
@@ -14,8 +14,17 @@
     "-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
 )
 
+column_chunk_size = click.option(
+    "-c",
+    "--column-chunk-size",
+    type=int,
+    default=64,
+    help="Size of exploded column chunks",
+)
+
 # Note: -l and -w were chosen when these were called "width" and "length".
 # possibly there are better letters now.
+# TODO help text
 variants_chunk_size = click.option(
     "-l",
     "--variants-chunk-size",
@@ -55,7 +64,7 @@ def setup_logging(verbosity):
 @click.argument("out_path", type=click.Path())
 @verbose
 @worker_processes
-@click.option("-c", "--column-chunk-size", type=int, default=64)
+@column_chunk_size
 def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
     """
     Convert VCF(s) to columnar intermediate format
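
The two hunks above define and then apply a shared option: click.option(...) called without a target function returns a plain decorator, so one definition can be stamped onto several subcommands and the flag stays consistent everywhere. A minimal self-contained sketch of the pattern (the greet command and --repeat flag are hypothetical, not part of bio2zarr):

    import click

    # Defined once at module level and applied to any number of commands,
    # just like column_chunk_size / worker_processes above.
    repeat = click.option("-r", "--repeat", type=int, default=1, help="Times to repeat")

    @click.command()
    @repeat
    def greet(repeat):
        """Print a greeting REPEAT times."""
        for _ in range(repeat):
            click.echo("hello")

    if __name__ == "__main__":
        greet()
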
@@ -69,6 +78,63 @@ def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
         show_progress=True,
     )
 
+@click.command
+@click.argument("vcfs", nargs=-1, required=True)
+@click.argument("out_path", type=click.Path())
+@click.option("-n", "--target-num-partitions", type=int, required=True)
+@verbose
+@worker_processes
+def explode_init(vcfs, out_path, target_num_partitions, verbose, worker_processes):
+    """
+    Initial step for parallel conversion of VCF(s) to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_init(
+        vcfs,
+        out_path,
+        target_num_partitions=target_num_partitions,
+        worker_processes=worker_processes,
+        show_progress=True,
+    )
+
+@click.command
+@click.argument("path", type=click.Path())
+def explode_partition_count(path):
+    """
+    Count the actual number of partitions in a parallel conversion of VCF(s) to columnar intermediate format
+    """
+    print(vcf.explode_partition_count(path))
+
+@click.command
+@click.argument("path", type=click.Path(), required=True)
+@click.option("-s", "--start", type=int, required=True)
+@click.option("-e", "--end", type=int, required=True)
+@verbose
+@worker_processes
+@column_chunk_size
+def explode_slice(path, start, end, verbose, worker_processes, column_chunk_size):
+    """
+    Convert a slice of the partitions from a parallel conversion of VCF(s) to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_slice(
+        path,
+        start,
+        end,
+        worker_processes=worker_processes,
+        column_chunk_size=column_chunk_size,
+        show_progress=True,
+    )
+
+@click.command
+@click.argument("path", type=click.Path(), required=True)
+@verbose
+def explode_finalise(path, verbose):
+    """
+    Final step for parallel conversion of VCF(s) to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_finalise(path)
 
 @click.command
 @click.argument("if_path", type=click.Path())
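
These four subcommands split explode into separate init / partition-count / slice / finalise steps so the conversion can be spread over independent processes or cluster jobs. A hedged sketch of driving them in-process with click's test runner; it assumes click >= 8 (which derives hyphenated command names such as explode-init from the function names), and the input and output paths are placeholders:

    from click.testing import CliRunner
    from bio2zarr.cli import vcf2zarr

    runner = CliRunner()
    # Scan the inputs and set up the output directory, asking for 8 partitions.
    runner.invoke(vcf2zarr, ["explode-init", "sample.vcf.gz", "sample.exploded", "-n", "8"])

    # The scan may produce a different number of partitions than requested, so ask.
    result = runner.invoke(vcf2zarr, ["explode-partition-count", "sample.exploded"])
    num_partitions = int(result.output)

    # Convert every partition in one slice, then write the final metadata.
    runner.invoke(
        vcf2zarr,
        ["explode-slice", "sample.exploded", "-s", "0", "-e", str(num_partitions)],
    )
    runner.invoke(vcf2zarr, ["explode-finalise", "sample.exploded"])
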
@@ -189,6 +255,10 @@ def vcf2zarr():
 
 # TODO figure out how to get click to list these in the given order.
 vcf2zarr.add_command(explode)
+vcf2zarr.add_command(explode_init)
+vcf2zarr.add_command(explode_partition_count)
+vcf2zarr.add_command(explode_slice)
+vcf2zarr.add_command(explode_finalise)
 vcf2zarr.add_command(inspect)
 vcf2zarr.add_command(mkschema)
 vcf2zarr.add_command(encode)

bio2zarr/vcf.py

Lines changed: 138 additions & 34 deletions
@@ -69,6 +69,13 @@ def update(self, other):
         self.min_value = min(self.min_value, other.min_value)
         self.max_value = max(self.max_value, other.max_value)
 
+    def asdict(self):
+        return dataclasses.asdict(self)
+
+    @staticmethod
+    def fromdict(d):
+        return VcfFieldSummary(**d)
+
 
 @dataclasses.dataclass
 class VcfField:
@@ -185,6 +192,8 @@ def fromdict(d):
         )
         fields = [VcfField.fromdict(fd) for fd in d["fields"]]
         partitions = [VcfPartition(**pd) for pd in d["partitions"]]
+        for p in partitions:
+            p.region = vcf_utils.Region(**p.region)
         d = d.copy()
         d["fields"] = fields
         d["partitions"] = partitions
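
The two added lines exist because dataclasses.asdict recurses into nested dataclasses: in metadata.json each partition's region is stored as a plain dict, and VcfPartition(**pd) does not rehydrate it. A minimal illustration of the pitfall, using simplified stand-ins for the real Region and VcfPartition classes:

    import dataclasses
    import json

    @dataclasses.dataclass
    class Region:
        contig: str
        start: int
        end: int

    @dataclasses.dataclass
    class Partition:
        vcf_path: str
        region: Region

    p = Partition("a.vcf.gz", Region("chr1", 1, 1000))
    d = json.loads(json.dumps(dataclasses.asdict(p)))  # what metadata.json stores
    restored = Partition(**d)
    assert isinstance(restored.region, dict)       # still a plain dict, not a Region
    restored.region = Region(**restored.region)    # the manual rehydration step
    assert restored.region.contig == "chr1"
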
@@ -270,7 +279,7 @@ def scan_vcf(path, target_num_partitions):
 
 
 def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
-    logger.info(f"Scanning {len(paths)} VCFs")
+    logger.info(f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions} partitions.")
     progress_config = core.ProgressConfig(
         total=len(paths),
         units="files",
@@ -279,7 +288,7 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
     )
     with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
         for path in paths:
-            pwm.submit(scan_vcf, path, target_num_partitions)
+            pwm.submit(scan_vcf, path, max(1, target_num_partitions//len(paths)))
         results = list(pwm.results_as_completed())
 
     # Sort to make the ordering deterministic
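
The changed submit call now hands each file an equal share of the partition budget rather than the full target, with a floor of one partition per file, so the final total can differ from the requested target. The arithmetic, spelled out as a trivial illustration (not project code):

    def per_file_budget(target_num_partitions, num_files):
        # Equal share of the target, but never less than one partition per file.
        return max(1, target_num_partitions // num_files)

    assert per_file_budget(100, 3) == 33  # 3 files yield at most ~99 partitions
    assert per_file_budget(2, 8) == 1     # 8 files still yield at least 8
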
@@ -308,6 +317,7 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
         key=lambda x: (contig_index_map[x.region.contig], x.region.start)
     )
     vcf_metadata.partitions = all_partitions
+    logger.info(f"Scan complete, resulting in {len(all_partitions)} partitions.")
     return vcf_metadata, header
 
 
@@ -872,24 +882,65 @@ def num_columns(self):
         return len(self.columns)
 
     def mkdirs(self):
+        logger.info(f"Creating {len(self.columns) * self.num_partitions} directories")
         self.path.mkdir()
         for col in self.columns.values():
             col.path.mkdir(parents=True)
             for j in range(self.num_partitions):
                 part_path = col.path / f"p{j}"
                 part_path.mkdir()
 
+    def write_metadata(self, final=False):
+        logger.info(f"Writing metadata ({'final' if final else 'initial'})")
+        with open(self.path / f"metadata.{'wip.' if not final else ''}json", "w") as f:
+            json.dump(self.metadata.asdict(), f, indent=4)
+        if final:
+            try:
+                os.remove(self.path / "metadata.wip.json")
+            except FileNotFoundError:
+                pass
+
+    def write_header(self):
+        logger.info(f"Writing header")
+        with open(self.path / "header.txt", "w") as f:
+            f.write(self.vcf_header)
+
+    def load_partition_summaries(self):
+        summaries = []
+        not_found = []
+        for j in range(self.num_partitions):
+            try:
+                with open(self.path / f"p{j}_metadata.json") as f:
+                    summary = json.load(f)
+                    for k, v in summary['field_summaries'].items():
+                        summary['field_summaries'][k] = VcfFieldSummary(**v)
+                    summaries.append(summary)
+            except FileNotFoundError:
+                not_found.append(j)
+        if not_found:
+            raise FileNotFoundError(
+                f"Partition metadata not found for {len(not_found)} partitions: {not_found}"
+            )
+        return summaries
+
+
     @staticmethod
     def load(path):
         path = pathlib.Path(path)
-        with open(path / "metadata.json") as f:
-            metadata = VcfMetadata.fromdict(json.load(f))
+        final = True
+        try:
+            with open(path / "metadata.json") as f:
+                metadata = VcfMetadata.fromdict(json.load(f))
+        except FileNotFoundError:
+            with open(path / "metadata.wip.json") as f:
+                metadata = VcfMetadata.fromdict(json.load(f))
+            final = False
         with open(path / "header.txt") as f:
             header = f.read()
         pcvcf = PickleChunkedVcf(path, metadata, header)
         logger.info(
             f"Loaded PickleChunkedVcf(partitions={pcvcf.num_partitions}, "
-            f"records={pcvcf.num_records}, columns={pcvcf.num_columns})"
+            f"records={pcvcf.num_records}, columns={pcvcf.num_columns}), final={final}"
         )
         return pcvcf
 
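write_metadata and the reworked load implement a small two-state protocol: metadata.wip.json marks a conversion that has been initialised but not finalised, and metadata.json appears only once convert_finalise runs. A condensed, standalone sketch of the protocol (the function names here are illustrative; path is assumed to be a pathlib.Path):

    import json
    import os

    def write_state(path, state, final=False):
        name = "metadata.json" if final else "metadata.wip.json"
        with open(path / name, "w") as f:
            json.dump(state, f, indent=4)
        if final:
            # Finalising removes the work-in-progress marker.
            try:
                os.remove(path / "metadata.wip.json")
            except FileNotFoundError:
                pass

    def load_state(path):
        # Prefer the finalised file; fall back to the WIP one.
        try:
            with open(path / "metadata.json") as f:
                return json.load(f), True
        except FileNotFoundError:
            with open(path / "metadata.wip.json") as f:
                return json.load(f), False

Writing to a temporary name and renaming over metadata.json would make the final step atomic; the scheme here instead keeps the two files distinct so that load can report which state it found.
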
@@ -949,70 +1000,96 @@ def convert_partition(
         # is that we get a significant pause at the end of the counter while
         # all the "small" fields get flushed. Possibly not much to be done about it.
         core.update_progress(1)
-
+        partition_metadata = {
+            "num_records": num_records,
+            "field_summaries": {k:v.asdict() for k,v in tcw.field_summaries.items()}
+        }
+        with open(out_path / f"p{partition_index}_metadata.json", "w") as f:
+            json.dump(partition_metadata, f, indent=4)
         logger.info(
             f"Finish p{partition_index} {partition.vcf_path}__{partition.region}="
             f"{num_records} records"
         )
-        return partition_index, tcw.field_summaries, num_records
 
     @staticmethod
-    def convert(
-        vcfs, out_path, *, column_chunk_size=16, worker_processes=1, show_progress=False
-    ):
+    def convert_init(vcfs, out_path, *, num_partitions=1, worker_processes=1, show_progress=False):
         out_path = pathlib.Path(out_path)
         # TODO make scan work in parallel using general progress code too
-        target_num_partitions = max(1, worker_processes * 4)
         vcf_metadata, header = scan_vcfs(
             vcfs,
             worker_processes=worker_processes,
             show_progress=show_progress,
-            target_num_partitions=target_num_partitions,
+            target_num_partitions=num_partitions,
         )
         pcvcf = PickleChunkedVcf(out_path, vcf_metadata, header)
         pcvcf.mkdirs()
 
-        logger.info(
-            f"Exploding {pcvcf.num_columns} columns {vcf_metadata.num_records} variants "
-            f"{pcvcf.num_samples} samples"
-        )
+        pcvcf.write_metadata(final=False)
+        pcvcf.write_header()
+        return pcvcf
+
+    def convert_slice(self, start, stop, *, worker_processes=1, show_progress=False, column_chunk_size=16):
+        if start < 0:
+            raise ValueError(f"start={start} must be non-negative")
+        if stop > self.num_partitions:
+            raise ValueError(f"stop={stop} must be less than the number of partitions")
+        if start == 0 and stop == self.num_partitions:
+            num_records_to_process = self.num_records
+            logger.info(
+                f"Exploding {self.num_columns} columns {self.metadata.num_records} variants "
+                f"{self.num_samples} samples"
+            )
+        else:
+            num_records_to_process = None
+            logger.info(
+                f"Exploding {self.num_columns} columns {self.num_samples} samples"
+                f" from partitions {start} to {stop}"
+            )
 
         progress_config = core.ProgressConfig(
-            total=vcf_metadata.num_records,
+            total=num_records_to_process,
             units="vars",
             title="Explode",
            show=show_progress,
         )
         with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-            for j, partition in enumerate(vcf_metadata.partitions):
+            for j in range(start, stop):
                 pwm.submit(
                     PickleChunkedVcf.convert_partition,
-                    vcf_metadata,
+                    self.metadata,
                     j,
-                    out_path,
+                    self.path,
                     column_chunk_size=column_chunk_size,
                 )
-            num_records = 0
-            partition_summaries = []
-            for index, summary, num_records in pwm.results_as_completed():
-                partition_summaries.append(summary)
-                vcf_metadata.partitions[index].num_records = num_records
+            for _ in pwm.results_as_completed():
+                pass
 
+    def convert_finalise(self):
+        partition_summaries = self.load_partition_summaries()
+        for index, summary in enumerate(partition_summaries):
+            self.metadata.partitions[index].num_records = summary['num_records']
         total_records = sum(
-            partition.num_records for partition in vcf_metadata.partitions
+            partition.num_records for partition in self.metadata.partitions
         )
-        assert total_records == pcvcf.num_records
+        assert total_records == self.num_records
 
-        for field in vcf_metadata.fields:
+        for field in self.metadata.fields:
             # Clear the summary to avoid problems when running in debug
-            # syncronous mode
+            # synchronous mode
             field.summary = VcfFieldSummary()
             for summary in partition_summaries:
-                field.summary.update(summary[field.full_name])
+                field.summary.update(summary["field_summaries"][field.full_name])
+
+        self.write_metadata(final=True)
+
+    @staticmethod
+    def convert(
+        vcfs, out_path, *, column_chunk_size=16, worker_processes=1, show_progress=False
+    ):
+        pcvcf = PickleChunkedVcf.convert_init(vcfs, out_path, num_partitions=max(1, worker_processes * 4), worker_processes=worker_processes, show_progress=show_progress)
+        pcvcf.convert_slice(0, len(pcvcf.metadata.partitions), worker_processes=worker_processes, show_progress=show_progress, column_chunk_size=column_chunk_size)
+        pcvcf.convert_finalise()
 
-        with open(out_path / "metadata.json", "w") as f:
-            json.dump(vcf_metadata.asdict(), f, indent=4)
-        with open(out_path / "header.txt", "w") as f:
-            f.write(header)
 
 
 def explode(
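
A key move in this hunk is that convert_partition no longer returns its summary; it persists p{j}_metadata.json itself, which is what lets convert_slice run in a different process, or on a different machine, from convert_finalise. The drain loop at the end exists only for progress accounting. The same fan-out/fan-in shape using only the standard library, as a rough stand-in for the project's core.ParallelWorkManager:

    import concurrent.futures
    import json
    import pathlib

    def convert_partition(out_path, j):
        # ... the real per-partition work would happen here ...
        summary = {"num_records": 0, "field_summaries": {}}
        with open(pathlib.Path(out_path) / f"p{j}_metadata.json", "w") as f:
            json.dump(summary, f, indent=4)

    def convert_slice(out_path, start, stop, worker_processes=1):
        with concurrent.futures.ProcessPoolExecutor(worker_processes) as pool:
            futures = [
                pool.submit(convert_partition, out_path, j) for j in range(start, stop)
            ]
            for _ in concurrent.futures.as_completed(futures):
                pass  # progress bookkeeping only; the results are already on disk
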
@@ -1036,6 +1113,33 @@ def explode(
     )
     return PickleChunkedVcf.load(out_path)
 
+def explode_init(vcfs, out_path, *, target_num_partitions=1, worker_processes=1, show_progress=False):
+    out_path = pathlib.Path(out_path)
+    if out_path.exists():
+        shutil.rmtree(out_path)
+    # Error if num_parts less than number of files
+    if target_num_partitions < len(vcfs):
+        raise ValueError("num_partitions must be greater than or equal to the number of input VCFs")
+    return PickleChunkedVcf.convert_init(
+        vcfs,
+        out_path,
+        num_partitions=target_num_partitions,
+        worker_processes=worker_processes,
+        show_progress=show_progress,
+    )
+
+def explode_partition_count(out_path):
+    pcvcf = PickleChunkedVcf.load(out_path)
+    return pcvcf.num_partitions
+
+
+def explode_slice(out_path, start, stop, *, worker_processes=1, show_progress=False, column_chunk_size=16):
+    pcvcf = PickleChunkedVcf.load(out_path)
+    pcvcf.convert_slice(start, stop, worker_processes=worker_processes, show_progress=show_progress, column_chunk_size=column_chunk_size)
+
+def explode_finalise(out_path):
+    pcvcf = PickleChunkedVcf.load(out_path)
+    pcvcf.convert_finalise()
 
 def inspect(path):
     path = pathlib.Path(path)
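
Taken together, the four module-level functions are the programmatic counterpart of the new CLI subcommands. A hedged sketch of the intended composition; it runs the slices sequentially, although each explode_slice call is designed to be an independent job, and the paths and sizes are placeholders:

    from bio2zarr import vcf

    work_dir = "sample.exploded"
    vcf.explode_init(["sample.vcf.gz"], work_dir, target_num_partitions=16)

    # The scan decides the real partition count, so query it before slicing.
    n = vcf.explode_partition_count(work_dir)
    step = 4
    for start in range(0, n, step):
        vcf.explode_slice(work_dir, start, min(start + step, n), worker_processes=4)

    vcf.explode_finalise(work_dir)
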

bio2zarr/vcf_utils.py

Lines changed: 0 additions & 1 deletion
@@ -464,7 +464,6 @@ def partition_into_regions(
         elif target_part_size_bytes is not None:
             num_parts = ceildiv(file_length, target_part_size_bytes)
             part_lengths = np.array([i * target_part_size_bytes for i in range(num_parts)])
-
         file_offsets, region_contig_indexes, region_positions = self.index.offsets()
 
         # Search the file offsets to find which indexes the part lengths fall at
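
For context on the code around the deleted blank line: partition_into_regions turns a target part size into candidate byte offsets by ceiling division, and those offsets are then snapped to positions the index actually knows about. A toy version of the arithmetic (ceildiv here is assumed to behave like the module's helper):

    import numpy as np

    def ceildiv(a, b):
        # Ceiling division on integers, assumed equivalent to the module's helper.
        return -(-a // b)

    file_length = 1_000_000
    target_part_size_bytes = 300_000
    num_parts = ceildiv(file_length, target_part_size_bytes)  # 4
    part_lengths = np.array([i * target_part_size_bytes for i in range(num_parts)])
    print(num_parts, part_lengths)  # 4 [     0 300000 600000 900000]
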
