|
| 1 | +"""Functions for parsing CSI files into Python objects so they can be inspected. |
| 2 | +
|
| 3 | +The implementation follows the [CSI index file format](http://samtools.github.io/hts-specs/CSIv1.pdf). |
| 4 | +
|
| 5 | +""" |
| 6 | +from dataclasses import dataclass |
| 7 | +from typing import Any, Dict, Optional, Sequence |
| 8 | + |
| 9 | +import numpy as np |
| 10 | + |
| 11 | +from sgkit.io.vcf.utils import ( |
| 12 | + get_file_offset, |
| 13 | + open_gzip, |
| 14 | + read_bytes_as_tuple, |
| 15 | + read_bytes_as_value, |
| 16 | +) |
| 17 | +from sgkit.typing import PathType |
| 18 | + |
| 19 | +CSI_EXTENSION = ".csi" |
| 20 | + |
| 21 | + |
| 22 | +@dataclass |
| 23 | +class Chunk: |
| 24 | + cnk_beg: int |
| 25 | + cnk_end: int |
| 26 | + |
| 27 | + |
| 28 | +@dataclass |
| 29 | +class Bin: |
| 30 | + bin: int |
| 31 | + loffset: int |
| 32 | + chunks: Sequence[Chunk] |
| 33 | + |
| 34 | + |
| 35 | +@dataclass |
| 36 | +class CSIIndex: |
| 37 | + min_shift: int |
| 38 | + depth: int |
| 39 | + aux: str |
| 40 | + bins: Sequence[Sequence[Bin]] |
| 41 | + record_counts: Sequence[int] |
| 42 | + n_no_coor: int |
| 43 | + |
| 44 | + def offsets(self) -> Any: |
| 45 | + pseudo_bin = bin_limit(self.min_shift, self.depth) + 1 |
| 46 | + |
| 47 | + file_offsets = [] |
| 48 | + contig_indexes = [] |
| 49 | + positions = [] |
| 50 | + for contig_index, bins in enumerate(self.bins): |
| 51 | + # bins may be in any order within a contig, so sort by loffset |
| 52 | + for bin in sorted(bins, key=lambda b: b.loffset): |
| 53 | + if bin.bin == pseudo_bin: |
| 54 | + continue # skip pseudo bins |
| 55 | + file_offset = get_file_offset(bin.loffset) |
| 56 | + position = get_first_locus_in_bin(self, bin.bin) |
| 57 | + file_offsets.append(file_offset) |
| 58 | + contig_indexes.append(contig_index) |
| 59 | + positions.append(position) |
| 60 | + |
| 61 | + return np.array(file_offsets), np.array(contig_indexes), np.array(positions) |
| 62 | + |
| 63 | + |
| 64 | +def bin_limit(min_shift: int, depth: int) -> int: |
| 65 | + """Defined in CSI spec""" |
| 66 | + return ((1 << (depth + 1) * 3) - 1) // 7 |
| 67 | + |
| 68 | + |
| 69 | +def get_first_bin_in_level(level: int) -> int: |
| 70 | + return ((1 << level * 3) - 1) // 7 |
| 71 | + |
| 72 | + |
| 73 | +def get_level_size(level: int) -> int: |
| 74 | + return 1 << level * 3 |
| 75 | + |
| 76 | + |
| 77 | +def get_level_for_bin(csi: CSIIndex, bin: int) -> int: |
| 78 | + for i in range(csi.depth, -1, -1): |
| 79 | + if bin >= get_first_bin_in_level(i): |
| 80 | + return i |
| 81 | + raise ValueError(f"Cannot find level for bin {bin}.") # pragma: no cover |
| 82 | + |
| 83 | + |
| 84 | +def get_first_locus_in_bin(csi: CSIIndex, bin: int) -> int: |
| 85 | + level = get_level_for_bin(csi, bin) |
| 86 | + first_bin_on_level = get_first_bin_in_level(level) |
| 87 | + level_size = get_level_size(level) |
| 88 | + max_span = 1 << (csi.min_shift + 3 * csi.depth) |
| 89 | + return (bin - first_bin_on_level) * (max_span // level_size) + 1 |
| 90 | + |
| 91 | + |
| 92 | +def read_csi( |
| 93 | + file: PathType, storage_options: Optional[Dict[str, str]] = None |
| 94 | +) -> CSIIndex: |
| 95 | + """Parse a CSI file into a `CSIIndex` object. |
| 96 | +
|
| 97 | + Parameters |
| 98 | + ---------- |
| 99 | + file : PathType |
| 100 | + The path to the CSI file. |
| 101 | +
|
| 102 | + Returns |
| 103 | + ------- |
| 104 | + CSIIndex |
| 105 | + An object representing a CSI index. |
| 106 | +
|
| 107 | + Raises |
| 108 | + ------ |
| 109 | + ValueError |
| 110 | + If the file is not a CSI file. |
| 111 | + """ |
| 112 | + with open_gzip(file, storage_options=storage_options) as f: |
| 113 | + magic = read_bytes_as_value(f, "4s") |
| 114 | + if magic != b"CSI\x01": |
| 115 | + raise ValueError("File not in CSI format.") |
| 116 | + |
| 117 | + min_shift, depth, l_aux = read_bytes_as_tuple(f, "<3i") |
| 118 | + aux = read_bytes_as_value(f, f"{l_aux}s", "") |
| 119 | + n_ref = read_bytes_as_value(f, "<i") |
| 120 | + |
| 121 | + pseudo_bin = bin_limit(min_shift, depth) + 1 |
| 122 | + |
| 123 | + bins = [] |
| 124 | + record_counts = [] |
| 125 | + |
| 126 | + if n_ref > 0: |
| 127 | + for _ in range(n_ref): |
| 128 | + n_bin = read_bytes_as_value(f, "<i") |
| 129 | + seq_bins = [] |
| 130 | + record_count = -1 |
| 131 | + for _ in range(n_bin): |
| 132 | + bin, loffset, n_chunk = read_bytes_as_tuple(f, "<IQi") |
| 133 | + chunks = [] |
| 134 | + for _ in range(n_chunk): |
| 135 | + chunk = Chunk(*read_bytes_as_tuple(f, "<QQ")) |
| 136 | + chunks.append(chunk) |
| 137 | + seq_bins.append(Bin(bin, loffset, chunks)) |
| 138 | + |
| 139 | + if bin == pseudo_bin: |
| 140 | + assert len(chunks) == 2 |
| 141 | + n_mapped, n_unmapped = chunks[1].cnk_beg, chunks[1].cnk_end |
| 142 | + record_count = n_mapped + n_unmapped |
| 143 | + bins.append(seq_bins) |
| 144 | + record_counts.append(record_count) |
| 145 | + |
| 146 | + n_no_coor = read_bytes_as_value(f, "<Q", 0) |
| 147 | + |
| 148 | + assert len(f.read(1)) == 0 |
| 149 | + |
| 150 | + return CSIIndex(min_shift, depth, aux, bins, record_counts, n_no_coor) |
0 commit comments