Skip to content
Open
72 changes: 72 additions & 0 deletions ishlib/formats/fastxpp_bench.mojo
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import sys
from time.time import perf_counter
from ishlib.vendor.kseq import FastxReader, BufferedReader, KRead


# ── thin wrapper so FileHandle implements KRead ─────────────────────────
struct FileReader(KRead):
var fh: FileHandle

fn __init__(out self, owned fh: FileHandle):
self.fh = fh^

fn __moveinit__(out self, owned other: Self):
self.fh = other.fh^

fn unbuffered_read[
o: MutableOrigin
](mut self, buffer: Span[UInt8, o]) raises -> Int:
return Int(self.fh.read(buffer.unsafe_ptr(), len(buffer)))


# ────────────────────────────────────────────────────────────────────────


fn bench_original(path: String) raises -> (Int, Int, Float64):
var fh = open(path, "r")
var rdr = FastxReader[read_comment=False](BufferedReader(FileReader(fh^)))
var rec = 0
var seq = 0
var t0 = perf_counter()
while rdr.read() > 0:
rec += 1
seq += len(rdr.seq)
return (rec, seq, perf_counter() - t0)


fn bench_fastxpp(path: String) raises -> (Int, Int, Float64):
var fh = open(path, "r")
var rdr = FastxReader[read_comment=False](BufferedReader(FileReader(fh^)))
var rec = 0
var seq = 0
var t0 = perf_counter()
while True:
var n = rdr.read_fastxpp()
if n < 0:
break
rec += 1
seq += n
return (rec, seq, perf_counter() - t0)


fn main() raises:
var argv = sys.argv()
if len(argv) < 2 or len(argv) > 3:
print("Usage: mojo run fastxpp_bench.mojo <file> [fastxpp]")
return

var path = String(argv[1])
var use_fast = (len(argv) == 3) and (String(argv[2]) == "fastxpp")

if use_fast:
var tup = bench_fastxpp(path) # (Int, Int, Float64)
var r = tup[0] # records
var s = tup[1] # bases
var t = tup[2] # seconds
print("mode=fastxpp records=", r, " bases=", s, " time=", t, "s")
else:
var tup = bench_original(path)
var r = tup[0]
var s = tup[1]
var t = tup[2]
print("mode=orig records=", r, " bases=", s, " time=", t, "s")
115 changes: 115 additions & 0 deletions ishlib/formats/generate_fastxpp.mojo
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import sys
from collections import Optional
from ExtraMojo.io.buffered import BufferedReader, BufferedWriter

# ---------- helpers -------------------------------------------------


fn string_count(s: String) -> Int:
var n: Int = 0
for _ in s.codepoints():
n = n + 1
return n


fn read_line(mut rdr: BufferedReader) raises -> String:
var buf = List[UInt8]()
var n = rdr.read_until(buf, ord("\n"))
if n == 0:
return ""
var s = String()
s.write_bytes(Span(buf))
return s


# ---------- FASTX++ builder -----------------------------------------


fn generate_fastxpp(
marker: String,
header: String,
seq_lines: List[String],
qualities: Optional[List[String]] = None,
) -> String:
var seq_len: Int = 0
for i in range(len(seq_lines)):
seq_len = seq_len + string_count(seq_lines[i])

var meta = String(string_count(header)) + ":" + String(
seq_len
) + ":" + String(len(seq_lines))

var rec = marker + "`" + meta + "`" + header + "\n"

for i in range(len(seq_lines)):
rec.write(seq_lines[i], "\n")

if qualities:
var q = qualities.value()
rec += "+\n"
for i in range(len(q)):
rec.write(q[i], "\n")

return rec


# ---------- main ----------------------------------------------------


fn main() raises:
var argv = sys.argv()
if len(argv) != 3:
print(
"Usage: mojo run generate_fastxpp.mojo <input.fastx>"
" <output.fastxpp>"
)
return

var reader = BufferedReader(
open(String(argv[1]), "r"), buffer_capacity=128 * 1024
)
var writer = BufferedWriter(
open(String(argv[2]), "w"), buffer_capacity=128 * 1024
)

var pending_header = String() # carries a header we already read

while True:
var header_line = pending_header
if header_line == "":
header_line = read_line(reader)
pending_header = String()

if header_line == "":
break

var marker = String(header_line[0:1])
var header = String(header_line[1:])

var seq = List[String]()
var line: String

while True:
line = read_line(reader)
if line == "":
break
if (
line.startswith(">")
or line.startswith("@")
or (marker == "@" and line.startswith("+"))
):
pending_header = line # save for the next record
break
seq.append(line)

var qual: Optional[List[String]] = None
if marker == "@" and line.startswith("+"):
var qlines = List[String]()
for _ in range(len(seq)):
qlines.append(read_line(reader))
qual = Optional[List[String]](qlines)

writer.write(generate_fastxpp(marker, header, seq, qual))

writer.flush()
writer.close()
183 changes: 182 additions & 1 deletion ishlib/vendor/kseq.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def main():
print(count, slen, qlen, sep="\t")
```
"""
import sys
from memory import UnsafePointer, memcpy
from utils import StringSlice

Expand All @@ -36,6 +37,68 @@ alias ASCII_FASTA_RECORD_START = ord(">")
alias ASCII_FASTQ_RECORD_START = ord("@")
alias ASCII_FASTQ_SEPARATOR = ord("+")

# ──────────────────────────────────────────────────────────────
# Helpers for reading in fastx++ files
# ──────────────────────────────────────────────────────────────


@always_inline
fn strip_newlines_in_place(
mut bs: ByteString, disk: Int, expected: Int
) -> Bool:
"""Compact `bs` by removing every `\n` byte in‑place; return True if the
resulting length equals `expected`.
SIMD search for newline, shifts the bytes to the left, and resizes the buffer.
Avoids allocating a new buffer and copying the data.

bs: Mutable buffer that already holds the raw FASTQ/FASTA chunk just read from disk
disk: The number of bytes that were actually read into bs
expected: how many bases/quality bytes should remain after stripping newlines;
used as a quick integrity check.

Returns:
True if the resulting buffer's length equals `expected`, False otherwise.
"""
# read_pos always starts the loop at the first byte that has not yet been examined.
var read_pos: Int = 0
# write_pos always starts at the first byte that has not yet been written into its final position
var write_pos: Int = 0
# Before the first newline, every byte is kept, so the pointers march together (no gap)
# After the first newline, the pointers may diverge, and we will need to copy bytes

while read_pos < disk:
var span_rel = memchr[do_alignment=True](
Span[UInt8, __origin_of(bs.ptr)](
ptr=bs.ptr.offset(read_pos), length=disk - read_pos
),
UInt8(ASCII_NEWLINE),
)
# If there are no new lines we dont have to adjust buffer
# If there was newlines, compute the contiguous span without newlines
var end_pos = disk if span_rel == -1 else read_pos + span_rel
var span_len = end_pos - read_pos
# We only need to copy if there are newlines that would made gaps resulting in write_pos != read_pos
# See read_pos and write_pos comments above
if span_len > 0 and write_pos != read_pos:
memcpy(bs.ptr.offset(write_pos), bs.ptr.offset(read_pos), span_len)
write_pos += span_len
read_pos = end_pos + 1 # skip the '\n' (or exit loop if none)
bs.resize(write_pos)
return write_pos == expected


# This is long way of https://lemire.me/blog/2022/01/21/swar-explained-parsing-eight-digits/
# not directly used, read_fastxpp uses cur = cur * 10 + Int(ch - UInt8(ord("0"))) directly
# Keeping here for comparison to SWAR implementation
fn ascii_to_int[O: Origin](buf: Span[UInt8, O], s: Int, e: Int) -> Int:
var v: Int = 0
for i in range(s, e):
v = v * 10 + Int(buf[i] - ord("0"))
return v


# ──────────────────────────────────────────────────────────────


@value
@register_passable("trivial")
Expand Down Expand Up @@ -384,7 +447,7 @@ struct FastxReader[R: KRead, read_comment: Bool = True](Movable):
if c < 0:
return Int(c) # EOF Error

# Reset all members
# Reset all buffers for reuse
self.seq.clear()
self.qual.clear()
self.comment.clear()
Expand Down Expand Up @@ -455,3 +518,121 @@ struct FastxReader[R: KRead, read_comment: Bool = True](Movable):
if len(self.qual) != len(self.seq):
return -2 # error: qual string is of different length
return len(self.seq)

fn read_fastxpp(mut self) raises -> Int:
# Locate next header
if self.last_char == 0:
var c = self.reader.read_byte()
while (
c >= 0
and c != ASCII_FASTA_RECORD_START
and c != ASCII_FASTQ_RECORD_START
):
c = self.reader.read_byte()
if c < 0:
return Int(c) # EOF
else:
var c = self.last_char
self.last_char = 0

# Clear buffers for this record
self.seq.clear()
self.qual.clear()
self.comment.clear()
self.name.clear()

# Read header line (without copying '\n')
var r = self.reader.read_until[SearchChar.Newline](self.name, 0, False)
if r < 0:
return Int(r)

var hdr = self.name.as_span()
if len(hdr) == 0 or hdr[0] != UInt8(ord("`")):
return -3 # not FASTX++ header

