Skip to content

Commit 34b771b

Browse files
jdidionclaude
andcommitted
subsample: Fix skip-ahead bias, reproducibility, bgz, and error handling
- Skip-ahead no longer always starts at record 0; the first jump is drawn before selecting any record - Tile processing order is deterministic: tiles sorted by bin_key, per-tile RNG derived from base_seed + bin_key (no shared mutex) - Remove .bgz from is_gzipped() to match fastq::fs which only recognizes .gz - Temp file creation errors propagate as SubsampleError instead of panicking via unwrap() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 93869f4 commit 34b771b

File tree

2 files changed

+34
-24
lines changed

2 files changed

+34
-24
lines changed

src/commands/subsample.rs

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
io::{self, BufRead, BufReader, BufWriter, Write},
55
ops::{Bound, RangeBounds},
66
path::{Path, PathBuf},
7-
sync::{Mutex, mpsc},
7+
sync::mpsc,
88
};
99

1010
use bitvec::vec::BitVec;
@@ -85,12 +85,7 @@ pub fn subsample(args: SubsampleArgs) -> Result<(), SubsampleError> {
8585
}
8686

8787
fn is_gzipped(path: &Path) -> bool {
88-
path
89-
.extension()
90-
.and_then(|e| e.to_str())
91-
.map(|ext| ["gz", "bgz"]
92-
.contains(&ext))
93-
.unwrap_or(false)
88+
path.extension().and_then(|e| e.to_str()) == Some("gz")
9489
}
9590

9691
fn subsample_approximate<Rng>(
@@ -563,6 +558,7 @@ impl Write for OutputWriter {
563558
}
564559

565560
struct TileInfo {
561+
bin_key: u64,
566562
record_count: usize,
567563
r1_path: PathBuf,
568564
r2_path: Option<PathBuf>,
@@ -609,24 +605,31 @@ fn write_tile_temp_files(
609605
}
610606

611607
if let Some(bin_key) = parse_tile_bin(r1_rec.name()) {
612-
let tw = tile_writers.entry(bin_key).or_insert_with(|| {
608+
if !tile_writers.contains_key(&bin_key) {
613609
let r1_path = temp_dir.join(format!("{bin_key}.r1.fq"));
614-
let r1_file = BufWriter::new(File::create(&r1_path).unwrap());
610+
let r1_file = BufWriter::new(
611+
File::create(&r1_path)
612+
.map_err(|e| SubsampleError::CreateFile(e, r1_path.clone()))?,
613+
);
615614
let (r2_writer, r2_path) = if paired {
616615
let p = temp_dir.join(format!("{bin_key}.r2.fq"));
617-
let f = BufWriter::new(File::create(&p).unwrap());
616+
let f = BufWriter::new(
617+
File::create(&p)
618+
.map_err(|e| SubsampleError::CreateFile(e, p.clone()))?,
619+
);
618620
(Some(fastq::io::Writer::new(f)), Some(p))
619621
} else {
620622
(None, None)
621623
};
622-
TileWriter {
624+
tile_writers.insert(bin_key, TileWriter {
623625
r1: fastq::io::Writer::new(r1_file),
624626
r1_path,
625627
r2: r2_writer,
626628
r2_path,
627629
record_count: 0,
628-
}
629-
});
630+
});
631+
}
632+
let tw = tile_writers.get_mut(&bin_key).unwrap();
630633

631634
tw.r1.write_record(&r1_rec)?;
632635
if let Some(r2w) = tw.r2.as_mut() {
@@ -639,22 +642,25 @@ fn write_tile_temp_files(
639642
}
640643

641644
// Collect TileInfo (writers are dropped here, flushing buffers)
642-
let tiles: Vec<TileInfo> = tile_writers
643-
.into_values()
644-
.map(|tw| TileInfo {
645+
let mut tiles: Vec<TileInfo> = tile_writers
646+
.into_iter()
647+
.map(|(key, tw)| TileInfo {
648+
bin_key: key,
645649
record_count: tw.record_count,
646650
r1_path: tw.r1_path,
647651
r2_path: tw.r2_path,
648652
})
649653
.collect();
654+
// Sort by bin key for deterministic processing order
655+
tiles.sort_unstable_by_key(|t| t.bin_key);
650656

651657
Ok((tiles, parse_failures))
652658
}
653659

654660
fn subsample_by_tile<Rng>(
655661
(r1_src, r1_dst): (&Path, &Path),
656662
(r2_src, r2_dst): (Option<&Path>, Option<&Path>),
657-
rng: Rng,
663+
mut rng: Rng,
658664
mode: TileCountMode,
659665
fast: bool,
660666
sampling_threads: usize,
@@ -775,8 +781,12 @@ where
775781
retained_count
776782
);
777783

784+
// Derive a base seed from the caller's RNG once, then derive per-tile
785+
// seeds deterministically from base_seed + bin_key. This avoids a shared
786+
// mutex and makes output reproducible regardless of thread scheduling.
787+
let base_seed: u64 = rng.random();
788+
778789
let (tx, rx) = mpsc::sync_channel::<TileResult>(threads * 2);
779-
let rng = Mutex::new(rng);
780790

781791
let selected_records = std::thread::scope(|s| -> Result<usize, SubsampleError> {
782792
// Writer thread: receives tile results and writes to output files
@@ -813,14 +823,11 @@ where
813823
.into_iter()
814824
.map(|chunk| {
815825
let tx = tx.clone();
816-
let rng = &rng;
817826

818827
s.spawn(move || -> Result<(), SubsampleError> {
819828
for tile in chunk {
820-
let mut tile_rng = {
821-
let mut rng = rng.lock().unwrap();
822-
SmallRng::seed_from_u64(rng.random())
823-
};
829+
let mut tile_rng =
830+
SmallRng::seed_from_u64(base_seed.wrapping_add(tile.bin_key));
824831

825832
// Skip-ahead only works for single-end; paired-end
826833
// always uses exact indexed sampling.

src/fastq/io/indexed_reader.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,12 @@ impl<R: Read + Seek> IndexedReader<R> {
117117

118118
let mean_jump = total as f64 / target as f64;
119119
let mut record = Record::default();
120-
let mut current: usize = 0;
121120
let mut count = 0;
122121

122+
// Draw the first jump to avoid always starting at record 0
123+
let first = (-mean_jump * rng.random::<f64>().ln()).ceil() as usize;
124+
let mut current: usize = first.min(total.saturating_sub(1));
125+
123126
while count < target && current < total {
124127
self.read_record_at(current, &mut record)?;
125128
callback(&record)?;

0 commit comments

Comments
 (0)