diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 911489d2..da2c4d98 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -101,16 +101,6 @@ jobs:
           python -m bio2zarr tskit2zarr convert tests/data/ts/example.trees ts.vcz
           deactivate
 
-          python -m venv env-plink
-          source env-plink/bin/activate
-          python -m pip install .
-          python -m bio2zarr plink2zarr convert tests/data/plink/example plink.vcz > plink.txt 2>&1 || echo $? > plink_exit.txt
-          test "$(cat plink_exit.txt)" = "1"
-          grep -q "This process requires the optional bed_reader module. Install it with: pip install bio2zarr\[plink\]" plink.txt
-          python -m pip install '.[plink]'
-          python -m bio2zarr plink2zarr convert tests/data/plink/example plink.vcz
-          deactivate
-
           python -m venv env-vcf
           source env-vcf/bin/activate
           python -m pip install .
diff --git a/bio2zarr/plink.py b/bio2zarr/plink.py
index e170ddea..6ae2b873 100644
--- a/bio2zarr/plink.py
+++ b/bio2zarr/plink.py
@@ -3,12 +3,50 @@ import pathlib
 
 import numpy as np
+import pandas as pd
 
 from bio2zarr import constants, core, vcz
 
 logger = logging.getLogger(__name__)
 
+FAM_FIELDS = [
+    ("family_id", str, "U"),
+    ("individual_id", str, "U"),
+    ("paternal_id", str, "U"),
+    ("maternal_id", str, "U"),
+    ("sex", str, "int8"),
+    ("phenotype", str, "int8"),
+]
+FAM_DF_DTYPE = dict([(f[0], f[1]) for f in FAM_FIELDS])
+FAM_ARRAY_DTYPE = dict([(f[0], f[2]) for f in FAM_FIELDS])
+
+BIM_FIELDS = [
+    ("contig", str, "U"),
+    ("variant_id", str, "U"),
+    ("cm_position", "float32", "float32"),
+    ("position", "int32", "int32"),
+    ("allele_1", str, "S"),
+    ("allele_2", str, "S"),
+]
+BIM_DF_DTYPE = dict([(f[0], f[1]) for f in BIM_FIELDS])
+BIM_ARRAY_DTYPE = dict([(f[0], f[2]) for f in BIM_FIELDS])
+
+
+def read_fam(path, sep=None):
+    # See: https://www.cog-genomics.org/plink/1.9/formats#fam
+    names = [f[0] for f in FAM_FIELDS]
+    df = pd.read_csv(path, sep=sep, names=names, dtype=FAM_DF_DTYPE)
+    return df
+
+
+def read_bim(path, sep=None):
+    # See: https://www.cog-genomics.org/plink/1.9/formats#bim
+    names = [f[0] for f in BIM_FIELDS]
+    df = pd.read_csv(str(path), sep=sep, names=names, dtype=BIM_DF_DTYPE)
+    return df
+
+
 @dataclasses.dataclass
 class PlinkPaths:
     bed_path: str
@@ -16,26 +54,92 @@ class PlinkPaths:
     fam_path: str
 
 
+class BedReader:
+    def __init__(self, path, num_variants, num_samples):
+        self.num_variants = num_variants
+        self.num_samples = num_samples
+        self.path = path
+        # bytes per variant: 1 byte per 4 samples, rounded up
+        self.bytes_per_variant = (self.num_samples + 3) // 4
+
+        # TODO open this as a persistent file and support reading from a
+        # stream
+        with open(self.path, "rb") as f:
+            magic = f.read(3)
+            if magic != b"\x6c\x1b\x01":
+                raise ValueError("Invalid BED file magic bytes")
+
+        # We could check the size of the bed file here, but that would
+        # mean we can't work with streams.
+
+        # Initialize the lookup table with shape (256, 4, 2):
+        # 256 possible byte values, 4 samples per byte, 2 alleles per sample
+        lookup = np.zeros((256, 4, 2), dtype=np.int8)
+
+        # For each possible byte value (0-255)
+        for byte in range(256):
+            # For each of the 4 samples encoded in this byte
+            for sample in range(4):
+                # Extract the 2 bits for this sample
+                bits = (byte >> (sample * 2)) & 0b11
+                # Convert PLINK's bit encoding to genotype values
+                if bits == 0b00:
+                    lookup[byte, sample] = [1, 1]
+                elif bits == 0b01:
+                    lookup[byte, sample] = [-1, -1]
+                elif bits == 0b10:
+                    lookup[byte, sample] = [0, 1]
+                elif bits == 0b11:
+                    lookup[byte, sample] = [0, 0]
+
+        self.byte_lookup = lookup
+
+    def decode(self, start, stop):
+        chunk_size = stop - start
+
+        # Calculate file offsets for the required data:
+        # 3 bytes for the magic number at the beginning of the file
+        start_offset = 3 + (start * self.bytes_per_variant)
+        bytes_to_read = chunk_size * self.bytes_per_variant
+
+        # TODO make it possible to read sequentially from the same file handle,
+        # seeking only when necessary.
+        with open(self.path, "rb") as f:
+            f.seek(start_offset)
+            chunk_data = f.read(bytes_to_read)
+
+        data_bytes = np.frombuffer(chunk_data, dtype=np.uint8)
+        data_matrix = data_bytes.reshape(chunk_size, self.bytes_per_variant)
+
+        # Apply the lookup table to get genotypes.
+        # Shape becomes: (chunk_size, bytes_per_variant, 4, 2)
+        all_genotypes = self.byte_lookup[data_matrix]
+
+        # Reshape to get all samples in one dimension:
+        # (chunk_size, bytes_per_variant * 4, 2)
+        samples_padded = self.bytes_per_variant * 4
+        genotypes_reshaped = all_genotypes.reshape(chunk_size, samples_padded, 2)
+
+        return genotypes_reshaped[:, : self.num_samples]
+
+
 class PlinkFormat(vcz.Source):
-    @core.requires_optional_dependency("bed_reader", "plink")
     def __init__(self, prefix):
-        import bed_reader
-
         # TODO we will need to support multiple chromosomes here to join
         # plinks into one big zarr. So, these will require multiple
         # bed and bim files, but should share a .fam
         self.prefix = str(prefix)
-        paths = PlinkPaths(
+        self.paths = PlinkPaths(
             self.prefix + ".bed",
             self.prefix + ".bim",
             self.prefix + ".fam",
         )
-        self.bed = bed_reader.open_bed(
-            paths.bed_path,
-            bim_location=paths.bim_path,
-            fam_location=paths.fam_path,
-            num_threads=1,
-            count_A1=True,
+        self.bim = read_bim(self.paths.bim_path)
+        self.fam = read_fam(self.paths.fam_path)
+        self._num_records = self.bim.shape[0]
+        self._num_samples = self.fam.shape[0]
+        self.bed_reader = BedReader(
+            self.paths.bed_path, self.num_records, self.num_samples
         )
 
     @property
@@ -44,59 +148,54 @@ def path(self):
 
     @property
     def num_records(self):
-        return self.bed.sid_count
+        return self._num_records
 
     @property
-    def samples(self):
-        return [vcz.Sample(id=sample) for sample in self.bed.iid]
+    def num_samples(self):
+        return self._num_samples
 
     @property
-    def contigs(self):
-        return [vcz.Contig(id=str(chrom)) for chrom in np.unique(self.bed.chromosome)]
+    def samples(self):
+        return [vcz.Sample(id=iid) for iid in self.fam.individual_id]
 
     @property
-    def num_samples(self):
-        return len(self.samples)
+    def contigs(self):
+        return [vcz.Contig(id=str(chrom)) for chrom in self.bim.contig.unique()]
 
     def iter_contig(self, start, stop):
         chrom_to_contig_index = {contig.id: i for i, contig in enumerate(self.contigs)}
-        for chrom in self.bed.chromosome[start:stop]:
+        for chrom in self.bim.contig[start:stop]:
             yield chrom_to_contig_index[str(chrom)]
 
     def iter_field(self, field_name, shape, start, stop):
         assert field_name == "position"  # Only the position field is supported for plink
-        yield from self.bed.bp_position[start:stop]
+        yield from self.bim.position[start:stop]
 
     def iter_id(self, start, stop):
-        yield from self.bed.sid[start:stop]
+        yield from self.bim.variant_id[start:stop]
 
     def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
-        alt_field = self.bed.allele_1
-        ref_field = self.bed.allele_2
-        bed_chunk = self.bed.read(slice(start, stop), dtype=np.int8).T
-        gt = np.zeros(shape, dtype=np.int8)
-        phased = np.zeros(shape[:-1], dtype=bool)
+        alt_field = self.bim.allele_1.values
+        ref_field = self.bim.allele_2.values
+        gt = self.bed_reader.decode(start, stop)
+        phased = np.zeros(gt.shape[:2], dtype=bool)
         for i, (ref, alt) in enumerate(
             zip(ref_field[start:stop], alt_field[start:stop])
         ):
             alleles = np.full(num_alleles, constants.STR_FILL, dtype="O")
             alleles[0] = ref
             alleles[1 : 1 + len(alt)] = alt
-            gt[:] = 0
-            gt[bed_chunk[i] == -127] = -1
-            gt[bed_chunk[i] == 2] = 1
-            gt[bed_chunk[i] == 1, 1] = 1
             # rlen is the length of the REF in PLINK as there are no END annotations
-            yield vcz.VariantData(len(alleles[0]), alleles, gt, phased)
+            yield vcz.VariantData(len(alleles[0]), alleles, gt[i], phased[i])
 
     def generate_schema(
         self,
         variants_chunk_size=None,
         samples_chunk_size=None,
     ):
-        n = self.bed.iid_count
-        m = self.bed.sid_count
+        n = self.num_samples
+        m = self.num_records
         logging.info(f"Scanned plink with {n} samples and {m} variants")
         dimensions = vcz.standard_dimensions(
             variants_size=m,
@@ -119,7 +218,7 @@ def generate_schema(
         )
         # If we don't have SVLEN or END annotations, the rlen field is defined
        # as the length of the REF
-        max_len = self.bed.allele_2.itemsize
+        max_len = self.bim.allele_2.values.itemsize
 
         array_specs = [
             vcz.ZarrArraySpec(
@@ -156,7 +255,7 @@ def generate_schema(
             ),
             vcz.ZarrArraySpec(
                 name="variant_contig",
-                dtype=core.min_int_dtype(0, len(np.unique(self.bed.chromosome))),
+                dtype=core.min_int_dtype(0, len(np.unique(self.bim.contig))),
                 dimensions=["variants"],
                 description="Contig/chromosome index for each variant",
             ),
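A quick way to sanity-check the bit-level layout handled by the new BedReader: each byte of a .bed file packs four samples, two bits per sample, least-significant bits first, with 00 = homozygous allele_1, 10 = heterozygous, 01 = missing, and 11 = homozygous allele_2. The following is a minimal sketch, not part of the diff; it assumes only the BedReader class added above, and the file name and byte value are arbitrary choices for illustration.

import pathlib
import tempfile

from bio2zarr.plink import BedReader

with tempfile.TemporaryDirectory() as tmp:
    bed_path = pathlib.Path(tmp) / "toy.bed"
    # Magic bytes followed by one variant's worth of data for 4 samples.
    # 0b11011000 unpacks (low bits first) to 00, 10, 01, 11.
    bed_path.write_bytes(b"\x6c\x1b\x01" + bytes([0b11011000]))
    reader = BedReader(bed_path, num_variants=1, num_samples=4)
    g = reader.decode(0, 1)
    # 00 -> hom allele_1 (1, 1); 10 -> het (0, 1);
    # 01 -> missing (-1, -1); 11 -> hom allele_2 (0, 0)
    assert g.tolist() == [[[1, 1], [0, 1], [-1, -1], [0, 0]]]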
diff --git a/pyproject.toml b/pyproject.toml
index 17898968..4831cf35 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,8 @@ dependencies = [
     # cyvcf2 also pulls in coloredlogs and click
     "coloredlogs",
     "click",
+    # Using pandas for reading plink files, but it will be useful more generally
+    "pandas"
 ]
 requires-python = ">=3.10"
 classifiers = [
@@ -68,11 +70,9 @@ dev = [
 ]
 # TODO Using dev version of tskit for CI, FIXME before release
 tskit = ["tskit @ git+https://github.com/tskit-dev/tskit.git@main#subdirectory=python"]
-plink = ["bed_reader"]
 vcf = ["cyvcf2"]
 all = [
     "tskit @ git+https://github.com/tskit-dev/tskit.git@main#subdirectory=python",
-    "bed_reader",
     "cyvcf2"
 ]
diff --git a/tests/test_plink.py b/tests/test_plink.py
index 661b62f6..0f863232 100644
--- a/tests/test_plink.py
+++ b/tests/test_plink.py
@@ -1,4 +1,4 @@
-from unittest import mock
+import os.path
 
 import bed_reader
 import numpy as np
@@ -11,20 +11,102 @@ from bio2zarr import plink, vcf
 
 
-def test_missing_dependency():
-    with mock.patch(
-        "importlib.import_module",
-        side_effect=ImportError("No module named 'bed_reader'"),
-    ):
-        with pytest.raises(ImportError) as exc_info:
-            plink.convert(
-                "UNUSED_PATH",
-                "UNUSED_PATH",
-            )
-        assert (
-            "This process requires the optional bed_reader module. "
-            "Install it with: pip install bio2zarr[plink]" in str(exc_info.value)
+class TestReadBim:
+    def test_example(self):
+        path = "tests/data/plink/example.bim"
+        df = plink.read_bim(path)
+        # chrom, vid, cm_pos, pos, a1, a2
+        # 1 1_10 0 10 A GG
+        # 1 1_20 0 20 TTT C
+        nt.assert_array_equal(df["contig"].values, ["1", "1"])
+        nt.assert_array_equal(df["variant_id"].values, ["1_10", "1_20"])
+        nt.assert_array_equal(df["cm_position"].values, [0, 0])
+        nt.assert_array_equal(df["position"].values, [10, 20])
+        nt.assert_array_equal(df["allele_1"].values, ["A", "TTT"])
+        nt.assert_array_equal(df["allele_2"].values, ["GG", "C"])
+
+
+class TestReadFam:
+    def test_example(self):
+        path = "tests/data/plink/example.fam"
+        df = plink.read_fam(path)
+        # FID IID FATHER MOTHER SEX PHENOTYPE
+        # ind0 ind0 0 0 0 -9
+        # ind1 ind1 0 0 0 -9
+        # ind2 ind2 0 0 0 -9
+        # ind3 ind3 0 0 0 -9
+        # ind4 ind4 0 0 0 -9
+        # ind5 ind5 0 0 0 -9
+        # ind6 ind6 0 0 0 -9
+        # ind7 ind7 0 0 0 -9
+        # ind8 ind8 0 0 0 -9
+        # ind9 ind9 0 0 0 -9
+        nt.assert_array_equal(df["family_id"].values, [f"ind{j}" for j in range(10)])
+        nt.assert_array_equal(
+            df["individual_id"].values, [f"ind{j}" for j in range(10)]
         )
+        nt.assert_array_equal(df["paternal_id"].values, ["0" for j in range(10)])
+        nt.assert_array_equal(df["maternal_id"].values, ["0" for j in range(10)])
+        nt.assert_array_equal(df["sex"].values, ["0" for j in range(10)])
+        nt.assert_array_equal(df["phenotype"].values, ["-9" for j in range(10)])
+
+
+class TestBedReader:
+    @pytest.mark.parametrize(
+        "path",
+        [
+            "tests/data/plink/example.bim",
+            "tests/data/vcf/sample.vcf.gz",
+        ],
+    )
+    def test_bad_file_type(self, path):
+        with pytest.raises(ValueError, match="Invalid BED file magic bytes"):
+            plink.BedReader(path, 1, 1)
+
+    @pytest.mark.parametrize(
+        ("num_variants", "num_samples"),
+        [
+            (1, 1),
+            (1, 2),
+            (1, 3),
+            (1, 4),
+            (1, 5),
+            (1, 6),
+            (1, 7),
+            (1, 8),
+            (1, 9),
+            (2, 1),
+            (3, 1),
+            (10, 1),
+            (100, 1),
+            (10, 2),
+            (10, 3),
+            (10, 4),
+            (10, 5),
+            (20, 20),
+            (30, 3),
+        ],
+    )
+    def test_generated_bed_files(self, tmp_path, num_variants, num_samples):
+        bed_file = tmp_path / "a_file.bed"
+        # Generate a regular pattern of all possible values
+        data = np.arange(num_variants * num_samples, dtype=int) % 4
+        data[data == 3] = -127
+        data = data.reshape((num_variants, num_samples))
+
+        bed_reader.to_bed(bed_file, data.T, num_threads=1)
+
+        bytes_per_variant = (num_samples + 3) // 4
+        expected_size = 3 + bytes_per_variant * num_variants
+        assert os.path.getsize(bed_file) == expected_size
+
+        br_map = {0: (0, 0), 1: (0, 1), 2: (1, 1), -127: (-1, -1)}
+        reader = plink.BedReader(bed_file, num_variants, num_samples)
+        g = reader.decode(0, num_variants)
+        assert g.shape == (num_variants, num_samples, 2)
+        for j in range(num_variants):
+            for k in range(num_samples):
+                assert br_map[data[j, k]] == tuple(g[j, k])
 
 
 class TestSmallExample: