Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,6 @@ jobs:
python -m bio2zarr tskit2zarr convert tests/data/ts/example.trees ts.vcz
deactivate

python -m venv env-plink
source env-plink/bin/activate
python -m pip install .
python -m bio2zarr plink2zarr convert tests/data/plink/example plink.vcz > plink.txt 2>&1 || echo $? > plink_exit.txt
test "$(cat plink_exit.txt)" = "1"
grep -q "This process requires the optional bed_reader module. Install it with: pip install bio2zarr\[plink\]" plink.txt
python -m pip install '.[plink]'
python -m bio2zarr plink2zarr convert tests/data/plink/example plink.vcz
deactivate

python -m venv env-vcf
source env-vcf/bin/activate
python -m pip install .
Expand Down
167 changes: 133 additions & 34 deletions bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,143 @@
import pathlib

import numpy as np
import pandas as pd

from bio2zarr import constants, core, vcz

logger = logging.getLogger(__name__)


# Column specifications for the PLINK text metadata files. Each entry is
# (column name, dtype used when parsing with pandas, dtype intended for the
# corresponding zarr array). sex/phenotype are parsed as str and converted
# to int8 later so that missing codes can be handled explicitly.
FAM_FIELDS = [
    ("family_id", str, "U"),
    ("individual_id", str, "U"),
    ("paternal_id", str, "U"),
    ("maternal_id", str, "U"),
    ("sex", str, "int8"),
    ("phenotype", str, "int8"),
]
# Dict comprehensions instead of dict([(k, v) for ...]) (ruff C404).
FAM_DF_DTYPE = {name: df_dtype for name, df_dtype, _ in FAM_FIELDS}
FAM_ARRAY_DTYPE = {name: array_dtype for name, _, array_dtype in FAM_FIELDS}

BIM_FIELDS = [
    ("contig", str, "U"),
    ("variant_id", str, "U"),
    ("cm_position", "float32", "float32"),
    ("position", "int32", "int32"),
    ("allele_1", str, "S"),
    ("allele_2", str, "S"),
]
BIM_DF_DTYPE = {name: df_dtype for name, df_dtype, _ in BIM_FIELDS}
BIM_ARRAY_DTYPE = {name: array_dtype for name, _, array_dtype in BIM_FIELDS}


def read_fam(path, sep=None):
    """Load a PLINK .fam sample file into a pandas DataFrame.

    See: https://www.cog-genomics.org/plink/1.9/formats#fam
    """
    column_names = [name for name, _, _ in FAM_FIELDS]
    return pd.read_csv(path, sep=sep, names=column_names, dtype=FAM_DF_DTYPE)


def read_bim(path, sep=None):
    """Load a PLINK .bim variant file into a pandas DataFrame.

    See: https://www.cog-genomics.org/plink/1.9/formats#bim
    """
    names = [f[0] for f in BIM_FIELDS]
    # pandas accepts path-like objects directly; drop the redundant str()
    # conversion so this matches read_fam.
    df = pd.read_csv(path, sep=sep, names=names, dtype=BIM_DF_DTYPE)
    return df


@dataclasses.dataclass
class PlinkPaths:
    """Paths to the three files making up a PLINK 1.x fileset."""

    # Binary genotype matrix (.bed)
    bed_path: str
    # Variant metadata, one line per variant (.bim)
    bim_path: str
    # Sample metadata, one line per sample (.fam)
    fam_path: str