# Find closing back‑tick with one memchr
var after = Span[UInt8, __origin_of(hdr)](
ptr=hdr.unsafe_ptr().offset(1), length=len(hdr) - 1
)
var rel = memchr(after, UInt8(ord("`")))
if rel == -1:
return -3
var pos2 = rel + 1

# Parse hlen, slen, lcnt in one pass
var hlen: Int = 0
var slen: Int = 0
var lcnt: Int = 0
var cur: Int = 0
var which: Int = 0
for i in range(1, pos2):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can save the swap here by doing a loop per variable, which should be fine. And then it can be a single method you can optimize to death (in a good way :) ).

var ch = hdr[i]
if ch == UInt8(ord(":")):
if which == 0:
hlen = cur
elif which == 1:
slen = cur
cur = 0
which += 1
else:
cur = cur * 10 + Int(ch - UInt8(ord("0")))
lcnt = cur

# Validate header length and resize name
if len(hdr) - (pos2 + 1) != hlen:
return -3
self.name.resize(hlen)

# ── Sequence block ────────────────────────────────────────────
var disk = slen + lcnt # bytes on disk (bases+LFs)
var need = disk # local copy; will be mutated
self.seq.clear()
self.seq.reserve(need)
var got = self.reader.read_bytes(self.seq, need)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be faster if you did loop of memcpy instead of stripping the newlines after the fact. Maybe we need to adjust the format a bit to make that easy? like use "bytes per line", "lines", last_line_bytes" or something.

That way we only have to touch each byte one time, to copy it into the list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we just need total bytes and bytes per line.

var copied_bytes = 0
while copied_bytes < total_bytes:
    copied_bytes += self.reader.read_bytes(self.seq, min(bytes_per_line, total_bytes - copied_bytes)
    var _newline = self.reader.read_byte() 

if got != disk:
return -3
if not strip_newlines_in_place(self.seq, disk, slen):
return -2

# ── Quality block (FASTQ) ─────────────────────────────────────
if hdr[0] == UInt8(ASCII_FASTQ_RECORD_START):
if (
self.reader.read_byte() != ASCII_NEWLINE
): # consume LF after '+' line
return -3
var needq = disk # separate mutable copy
self.qual.clear()
self.qual.reserve(needq)
var gotq = self.reader.read_bytes(self.qual, needq)
if gotq != disk:
return -3
if not strip_newlines_in_place(self.qual, disk, slen):
return -2

return len(self.seq)


# ──────────────────────────────────────────────────────────────
# Main for debugging
# ──────────────────────────────────────────────────────────────
#
# fn main() raises:
# var argv = sys.argv()
# if len(argv) != 2:
# print("Usage: mojo run kseq.mojo <file.fastxpp>")
# return

# var fh = open(String(argv[1]), "r")
# var reader = FastxReader[read_comment=False](
# BufferedReader(FileReader(fh^)) # move FileHandle into FileReader
# )

# var code = reader.read_fastxpp()
# print("first‑read returned", code)
# var count = 0
# while True:
# var n = reader.read_fastxpp()
# if n < 0:
# break
# count += 1
# print("rec#", count, "seq_len", n, "hdr_len", len(reader.name))