|
1 | 1 | import json
|
2 | 2 |
|
| 3 | +import pysam |
3 | 4 | import pytest
|
4 | 5 | import sgkit as sg
|
5 | 6 | import xarray.testing as xt
|
@@ -504,3 +505,64 @@ def test_encode_partition_out_of_range(self, icf_path, tmp_path, partition):
|
504 | 505 | vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
|
505 | 506 | with pytest.raises(ValueError, match="Partition index not in the valid range"):
|
506 | 507 | vcf.encode_partition(zarr_path, partition)
|
| 508 | + |
| 509 | + |
class TestClobberFixedFields:
    """Check that INFO/FORMAT names clashing with fixed fields are rejected.

    Each test writes a minimal VCF declaring a single INFO or FORMAT field
    whose ID is one of the parametrized fixed field names, and asserts that
    ``vcf.explode`` raises ``ValueError`` mentioning that name.
    """

    def generate_vcf(self, path, info_field=None, format_field=None, num_rows=1):
        """Write a minimal VCF to ``path`` and index it with tabix.

        Parameters
        ----------
        path
            Destination of the uncompressed VCF text. After indexing, the
            tests read the compressed copy at ``<path>.gz``.
        info_field
            If not None, an ``##INFO`` header line with this ID is emitted.
        format_field
            If not None, a ``##FORMAT`` header line with this ID is emitted.
        num_rows
            Number of variant records to write, at positions 1..num_rows.
        """
        lines = [
            "##fileformat=VCFv4.2",
            '##FILTER=<ID=PASS,Description="All filters passed">',
            "##contig=<ID=1>",
        ]
        if info_field is not None:
            lines.append(
                f'##INFO=<ID={info_field},Number=1,Type=Float,Description="">'
            )
        if format_field is not None:
            lines.append(
                f'##FORMAT=<ID={format_field},Number=1,Type=Float,Description="">'
            )
        lines.append(
            "\t".join(["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO"])
        )
        # NOTE(review): against the 8-column header above, each record places
        # "A" under ID, "T" under REF and "." under ALT - confirm this is the
        # intended layout.
        lines.extend(
            "\t".join(["1", str(k + 1), "A", "T", ".", ".", ".", "."])
            for k in range(num_rows)
        )
        text = "\n".join(lines) + "\n"

        with open(path, "w") as out:
            out.write(text)

        # Echo the generated VCF to stdout as a debugging aid. The in-memory
        # text is printed rather than re-opening the file, which previously
        # left a file handle unclosed.
        print(text)
        # This also compresses the input file
        pysam.tabix_index(str(path), preset="vcf")

    @pytest.mark.parametrize(
        "field",
        [
            "contig",
            "id",
            "id_mask",
            "position",
            "allele",
            "filter",
            "quality",
        ],
    )
    def test_variant_fields(self, tmp_path, field):
        """An INFO field named ``field`` must make ``vcf.explode`` fail."""
        vcf_file = tmp_path / "test.vcf"
        self.generate_vcf(vcf_file, info_field=field)
        with pytest.raises(ValueError, match=f"INFO field name.*{field}"):
            vcf.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])

    @pytest.mark.parametrize(
        "field",
        [
            "genotype",
            "genotype_phased",
            "genotype_mask",
        ],
    )
    def test_call_fields(self, tmp_path, field):
        """A FORMAT field named ``field`` must make ``vcf.explode`` fail."""
        vcf_file = tmp_path / "test.vcf"
        self.generate_vcf(vcf_file, format_field=field)
        with pytest.raises(ValueError, match=f"FORMAT field name.*{field}"):
            vcf.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])
0 commit comments