class BedReader:
    """Minimal reader for PLINK 1.x .bed genotype files (SNP-major layout).

    Decodes the 2-bit genotype encoding into diploid allele calls
    (0 = allele_2/REF, 1 = allele_1/ALT, -1 = missing) without needing
    the optional bed_reader dependency.
    """

    def __init__(self, path, num_variants, num_samples):
        self.num_variants = num_variants
        self.num_samples = num_samples
        self.path = path
        # bytes per variant: 1 byte per 4 samples, rounded up
        self.bytes_per_variant = (self.num_samples + 3) // 4

        # TODO open this as a persistent file and support reading from a
        # stream
        with open(self.path, "rb") as f:
            magic = f.read(3)
            if magic != b"\x6c\x1b\x01":
                raise ValueError("Invalid BED file magic bytes")

        # We could check the size of the bed file here, but that would
        # mean we can't work with streams.

        # Lookup table with shape (256, 4, 2):
        # 256 possible byte values, 4 samples per byte, 2 alleles per sample
        lookup = np.zeros((256, 4, 2), dtype=np.int8)

        # For each possible byte value (0-255)
        for byte in range(256):
            # For each of the 4 samples encoded in this byte
            for sample in range(4):
                # Extract the 2 bits for this sample
                bits = (byte >> (sample * 2)) & 0b11
                # PLINK bit encoding (A1 counted as the ALT allele):
                # 00 = hom A1, 01 = missing, 10 = het, 11 = hom A2
                if bits == 0b00:
                    lookup[byte, sample] = [1, 1]
                elif bits == 0b01:
                    lookup[byte, sample] = [-1, -1]
                elif bits == 0b10:
                    lookup[byte, sample] = [0, 1]
                else:  # 0b11
                    lookup[byte, sample] = [0, 0]

        self.byte_lookup = lookup

    def decode(self, start, stop):
        """Return genotypes for variants in [start, stop) as an int8 array
        of shape (stop - start, num_samples, 2).

        Raises ValueError if the file ends before the requested range.
        """
        chunk_size = stop - start

        # Calculate file offsets for the required data
        # 3 bytes for the magic number at the beginning of the file
        start_offset = 3 + (start * self.bytes_per_variant)
        bytes_to_read = chunk_size * self.bytes_per_variant

        # TODO make it possible to read sequentially from the same file handle,
        # seeking only when necessary.
        with open(self.path, "rb") as f:
            f.seek(start_offset)
            chunk_data = f.read(bytes_to_read)

        # A truncated file previously surfaced as an opaque numpy reshape
        # error; fail with a clear message instead.
        if len(chunk_data) != bytes_to_read:
            raise ValueError(
                f"Unexpected end of BED file: expected {bytes_to_read} bytes "
                f"for variants [{start}, {stop}), got {len(chunk_data)}"
            )

        data_bytes = np.frombuffer(chunk_data, dtype=np.uint8)
        data_matrix = data_bytes.reshape(chunk_size, self.bytes_per_variant)

        # Apply lookup table to get genotypes
        # Shape becomes: (chunk_size, bytes_per_variant, 4, 2)
        all_genotypes = self.byte_lookup[data_matrix]

        # Flatten the per-byte sample axis to (chunk_size, bytes_per_variant*4, 2)
        # and trim the padding samples in the final byte of each variant.
        samples_padded = self.bytes_per_variant * 4
        genotypes_reshaped = all_genotypes.reshape(chunk_size, samples_padded, 2)

        return genotypes_reshaped[:, : self.num_samples]


class PlinkFormat(vcz.Source):
def __init__(self, prefix):
    """Open the PLINK fileset with the given path prefix.

    prefix + ".bed", ".bim" and ".fam" must all exist.
    """
    # NOTE(review): this span was unmergeable diff residue mixing the old
    # bed_reader-based implementation with the new pandas/BedReader one;
    # reconstructed here as the post-merge (added) version.
    # TODO we will need support multiple chromosomes here to join
    # plinks into one big zarr. So, these will require multiple
    # bed and bim files, but should share a .fam
    self.prefix = str(prefix)
    self.paths = PlinkPaths(
        self.prefix + ".bed",
        self.prefix + ".bim",
        self.prefix + ".fam",
    )
    self.bim = read_bim(self.paths.bim_path)
    self.fam = read_fam(self.paths.fam_path)
    self._num_records = self.bim.shape[0]
    self._num_samples = self.fam.shape[0]
    self.bed_reader = BedReader(
        self.paths.bed_path, self.num_records, self.num_samples
    )

