Skip to content

Commit ed3d963

Browse files
Merge pull request #12 from fragilefort/genebank-parser
Added a Genebank parser - resolves issue #8
2 parents f4e9b97 + 0690752 commit ed3d963

File tree

4 files changed

+665
-2
lines changed

4 files changed

+665
-2
lines changed

src/biobase/parser/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from biobase.parser import fasta, fastq
1+
from biobase.parser import fasta, fastq, genbank
22

33
# Fasta parser functions
44
FastaParser = fasta.FastaParser
@@ -15,3 +15,14 @@
1515
# Record Class
1616
FastaRecord = fasta.FastaRecord
1717
FastqRecord = fastq.FastqRecord
18+
19+
# GenBank Parser Classes
20+
GenBankParser = genbank.GenBankParser
21+
GenBankRecord = genbank.GenBankRecord
22+
SingleFeature = genbank.SingleFeature
23+
Locus = genbank.Locus
24+
Definition = genbank.Definition
25+
Accession = genbank.Accession
26+
Version = genbank.Version
27+
Origin = genbank.Origin
28+
Features = genbank.Features # The container class for all features

src/biobase/parser/genbank.py

Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
import re
2+
from collections.abc import Generator
3+
from pathlib import Path
4+
from typing import Any, Iterator
5+
6+
7+
def main() -> None:
8+
"""
9+
Main function to demonstrate the GenBank parser.
10+
It downloads a sample file, parses it using the new iterable parser,
11+
prints the contents of all records, and cleans up the file afterward.
12+
"""
13+
# import requests
14+
# import os
15+
16+
# # URL for a sample GenBank file with multiple records (e.g., several accessions)
17+
# multi_record_ids = "NG_017013.2,DQ366810.1,X52541.1"
18+
# url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nuccore&id={multi_record_ids}&rettype=gb&retmode=text"
19+
# file_path = "temp_multi_record.gb"
20+
21+
# try:
22+
# # 1. Download the file
23+
# print(f"Downloading sample file with multiple records from NCBI: {multi_record_ids}...")
24+
# response = requests.get(url, timeout=10)
25+
# response.raise_for_status() # Raise an exception for bad status codes
26+
27+
# with open(file_path, "w") as f:
28+
# f.write(response.text)
29+
# print(f"File saved to '{file_path}'")
30+
31+
# # 2. Parse the file and iterate over records using the GenBankParser
32+
# print("\n--- Parsing GenBank Records ---")
33+
# parser = GenBankParser(file_path)
34+
35+
# total_records = 0
36+
37+
# # Iterate over the parser, which yields GenBankRecord objects
38+
# for i, record in enumerate(parser, 1):
39+
# total_records += 1
40+
# print(f"\n--- Record {i}: {record!r} ---")
41+
42+
# # Print core attributes
43+
# print(f" ID: {record.id}")
44+
# print(f" Name: {record.name}")
45+
46+
# # Demonstrate access to parsed entries
47+
# if "LOCUS" in record.entries:
48+
# locus = record.entries["LOCUS"]
49+
# print(f" Locus Name: {locus.name}")
50+
# print(f" Sequence Length (from LOCUS): {locus.length} bp")
51+
52+
# if "ORIGIN" in record.entries:
53+
# seq = record.entries["ORIGIN"].sequence
54+
# print(f" Sequence Length (Actual): {len(seq)} bp")
55+
# print(f" Sequence Preview: {seq[:30]}...")
56+
57+
# if "FEATURES" in record.entries:
58+
# features = record.entries["FEATURES"]
59+
# print(f" Number of features: {len(features.entries)}")
60+
# if features.entries:
61+
# print(f" First feature: {features.entries[0]}")
62+
63+
# print(f"\nSuccessfully parsed {total_records} records.")
64+
65+
# except requests.exceptions.RequestException as e:
66+
# print(f"Error downloading file: {e}")
67+
# except Exception as e:
68+
# print(f"An unexpected error occurred during parsing: {e}")
69+
# finally:
70+
# # 3. Clean up the downloaded file
71+
# if os.path.exists(file_path):
72+
# os.remove(file_path)
73+
# print(f"\nCleaned up temporary file '{file_path}'.")
74+
75+
76+
class Locus:
77+
_MOLECULE_TYPE_LIST: list[str] = ["DNA", "RNA", "PROTEIN"]
78+
79+
def __init__(self, line: str) -> None:
80+
self._raw_line: str = line.strip()
81+
self._parts: list[str] = self._raw_line.split()
82+
self.name: str = ""
83+
self.length: int = 0
84+
self.molecule_type: str = ""
85+
self.topology: str = ""
86+
self.date: str = ""
87+
self._set_info()
88+
89+
def _set_info(self):
90+
if not self._parts:
91+
return
92+
93+
# LOCUS name is always second
94+
self.name: str = self._parts[1] if len(self._parts) > 1 else ""
95+
96+
# Try to find the number as the length
97+
for i, token in enumerate(self._parts):
98+
if token.isdigit():
99+
self.length: int = int(token)
100+
break
101+
# Identify molcule type
102+
for token in self._parts:
103+
if token.upper() in self._MOLECULE_TYPE_LIST:
104+
self.molecule_type = token.upper()
105+
break
106+
# Detect topology
107+
for token in self._parts:
108+
if token.lower() in ("linear", "circular"):
109+
self.topology = token.lower()
110+
111+
self.date: str = self._parts[-1] if len(self._parts) > 2 else ""
112+
113+
def __repr__(self) -> str:
114+
return (
115+
f"Locus(name='{self.name}', length={self.length}, "
116+
f"molecule_type='{self.molecule_type}', topology='{self.topology}', "
117+
f"date='{self.date}')"
118+
)
119+
120+
121+
class Definition:
122+
def __init__(self, info: str) -> None:
123+
self.info: str = info.replace("DEFINITION", "").strip()
124+
125+
def __repr__(self) -> str:
126+
preview: str = f"{self.info[:70]}..." if len(self.info) > 70 else self.info
127+
return f"<Definition info='{preview}'>"
128+
129+
130+
class Accession:
131+
def __init__(self, info: str) -> None:
132+
self.info: str = " ".join(info.split()[1:])
133+
134+
def __repr__(self) -> str:
135+
return f"<Accession: {self.info}>"
136+
137+
138+
class Version:
139+
def __init__(self, info: str) -> None:
140+
parts: list[str] = info.split()
141+
self.version: str = parts[1]
142+
self.gi: str | None = None
143+
144+
for part in parts:
145+
if part.startswith("GI:"):
146+
self.gi = part.split(":")[1]
147+
break
148+
149+
def __repr__(self) -> str:
150+
return f"<Version: {self.version} GI:{self.gi}>"
151+
152+
153+
class Origin:
154+
"""Representation of the ORIGIN entry in a GenBank format"""
155+
156+
def __init__(self, raw_text: str) -> None:
157+
self._raw_text: str = raw_text
158+
159+
@property
160+
def sequence(self) -> str:
161+
lines: list[str] = self._raw_text.splitlines()
162+
seq_lines: list[str] = []
163+
164+
for line in lines:
165+
if line.startswith("ORIGIN") or line.startswith("//"):
166+
continue
167+
clean: str = "".join(ch for ch in line if ch.isalpha())
168+
seq_lines.append(clean)
169+
return "".join(seq_lines).lower()
170+
171+
def __repr__(self) -> str:
172+
seq: str = self.sequence
173+
preview: str = seq[:20] + "..." if len(seq) > 20 else seq
174+
return f"<Origin sequence='{preview}' length={len(seq)}>"
175+
176+
177+
class SingleFeature:
178+
"""Representation of one of FEATURES entries, like a 'gene' or a 'CDS' block."""
179+
180+
def __init__(self, key: str, location: str, qualifiers: dict[str, str]) -> None:
181+
self.key = key
182+
self.location = location
183+
self.qualifiers = qualifiers
184+
185+
def __repr__(self) -> str:
186+
return f"Feature(key={self.key}, location={self.location}, qualifiers={len(self.qualifiers)})"
187+
188+
189+
class Features:
190+
"""Parses and stores the FEATURES block of a GenBank file"""
191+
192+
def __init__(self, info: str) -> None:
193+
self.info: str = info
194+
self.entries: list[SingleFeature] = []
195+
self._parse_features()
196+
197+
def __repr__(self) -> str:
198+
return f"<Features containing {len(self.entries)} entries>"
199+
200+
def _parse_features(self) -> None:
201+
"""Extract each feature line block (e.g., 'gene', 'CDS') and its qualifiers.
202+
203+
Expected structure in GenBank:
204+
FEATURE_KEY LOCATION
205+
/qualifier1="..."
206+
/qualifier2="..."
207+
"""
208+
# self.info have all the block of FEATURES
209+
current_key: str = ""
210+
current_loc: str = ""
211+
current_quals: dict[str, str] = {}
212+
213+
for line in self.info.splitlines()[1:]:
214+
# A new feature appears when a non-space appears in the first 21 columns
215+
# GenBank Uses fixed-width indentation
216+
if line[:21].strip() and not line.strip().startswith("/"):
217+
# Store the previous feature
218+
if current_key:
219+
self.entries.append(
220+
SingleFeature(current_key, current_loc, current_quals)
221+
)
222+
parts = line.strip().split(None, 1) # split feature and location
223+
current_key = parts[0]
224+
current_loc = parts[1] if len(parts) > 1 else ""
225+
current_quals = {} # reset qualifiers for the new feature
226+
227+
# This line is a qualifier for the new feature
228+
elif line.strip().startswith("/"):
229+
qualifier = line.strip()[1:] # to remove leading slash
230+
# usually qualifier and values are seperated by "="
231+
if "=" in qualifier:
232+
key, value = qualifier.split("=", 1)
233+
value = value.strip('""') # also remove quotes
234+
current_quals[key] = value
235+
else:
236+
# if qualifier doesn't have "="
237+
current_quals[qualifier] = ""
238+
else:
239+
# This means that this line isn't a new feature nor a new qualifier
240+
# i.e it is a continuation of a previous qualifier
241+
# so add the current line to the last qualifier
242+
if current_quals:
243+
last_key = list(current_quals.keys())[-1]
244+
current_quals[last_key] += " " + line.strip()
245+
# append the last feature
246+
if current_key:
247+
self.entries.append(SingleFeature(current_key, current_loc, current_quals))
248+
249+
250+
class GenBankRecord:
251+
"""Represents a parsed GenBank record with entries"""
252+
253+
_entry_classes: dict[str, type] = {
254+
"LOCUS": Locus,
255+
"DEFINITION": Definition,
256+
"ACCESSION": Accession,
257+
"FEATURES": Features,
258+
"ORIGIN": Origin,
259+
"VERSION": Version,
260+
}
261+
262+
def __init__(
263+
self, entries: dict[str, Any], source_filepath: Path | None = None
264+
) -> None:
265+
# The parser now provides the 'entries' dict directly.
266+
self.entries: dict[str, Any] = entries
267+
self._source_filepath = source_filepath
268+
269+
# Access common fields, handling potential missing keys gracefully
270+
self.name: str = (
271+
self.entries.get("DEFINITION").info
272+
if "DEFINITION" in self.entries
273+
else "N/A"
274+
)
275+
self.seq: str = (
276+
self.entries.get("ORIGIN").sequence if "ORIGIN" in self.entries else ""
277+
)
278+
self.id: str = (
279+
self.entries.get("ACCESSION").info if "ACCESSION" in self.entries else "N/A"
280+
)
281+
282+
def __repr__(self) -> str:
283+
locus_name = self.entries["LOCUS"].name if "LOCUS" in self.entries else "N/A"
284+
return f"<GenBankRecord for '{locus_name}'>"
285+
286+
287+
class GenBankParser:
288+
"""A parser that reads a GenBANK file and splits it into entry blocks"""
289+
290+
_ENTRY_PATTERN = re.compile(r"^[A-Z]+(\s|$)") # Line starts with all caps
291+
_RECORD_SEPARATOR = "//"
292+
293+
def __init__(self, filepath: str | Path) -> None:
294+
self.filepath = Path(filepath)
295+
296+
def read_all(self) -> str:
297+
with open(self.filepath) as f:
298+
return f.read()
299+
300+
def _split_into_blocks(
301+
self, file_contents: str
302+
) -> Generator[tuple[str, str], None, None]:
303+
"""Yeild (key, block) pairs from a GenBank record lazily while preserving duplicates"""
304+
305+
current_key: str | None = None
306+
buffer: list[str] = []
307+
308+
for line in file_contents.splitlines():
309+
# Detect header line
310+
if self._ENTRY_PATTERN.match(line):
311+
# restore the block if there is already a present key
312+
if current_key:
313+
yield current_key, "\n".join(buffer).strip()
314+
buffer.clear()
315+
current_key = line.split()[0]
316+
buffer.append(line)
317+
else:
318+
# continuation line — belongs to current block
319+
buffer.append(line)
320+
# Save the last block
321+
if current_key:
322+
yield current_key, "\n".join(buffer).strip()
323+
324+
def _split_into_records(self, file_contents: str) -> Generator[str, None, None]:
325+
"""Splits the file content into multiple GenBank record blocks"""
326+
for record_block in file_contents.split(f"\n{self._RECORD_SEPARATOR}\n"):
327+
clean_block = record_block.strip()
328+
if clean_block:
329+
# Re-add the seprator if it was present
330+
if not clean_block.endswith(self._RECORD_SEPARATOR):
331+
clean_block += f"\n{self._RECORD_SEPARATOR}"
332+
yield clean_block
333+
334+
def _parse_record(self, record_text: str) -> GenBankRecord:
335+
"""Parse a single record text block and returns a GenBank record object."""
336+
entries: dict[str, Any] = {}
337+
entry_classes = GenBankRecord._entry_classes
338+
339+
for key, block in self._split_into_blocks(record_text):
340+
# if the entry maps to an entry in the _entry_classes else create new one
341+
entry_cls = entry_classes.get(key)
342+
if key in entries:
343+
if isinstance(entries[key], str) and isinstance(block, str):
344+
# append repeated entries
345+
entries[key] += "\n" + block
346+
else:
347+
entries[key] = block
348+
else:
349+
# instantiate using the contructors in the _entry_classes
350+
# entry_cls here is a class (I have to remind myself that it is)
351+
# keep raw text if there is no mapping entry in the _entry_classes
352+
entries[key] = entry_cls(block) if entry_cls else block
353+
return GenBankRecord(entries, source_filepath=self.filepath)
354+
355+
def __iter__(self) -> Iterator[GenBankRecord]:
356+
"Allows iterating over the records in the file"
357+
file_contents = self.read_all()
358+
for record_text in self._split_into_records(file_contents):
359+
# Parse each record block and yield the GenBankRecord object
360+
if (
361+
record_text.strip() != self._RECORD_SEPARATOR
362+
): # Skip empty blocks from splitting
363+
yield self._parse_record(record_text)
364+
365+
366+
if __name__ == "__main__":
367+
main()

0 commit comments

Comments
 (0)