1
1
import sys
2
2
3
+ import msprime
3
4
import numpy .testing as nt
4
5
import pysam
5
6
import pytest
8
9
from bio2zarr import vcf2zarr
9
10
10
11
12
+ def run_simulation (ploidy = 1 ):
13
+ ts = msprime .sim_ancestry (
14
+ 2 ,
15
+ population_size = 10 ** 4 ,
16
+ ploidy = ploidy ,
17
+ sequence_length = 100_000 ,
18
+ random_seed = 42 ,
19
+ )
20
+ tables = ts .dump_tables ()
21
+ for u in ts .samples ():
22
+ site = tables .sites .add_row (u + 1 , "A" )
23
+ tables .mutations .add_row (site , derived_state = "T" , node = u )
24
+ return tables .tree_sequence ()
25
+
26
+
27
+ def write_vcf (ts , vcf_path , contig_id = "1" ):
28
+ with open (vcf_path , "w" ) as f :
29
+ ts .write_vcf (f , contig_id = contig_id )
30
+ # This also compresses the input file
31
+ pysam .tabix_index (str (vcf_path ), preset = "vcf" )
32
+ return vcf_path .with_suffix (vcf_path .suffix + ".gz" )
33
+
34
+
11
35
@pytest .mark .skipif (sys .platform == "darwin" , reason = "msprime OSX pip packages broken" )
12
36
class TestTskitRoundTripVcf :
13
37
@pytest .mark .parametrize ("ploidy" , [1 , 2 , 3 , 4 ])
14
38
def test_ploidy (self , ploidy , tmp_path ):
15
- # FIXME importing here so pytest.skip avoids importing msprime.
16
- import msprime
17
-
18
- ts = msprime .sim_ancestry (
19
- 2 ,
20
- population_size = 10 ** 4 ,
21
- ploidy = ploidy ,
22
- sequence_length = 100_000 ,
23
- random_seed = 42 ,
24
- )
25
- tables = ts .dump_tables ()
26
- for u in ts .samples ():
27
- site = tables .sites .add_row (u + 1 , "A" )
28
- tables .mutations .add_row (site , derived_state = "T" , node = u )
29
- ts = tables .tree_sequence ()
30
- vcf_file = tmp_path / "sim.vcf"
31
- with open (vcf_file , "w" ) as f :
32
- ts .write_vcf (f )
33
- # This also compresses the input file
34
- pysam .tabix_index (str (vcf_file ), preset = "vcf" )
39
+ ts = run_simulation (ploidy = ploidy )
40
+ vcf_path = write_vcf (ts , tmp_path / "sim.vcf" )
35
41
out = tmp_path / "example.vcf.zarr"
36
- vcf2zarr .convert ([tmp_path / "sim.vcf.gz" ], out )
42
+ vcf2zarr .convert ([vcf_path ], out )
37
43
ds = sg .load_dataset (out )
38
44
assert ds .sizes ["ploidy" ] == ploidy
39
45
assert ds .sizes ["variants" ] == ts .num_sites
@@ -46,3 +52,28 @@ def test_ploidy(self, ploidy, tmp_path):
46
52
nt .assert_equal (ds .variant_allele [:, 0 ].values , "A" )
47
53
nt .assert_equal (ds .variant_allele [:, 1 ].values , "T" )
48
54
nt .assert_equal (ds .variant_position , ts .sites_position )
55
+
56
+ # @pytest.mark.parametrize("contig_ids", [["A"], ["1", "2"]])
57
+ # def test_multi_contig(self, contig_ids, tmp_path):
58
+ # ts = run_simulation()
59
+ # # for contig in range(num_contigs):
60
+ # # vcf_file = tmp_path / "sim.vcf"
61
+ # # with open(vcf_file, "w") as f:
62
+ # # ts.write_vcf(f, contig_id=)
63
+ # # # This also compresses the input file
64
+ # # pysam.tabix_index(str(vcf_file), preset="vcf")
65
+
66
+ # out = tmp_path / "example.vcf.zarr"
67
+ # vcf2zarr.convert([tmp_path / "sim.vcf.gz"], out)
68
+ # ds = sg.load_dataset(out)
69
+ # assert ds.sizes["ploidy"] == ploidy
70
+ # assert ds.sizes["variants"] == ts.num_sites
71
+ # assert ds.sizes["samples"] == ts.num_individuals
72
+ # # Msprime guarantees that this will be true.
73
+ # nt.assert_array_equal(
74
+ # ts.genotype_matrix().reshape((ts.num_sites, ts.num_individuals, ploidy)),
75
+ # ds.call_genotype.values,
76
+ # )
77
+ # nt.assert_equal(ds.variant_allele[:, 0].values, "A")
78
+ # nt.assert_equal(ds.variant_allele[:, 1].values, "T")
79
+ # nt.assert_equal(ds.variant_position, ts.sites_position)
0 commit comments