@property
Expand All @@ -44,59 +148,54 @@ def path(self):

@property
def num_records(self):
    # Number of variants: one row per line of the .bim file.
    return self._num_records

@property
def num_samples(self):
    # Number of samples: one row per line of the .fam file.
    return self._num_samples

@property
def samples(self):
    # Individual IDs from the second column of the .fam file.
    return [vcz.Sample(id=iid) for iid in self.fam.individual_id]

@property
def contigs(self):
    # Unique contig IDs in order of first appearance in the .bim file.
    return [vcz.Contig(id=str(chrom)) for chrom in self.bim.contig.unique()]

def iter_contig(self, start, stop):
    """Yield the contig index for each variant in [start, stop)."""
    chrom_to_contig_index = {contig.id: i for i, contig in enumerate(self.contigs)}
    for chrom in self.bim.contig[start:stop]:
        yield chrom_to_contig_index[str(chrom)]

def iter_field(self, field_name, shape, start, stop):
    """Yield values of the given field for variants in [start, stop)."""
    assert field_name == "position"  # Only position field is supported from plink
    yield from self.bim.position[start:stop]

def iter_id(self, start, stop):
    """Yield the variant ID for each variant in [start, stop)."""
    yield from self.bim.variant_id[start:stop]

def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
    """Yield a VariantData per variant in [start, stop).

    REF is taken from allele_2 and ALT from allele_1 (A1 counted as ALT,
    matching the BedReader decoding).
    """
    alt_field = self.bim.allele_1.values
    ref_field = self.bim.allele_2.values
    gt = self.bed_reader.decode(start, stop)
    # Genotypes decoded from .bed carry no phase information: all False.
    phased = np.zeros(gt.shape[:2], dtype=bool)
    for i, (ref, alt) in enumerate(
        zip(ref_field[start:stop], alt_field[start:stop])
    ):
        alleles = np.full(num_alleles, constants.STR_FILL, dtype="O")
        alleles[0] = ref
        alleles[1 : 1 + len(alt)] = alt
        # rlen is the length of the REF in PLINK as there's no END annotations
        yield vcz.VariantData(len(alleles[0]), alleles, gt[i], phased[i])

def generate_schema(
self,
variants_chunk_size=None,
samples_chunk_size=None,
):
n = self.bed.iid_count
m = self.bed.sid_count
n = self.num_samples
m = self.num_records
logging.info(f"Scanned plink with {n} samples and {m} variants")
dimensions = vcz.standard_dimensions(
variants_size=m,
Expand All @@ -119,7 +218,7 @@ def generate_schema(
)
# If we don't have SVLEN or END annotations, the rlen field is defined
# as the length of the REF
max_len = self.bed.allele_2.itemsize
max_len = self.bim.allele_2.values.itemsize

array_specs = [
vcz.ZarrArraySpec(
Expand Down Expand Up @@ -156,7 +255,7 @@ def generate_schema(
),
vcz.ZarrArraySpec(
name="variant_contig",
dtype=core.min_int_dtype(0, len(np.unique(self.bed.chromosome))),
dtype=core.min_int_dtype(0, len(np.unique(self.bim.contig))),
dimensions=["variants"],
description="Contig/chromosome index for each variant",
),
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ dependencies = [
# cyvcf2 also pulls in coloredlogs and click
"coloredlogs",
"click",
# Using pandas for reading plink files, but will be useful more generally
"pandas"
]
requires-python = ">=3.10"
classifiers = [
Expand Down Expand Up @@ -68,11 +70,9 @@ dev = [
]
# TODO Using dev version of tskit for CI, FIXME before release
tskit = ["tskit @ git+https://github.com/tskit-dev/tskit.git@main#subdirectory=python"]
plink = ["bed_reader"]
vcf = ["cyvcf2"]
all = [
"tskit @ git+https://github.com/tskit-dev/tskit.git@main#subdirectory=python",
"bed_reader",
"cyvcf2"
]

Expand Down
Loading