Skip to content

Commit 6038115

Browse files
Fix up region size check
Closes #146 note
1 parent 2b0d1a4 commit 6038115

File tree

4 files changed

+77
-23
lines changed

4 files changed

+77
-23
lines changed

bio2zarr/vcf.py

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class VcfPartition:
144144
num_records: int = -1
145145

146146

147+
# TODO bump this before current PR is done!
147148
ICF_METADATA_FORMAT_VERSION = "0.2"
148149
ICF_DEFAULT_COMPRESSOR = numcodecs.Blosc(
149150
cname="zstd", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE
@@ -903,6 +904,40 @@ def num_columns(self):
903904
return len(self.columns)
904905

905906

907+
@dataclasses.dataclass
908+
class IcfPartitionMetadata:
909+
num_records: int
910+
last_position: int
911+
field_summaries: dict
912+
913+
def asdict(self):
914+
return dataclasses.asdict(self)
915+
916+
def asjson(self):
917+
return json.dumps(self.asdict(), indent=4)
918+
919+
@staticmethod
920+
def fromdict(d):
921+
md = IcfPartitionMetadata(**d)
922+
for k, v in md.field_summaries.items():
923+
md.field_summaries[k] = VcfFieldSummary.fromdict(v)
924+
return md
925+
926+
927+
def check_overlapping_partitions(partitions):
928+
for i in range(1, len(partitions)):
929+
prev_region = partitions[i - 1].region
930+
current_region = partitions[i].region
931+
if prev_region.contig == current_region.contig:
932+
assert prev_region.end is not None
933+
# Regions are *inclusive*
934+
if prev_region.end >= current_region.start:
935+
raise ValueError(
936+
f"Overlapping VCF regions in partitions {i - 1} and {i}: "
937+
f"{prev_region} and {current_region}"
938+
)
939+
940+
906941
class IntermediateColumnarFormatWriter:
907942
def __init__(self, path):
908943
self.path = pathlib.Path(path)
@@ -974,11 +1009,8 @@ def load_partition_summaries(self):
9741009
not_found = []
9751010
for j in range(self.num_partitions):
9761011
try:
977-
with open(self.wip_path / f"p{j}_summary.json") as f:
978-
summary = json.load(f)
979-
for k, v in summary["field_summaries"].items():
980-
summary["field_summaries"][k] = VcfFieldSummary.fromdict(v)
981-
summaries.append(summary)
1012+
with open(self.wip_path / f"p{j}.json") as f:
1013+
summaries.append(IcfPartitionMetadata.fromdict(json.load(f)))
9821014
except FileNotFoundError:
9831015
not_found.append(j)
9841016
if len(not_found) > 0:
@@ -995,7 +1027,7 @@ def load_metadata(self):
9951027

9961028
def process_partition(self, partition_index):
9971029
self.load_metadata()
998-
summary_path = self.wip_path / f"p{partition_index}_summary.json"
1030+
summary_path = self.wip_path / f"p{partition_index}.json"
9991031
# If someone is rewriting a summary path (for whatever reason), make sure it
10001032
# doesn't look like it's already been completed.
10011033
# NOTE to do this properly we probably need to take a lock on this file - but
@@ -1016,6 +1048,7 @@ def process_partition(self, partition_index):
10161048
else:
10171049
format_fields.append(field)
10181050

1051+
last_position = None
10191052
with IcfPartitionWriter(
10201053
self.metadata,
10211054
self.path,
@@ -1025,6 +1058,7 @@ def process_partition(self, partition_index):
10251058
num_records = 0
10261059
for variant in ivcf.variants(partition.region):
10271060
num_records += 1
1061+
last_position = variant.POS
10281062
tcw.append("CHROM", variant.CHROM)
10291063
tcw.append("POS", variant.POS)
10301064
tcw.append("QUAL", variant.QUAL)
@@ -1049,15 +1083,16 @@ def process_partition(self, partition_index):
10491083
f"flushing buffers"
10501084
)
10511085

1052-
partition_metadata = {
1053-
"num_records": num_records,
1054-
"field_summaries": {k: v.asdict() for k, v in tcw.field_summaries.items()},
1055-
}
1086+
partition_metadata = IcfPartitionMetadata(
1087+
num_records=num_records,
1088+
last_position=last_position,
1089+
field_summaries=tcw.field_summaries,
1090+
)
10561091
with open(summary_path, "w") as f:
1057-
json.dump(partition_metadata, f, indent=4)
1092+
f.write(partition_metadata.asjson())
10581093
logger.info(
1059-
f"Finish p{partition_index} {partition.vcf_path}__{partition.region}="
1060-
f"{num_records} records"
1094+
f"Finish p{partition_index} {partition.vcf_path}__{partition.region} "
1095+
f"{num_records} records last_pos={last_position}"
10611096
)
10621097

10631098
def explode(self, *, worker_processes=1, show_progress=False):
@@ -1099,8 +1134,9 @@ def finalise(self):
10991134
partition_summaries = self.load_partition_summaries()
11001135
total_records = 0
11011136
for index, summary in enumerate(partition_summaries):
1102-
partition_records = summary["num_records"]
1137+
partition_records = summary.num_records
11031138
self.metadata.partitions[index].num_records = partition_records
1139+
self.metadata.partitions[index].region.end = summary.last_position
11041140
total_records += partition_records
11051141
if not np.isinf(self.metadata.num_records):
11061142
# Note: this is just telling us that there's a bug in the
@@ -1110,9 +1146,11 @@ def finalise(self):
11101146
assert total_records == self.metadata.num_records
11111147
self.metadata.num_records = total_records
11121148

1149+
check_overlapping_partitions(self.metadata.partitions)
1150+
11131151
for field in self.metadata.fields:
11141152
for summary in partition_summaries:
1115-
field.summary.update(summary["field_summaries"][field.full_name])
1153+
field.summary.update(summary.field_summaries[field.full_name])
11161154

11171155
logger.info("Finalising metadata")
11181156
with open(self.path / "metadata.json", "w") as f:
@@ -1756,7 +1794,7 @@ def encode_partition(self, partition_index):
17561794
final_path = self.partition_path(partition_index)
17571795
logger.info(f"Finalising {partition_index} at {final_path}")
17581796
if final_path.exists():
1759-
logger.warning("Removing existing partition at {final_path}")
1797+
logger.warning(f"Removing existing partition at {final_path}")
17601798
shutil.rmtree(final_path)
17611799
os.rename(partition_path, final_path)
17621800

bio2zarr/vcf_utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ def read_bytes_as_tuple(f: IO[Any], fmt: str) -> Sequence[Any]:
7676

7777
@dataclass
7878
class Region:
79+
"""
80+
A htslib style region, where coordinates are 1-based and inclusive.
81+
"""
82+
7983
contig: str
8084
start: Optional[int] = None
8185
end: Optional[int] = None
@@ -86,7 +90,7 @@ def __post_init__(self):
8690
assert self.start > 0
8791
if self.end is not None:
8892
self.end = int(self.end)
89-
assert self.end > self.start
93+
assert self.end >= self.start
9094

9195
def __str__(self):
9296
s = f"{self.contig}"

tests/test_icf.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def test_finalise_missing_partition_fails(self, tmp_path, partition):
147147
def test_explode_partition(self, tmp_path, partition):
148148
icf_path = tmp_path / "x.icf"
149149
vcf.explode_init(icf_path, [self.data_path])
150-
summary_file = icf_path / "wip" / f"p{partition}_summary.json"
150+
summary_file = icf_path / "wip" / f"p{partition}.json"
151151
assert not summary_file.exists()
152152
vcf.explode_partition(icf_path, partition)
153153
assert summary_file.exists()
@@ -156,12 +156,12 @@ def test_double_explode_partition(self, tmp_path):
156156
partition = 1
157157
icf_path = tmp_path / "x.icf"
158158
vcf.explode_init(icf_path, [self.data_path])
159-
summary_file = icf_path / "wip" / f"p{partition}_summary.json"
159+
summary_file = icf_path / "wip" / f"p{partition}.json"
160160
assert not summary_file.exists()
161-
vcf.explode_partition(icf_path, partition, worker_processes=0)
161+
vcf.explode_partition(icf_path, partition)
162162
with open(summary_file) as f:
163163
s1 = f.read()
164-
vcf.explode_partition(icf_path, partition, worker_processes=0)
164+
vcf.explode_partition(icf_path, partition)
165165
with open(summary_file) as f:
166166
s2 = f.read()
167167
assert s1 == s2
@@ -173,6 +173,16 @@ def test_explode_partition_out_of_range(self, tmp_path, partition):
173173
with pytest.raises(ValueError, match="Partition index must be in the range"):
174174
vcf.explode_partition(icf_path, partition)
175175

176+
def test_explode_same_file_twice(self, tmp_path):
177+
icf_path = tmp_path / "x.icf"
178+
with pytest.raises(ValueError, match="Duplicate path provided"):
179+
vcf.explode(icf_path, [self.data_path, self.data_path])
180+
181+
def test_explode_same_data_twice(self, tmp_path):
182+
icf_path = tmp_path / "x.icf"
183+
with pytest.raises(ValueError, match="Overlapping VCF regions"):
184+
vcf.explode(icf_path, [self.data_path, "tests/data/vcf/sample.bcf"])
185+
176186

177187
class TestGeneratedFieldsExample:
178188
data_path = "tests/data/vcf/field_type_combos.vcf.gz"

tests/test_vcf.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ def test_call_GQ(self, schema):
334334
[("1", 100, 200), ("1", 150, 250)],
335335
# Overlap by one position
336336
[("1", 100, 201), ("1", 200, 300)],
337+
# End coord is *inclusive*
338+
[("1", 100, 201), ("1", 201, 300)],
337339
# Contained overlap
338340
[("1", 100, 300), ("1", 150, 250)],
339341
# Exactly equal
@@ -345,8 +347,8 @@ def test_check_overlap(regions):
345347
vcf.VcfPartition("", region=vcf_utils.Region(contig, start, end))
346348
for contig, start, end in regions
347349
]
348-
with pytest.raises(ValueError, match="Multiple VCFs have the region"):
349-
vcf.check_overlap(partitions)
350+
with pytest.raises(ValueError, match="Overlapping VCF regions"):
351+
vcf.check_overlapping_partitions(partitions)
350352

351353

352354
class TestVcfDescriptions:

0 commit comments

Comments
 (0)