Commit fdaaefb

Add CLI for explode slices
1 parent fbe585e

4 files changed (+110, -6 lines)

bio2zarr/cli.py

Lines changed: 60 additions & 1 deletion
```diff
@@ -14,8 +14,17 @@
     "-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
 )
 
+column_chunk_size = click.option(
+    "-c",
+    "--column-chunk-size",
+    type=int,
+    default=64,
+    help="Chunk size in the columns dimension",
+)
+
 # Note: -l and -w were chosen when these were called "width" and "length".
 # possibly there are better letters now.
+# TODO help text
 variants_chunk_size = click.option(
     "-l",
     "--variants-chunk-size",
@@ -55,7 +64,7 @@ def setup_logging(verbosity):
 @click.argument("out_path", type=click.Path())
 @verbose
 @worker_processes
-@click.option("-c", "--column-chunk-size", type=int, default=64)
+@column_chunk_size
 def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
     """
     Convert VCF(s) to columnar intermediate format
@@ -69,6 +78,53 @@ def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
         show_progress=True,
     )
 
+@click.command
+@click.argument("vcfs", nargs=-1, required=True)
+@click.argument("out_path", type=click.Path())
+@verbose
+@worker_processes
+def explode_init(vcfs, out_path, verbose, worker_processes):
+    """
+    Initial step for parallel conversion of VCF(s) to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_init(
+        vcfs,
+        out_path,
+        worker_processes=worker_processes,
+        show_progress=True,
+    )
+
+@click.command
+@click.argument("out_path", type=click.Path(), required=True)
+@click.argument("start", type=int, required=True)
+@click.argument("end", type=int, required=True)
+@verbose
+@worker_processes
+@column_chunk_size
+def explode_slice(out_path, start, end, verbose, worker_processes, column_chunk_size):
+    """
+    Convert a slice of the partitions to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_slice(
+        out_path,
+        start,
+        end,
+        worker_processes=worker_processes,
+        column_chunk_size=column_chunk_size,
+        show_progress=True,
+    )
+
+@click.command
+@click.argument("out_path", type=click.Path(), required=True)
+@verbose
+def explode_finalise(out_path, verbose):
+    """
+    Final step for parallel conversion of VCF(s) to columnar intermediate format
+    """
+    setup_logging(verbose)
+    vcf.explode_finalise(out_path)
 
 @click.command
 @click.argument("if_path", type=click.Path())
@@ -189,6 +245,9 @@ def vcf2zarr():
 
 # TODO figure out how to get click to list these in the given order.
 vcf2zarr.add_command(explode)
+vcf2zarr.add_command(explode_init)
+vcf2zarr.add_command(explode_slice)
+vcf2zarr.add_command(explode_finalise)
 vcf2zarr.add_command(inspect)
 vcf2zarr.add_command(mkschema)
 vcf2zarr.add_command(encode)
```
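
This diff also hoists the `-c/--column-chunk-size` option into a module-level `column_chunk_size` decorator, so `explode` and `explode_slice` share a single definition. A minimal standalone sketch of this click pattern (the `demo` command is illustrative, not part of the commit):

```python
import click

# A shared option: click.option() returns a decorator, so it can be
# defined once and applied to any number of commands.
column_chunk_size = click.option(
    "-c",
    "--column-chunk-size",
    type=int,
    default=64,
    help="Chunk size in the columns dimension",
)

@click.command()
@column_chunk_size
def demo(column_chunk_size):
    """Echo the shared option's value."""
    click.echo(f"column-chunk-size = {column_chunk_size}")

if __name__ == "__main__":
    demo()
```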

bio2zarr/vcf.py

Lines changed: 30 additions & 4 deletions
```diff
@@ -282,7 +282,7 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
     )
     with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
         for path in paths:
-            pwm.submit(scan_vcf, path, target_num_partitions)
+            pwm.submit(scan_vcf, path, target_num_partitions // len(paths))
     results = list(pwm.results_as_completed())
 
     # Sort to make the ordering deterministic
@@ -885,6 +885,10 @@ def mkdirs(self):
     def write_metadata(self):
         with open(self.path / "metadata.json", "w") as f:
             json.dump(self.metadata.asdict(), f, indent=4)
+        # Write the number of partitions to a convenience file for
+        # workflows
+        with open(self.path / "num_partitions.txt", "w") as f:
+            f.write(str(self.num_partitions))
 
     def write_header(self):
         with open(self.path / "header.txt", "w") as f:
@@ -983,15 +987,14 @@ def convert_partition(
         )
 
     @staticmethod
-    def convert_init(vcfs, out_path, *, worker_processes=1, show_progress=False):
+    def convert_init(vcfs, out_path, *, num_partitions=1, worker_processes=1, show_progress=False):
         out_path = pathlib.Path(out_path)
         # TODO make scan work in parallel using general progress code too
-        target_num_partitions = max(1, worker_processes * 4)
         vcf_metadata, header = scan_vcfs(
             vcfs,
             worker_processes=worker_processes,
             show_progress=show_progress,
-            target_num_partitions=target_num_partitions,
+            target_num_partitions=num_partitions,
         )
         pcvcf = PickleChunkedVcf(out_path, vcf_metadata, header)
         pcvcf.mkdirs()
@@ -1075,6 +1078,29 @@ def explode(
     )
     return PickleChunkedVcf.load(out_path)
 
+def explode_init(vcfs, out_path, *, num_partitions=1, worker_processes=1, show_progress=False):
+    out_path = pathlib.Path(out_path)
+    if out_path.exists():
+        shutil.rmtree(out_path)
+    # Error if num_partitions is less than the number of input files
+    if num_partitions < len(vcfs):
+        raise ValueError("num_partitions must be greater than or equal to the number of input VCFs")
+    return PickleChunkedVcf.convert_init(
+        vcfs,
+        out_path,
+        num_partitions=num_partitions,
+        worker_processes=worker_processes,
+        show_progress=show_progress,
+    )
+
+
+def explode_slice(out_path, start, stop, *, worker_processes=1, show_progress=False, column_chunk_size=16):
+    pcvcf = PickleChunkedVcf.load(out_path)
+    pcvcf.convert_slice(start, stop, worker_processes=worker_processes, show_progress=show_progress, column_chunk_size=column_chunk_size)
+
+def explode_finalise(out_path):
+    pcvcf = PickleChunkedVcf.load(out_path)
+    pcvcf.convert_finalise()
 
 def inspect(path):
     path = pathlib.Path(path)
```
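
Taken together, these hooks split `explode` into three phases that an external workflow engine can schedule independently, with `num_partitions.txt` telling the scheduler how many slice jobs to launch. A minimal sketch of that pattern against the Python API added here (the input paths and requested partition count are illustrative):

```python
import pathlib

from bio2zarr import vcf

vcfs = ["a.vcf.gz", "b.vcf.gz"]  # illustrative inputs
out = pathlib.Path("sample.exploded")

# Phase 1: scan the inputs and write the partition layout, including
# the num_partitions.txt convenience file.
vcf.explode_init(vcfs, out, num_partitions=8)

# Phase 2: slices are independent of each other, so a workflow engine
# could dispatch each call as a separate job rather than a local loop.
num_partitions = int((out / "num_partitions.txt").read_text())
for start in range(num_partitions):
    vcf.explode_slice(out, start, start + 1)

# Phase 3: finalise the intermediate columnar format.
vcf.explode_finalise(out)
```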

bio2zarr/vcf_utils.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -464,7 +464,6 @@ def partition_into_regions(
         elif target_part_size_bytes is not None:
             num_parts = ceildiv(file_length, target_part_size_bytes)
             part_lengths = np.array([i * target_part_size_bytes for i in range(num_parts)])
-
         file_offsets, region_contig_indexes, region_positions = self.index.offsets()
 
         # Search the file offsets to find which indexes the part lengths fall at
```

tests/test_vcf_examples.py

Lines changed: 20 additions & 0 deletions
```diff
@@ -782,3 +782,23 @@ def test_by_validating_split(source, suffix, files, tmp_path):
     out = tmp_path / "test.zarr"
     vcf.convert(split_files, out, worker_processes=0)
     vcf.validate(source_path, out)
+
+
+def test_split_explode(tmp_path):
+    paths = [
+        "tests/data/vcf/sample.vcf.gz.3.split/19:1-.vcf.gz",
+        "tests/data/vcf/sample.vcf.gz.3.split/20.vcf.gz",
+        "tests/data/vcf/sample.vcf.gz.3.split/X.vcf.gz",
+    ]
+    out = tmp_path / "test.explode"
+    pcvcf = vcf.explode_init(paths, out, num_partitions=15)
+    with open(out / "num_partitions.txt", "r") as f:
+        num_partitions = int(f.read())
+    assert pcvcf.num_partitions == num_partitions
+    assert num_partitions == 3
+    vcf.explode_slice(out, 0, num_partitions)
+    vcf.explode_finalise(out)
+
+    vcf.encode(out, tmp_path / "test.zarr")
+
+    vcf.validate("tests/data/vcf/sample.vcf.gz", tmp_path / "test.zarr")
```
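
Note that the test requests 15 partitions but asserts that exactly 3 are created, one per input file: `target_num_partitions` is a target rather than a guarantee, and these small sample VCFs evidently cannot be subdivided any further by the index-based partitioner.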
