|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Create downsampled FASTQ/BAM/CRAM files from a solo pipeline sample table. |
| 3 | +
|
| 4 | +Reads a TSV with columns: |
| 5 | + pipeline_name, giab_id, day_id, aligned_cov, R1fq, R2fq, inBAM, inCRAM |
| 6 | +
|
| 7 | +For each sample, creates downsampled copies at target coverages using: |
| 8 | + - seqkit sample (paired FASTQ) |
| 9 | + - samtools view -s (BAM or CRAM) |
| 10 | +
|
| 11 | +Usage: |
| 12 | + python create_downsampled_datasets.py \\ |
| 13 | + --table solo_pipeline_samples.tsv \\ |
| 14 | + --outdir /fsx/data/downsampled \\ |
| 15 | + [--targets 0.1,0.5,1,1.5,3,5,10,15,20,30,40,50] \\ |
| 16 | + [--reference /path/to/ref.fasta] \\ |
| 17 | + [--threads 8] [--seed 42] [--dry-run] |
| 18 | +""" |
| 19 | +import argparse |
| 20 | +import csv |
| 21 | +import os |
| 22 | +import subprocess |
| 23 | +import sys |
| 24 | +from pathlib import Path |
| 25 | + |
# Default target coverages (in x). Any target at or above a sample's measured
# aligned coverage is skipped — we can only subsample, never upsample.
TARGET_COVERAGES_DEFAULT = [0.1, 0.5, 1.0, 1.5, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0]
# Default RNG seed passed to seqkit/samtools so downsampling is reproducible.
SEED = 42
| 28 | + |
| 29 | + |
def parse_args():
    """Parse and return the command-line arguments for this script."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Required inputs.
    parser.add_argument("--table", required=True,
                        help="Path to solo_pipeline_samples.tsv")
    parser.add_argument("--outdir", required=True,
                        help="Root output directory for downsampled files")
    # Downsampling knobs.
    parser.add_argument("--targets", default=None,
                        help="Comma-separated target coverages (default: 0.1,0.5,1,1.5,3,5,10,15,20,30,40,50)")
    parser.add_argument("--reference", default=None,
                        help="Reference FASTA (required for CRAM input; also used for BAM→CRAM if desired)")
    parser.add_argument("--threads", type=int, default=8,
                        help="Threads for samtools/seqkit (default: 8)")
    parser.add_argument("--seed", type=int, default=SEED,
                        help="Random seed (default: 42)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print commands without executing")
    parser.add_argument("--samples", default=None,
                        help="Comma-separated day_id values to process (default: all)")
    # Slurm submission mode.
    parser.add_argument("--slurm", action="store_true",
                        help="Generate and submit Slurm jobs instead of running directly")
    parser.add_argument("--slurm-partition", default="i192mem",
                        help="Slurm partition (default: i192mem)")
    parser.add_argument("--slurm-comment", default="RandD",
                        help="Slurm job comment (default: RandD)")
    parser.add_argument("--conda-env", default="DOWNSAMPLE",
                        help="Conda env to activate in Slurm jobs (default: DOWNSAMPLE)")
    parser.add_argument("--job-dir", default="/fsx/scratch/downsample_jobs",
                        help="Directory for job scripts and logs (default: /fsx/scratch/downsample_jobs)")
    return parser.parse_args()
| 49 | + |
| 50 | + |
def run_cmd(cmd, dry_run=False):
    """Echo *cmd*, then execute it through the shell unless dry_run is set.

    Returns the command's exit code (0 in dry-run mode). Failures are
    reported on stderr but not raised — callers decide what to do.
    """
    print(f" CMD: {cmd}", flush=True)
    if dry_run:
        return 0
    rc = subprocess.run(cmd, shell=True).returncode
    if rc != 0:
        print(f" ERROR: command returned {rc}", file=sys.stderr)
    return rc
| 60 | + |
| 61 | + |
def detect_input_type(row):
    """Classify a sample row's input as 'fastq', 'bam', or 'cram'.

    Precedence is FASTQ > BAM > CRAM. Blank, "NA", and "TBD" values
    (case-insensitive) are placeholders, not files.

    Args:
        row: dict-like TSV row with keys R1fq, R2fq, inBAM, inCRAM
             (missing keys are tolerated).

    Returns:
        ("fastq", (r1, r2_or_None)) | ("bam", path) | ("cram", path)
        | (None, None) when no usable input is present.
    """
    def usable(value):
        # Normalize and reject placeholder values uniformly for every
        # column (the original R2 check forgot "tbd").
        v = (value or "").strip()
        return v if v.lower() not in ("", "na", "tbd") else None

    r1 = usable(row.get("R1fq"))
    r2 = usable(row.get("R2fq"))
    bam = usable(row.get("inBAM"))
    cram = usable(row.get("inCRAM"))

    if r1:
        return "fastq", (r1, r2)
    if bam:
        return "bam", bam
    if cram:
        return "cram", cram
    return None, None
| 76 | + |
| 77 | + |
def downsample_fastq(r1, r2, fraction, out_r1, out_r2, seed, threads, dry_run):
    """Downsample (optionally paired) FASTQ with seqkit sample.

    Reusing the same --rand-seed for R1 and R2 keeps mates in sync,
    provided both FASTQs list reads in the same order.

    Args:
        r1, r2: input FASTQ paths (r2 may be None for single-end data).
        fraction: sampling probability in (0, 1).
        out_r1, out_r2: gzip-compressed output paths (out_r2 ignored if r2 is None).
        seed, threads: passed to seqkit/pigz.
        dry_run: print commands without executing or touching the filesystem.
    """
    if not dry_run:
        # Fix: only create directories when actually running — --dry-run
        # promises "no files will be created". Also cover out_r2's directory,
        # which the original never created.
        os.makedirs(os.path.dirname(out_r1), exist_ok=True)
        if r2 and out_r2:
            os.makedirs(os.path.dirname(out_r2), exist_ok=True)

    def seqkit_cmd(in_fq, out_fq):
        # seqkit sample with same seed keeps pairs in sync for name-sorted FASTQs
        return (f"seqkit sample -j {threads} --line-width=0 --quiet "
                f"--rand-seed={seed} --seq-type=dna --proportion={fraction:.10f} "
                f'"{in_fq}" | pigz -p {threads} > "{out_fq}"')

    run_cmd(seqkit_cmd(r1, out_r1), dry_run)
    if r2:
        run_cmd(seqkit_cmd(r2, out_r2), dry_run)
| 91 | + |
| 92 | + |
| 93 | +def _samtools_subsample_arg(seed, fraction): |
| 94 | + """Format samtools -s SEED.FRAC argument (e.g. 42.0996 for seed=42, p=0.0996).""" |
| 95 | + # samtools expects INT.FRAC where the digits after '.' are the probability |
| 96 | + frac_digits = f"{fraction:.10f}"[2:] # strip leading "0." |
| 97 | + result = f"{seed}.{frac_digits}".rstrip("0") |
| 98 | + if result.endswith("."): |
| 99 | + result += "0" |
| 100 | + return result |
| 101 | + |
| 102 | + |
def downsample_bam(bam_path, fraction, out_path, seed, threads, dry_run):
    """Downsample a BAM with samtools view -s SEED.FRAC, then sort and index.

    Args:
        bam_path: input BAM.
        fraction: keep probability in (0, 1).
        out_path: output BAM path; a .bai index is written next to it.
        seed, threads: passed to samtools.
        dry_run: print the command without executing or touching the filesystem.
    """
    if not dry_run:
        # Fix: --dry-run must leave the filesystem untouched; the original
        # created the output directory unconditionally.
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
    frac_str = _samtools_subsample_arg(seed, fraction)
    cmd = (f'samtools view -@ {threads} -b -s {frac_str} "{bam_path}" '
           f'| samtools sort -@ {threads} -o "{out_path}" && '
           f'samtools index -@ {threads} "{out_path}"')
    run_cmd(cmd, dry_run)
| 111 | + |
| 112 | + |
def downsample_cram(cram_path, fraction, out_path, reference, seed, threads, dry_run):
    """Downsample a CRAM with samtools view -s SEED.FRAC, then sort and index.

    A reference FASTA is mandatory because CRAM decoding/encoding needs it;
    when missing, an error is logged and nothing is run.

    Args:
        cram_path: input CRAM.
        fraction: keep probability in (0, 1).
        out_path: output CRAM path; a .crai index is written next to it.
        reference: reference FASTA used by both view and sort.
        seed, threads: passed to samtools.
        dry_run: print the command without executing or touching the filesystem.
    """
    if not reference:
        print(f" ERROR: --reference required for CRAM input: {cram_path}", file=sys.stderr)
        return
    if not dry_run:
        # Fix: --dry-run must leave the filesystem untouched; the original
        # created the output directory unconditionally.
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
    frac_str = _samtools_subsample_arg(seed, fraction)
    cmd = (f'samtools view -@ {threads} -C -s {frac_str} '
           f'--reference "{reference}" "{cram_path}" '
           f'| samtools sort -@ {threads} --reference "{reference}" '
           f'-O cram -o "{out_path}" && '
           f'samtools index -@ {threads} "{out_path}"')
    run_cmd(cmd, dry_run)
| 126 | + |
| 127 | + |
def build_downsample_commands(input_type, input_paths, targets, aligned_cov, outdir, pipeline, giab_id, day_id, reference, seed, threads):
    """Build the shell commands that downsample one sample to each target.

    Args:
        input_type: "fastq", "bam", or "cram" (as from detect_input_type).
        input_paths: (r1, r2_or_None) tuple for fastq; a single path otherwise.
        targets: target coverages in x; any target >= aligned_cov is skipped.
        aligned_cov: the sample's measured aligned coverage in x.
        outdir: root output Path; outputs land under
            outdir/pipeline/giab_id/day_id/<cov_label>/.
        pipeline, giab_id, day_id: sample identity, used in paths and names.
        reference: reference FASTA (used only by the cram branch).
        seed, threads: passed through to seqkit/samtools/pigz.

    Returns:
        List of (target_cov, command_string) tuples. Each command creates its
        own output directory, so it is self-contained for remote execution.
    """
    commands = []
    for tgt in targets:
        if tgt >= aligned_cov:
            # Cannot upsample: skip targets at or above the actual coverage.
            continue

        fraction = tgt / aligned_cov
        # e.g. 0.5 -> "0p5x": avoid dots in directory/file name components.
        cov_label = f"{tgt}x".replace(".", "p")
        sample_dir = outdir / pipeline / giab_id / day_id / cov_label
        # Fix: quote the directory like every other path so spaces or shell
        # metacharacters in outdir/pipeline/ids don't break the command.
        mkdir = f'mkdir -p "{sample_dir}"'

        if input_type == "fastq":
            r1, r2 = input_paths
            ext = ".fastq.gz"
            out_r1 = str(sample_dir / f"{giab_id}_{cov_label}_R1{ext}")
            out_r2 = str(sample_dir / f"{giab_id}_{cov_label}_R2{ext}") if r2 else None

            # Same --rand-seed on R1/R2 keeps mates in sync for FASTQs whose
            # reads are in the same order.
            cmd_r1 = (f"{mkdir} && "
                      f"seqkit sample -j {threads} --line-width=0 --quiet "
                      f"--rand-seed={seed} --seq-type=dna --proportion={fraction:.10f} "
                      f'"{r1}" | pigz -p {threads} > "{out_r1}"')
            if r2:
                cmd_r2 = (f"seqkit sample -j {threads} --line-width=0 --quiet "
                          f"--rand-seed={seed} --seq-type=dna --proportion={fraction:.10f} "
                          f'"{r2}" | pigz -p {threads} > "{out_r2}"')
                cmd = f"{cmd_r1} && {cmd_r2}"
            else:
                cmd = cmd_r1
            commands.append((tgt, cmd))

        elif input_type == "bam":
            out_bam = str(sample_dir / f"{giab_id}_{cov_label}.bam")
            frac_str = _samtools_subsample_arg(seed, fraction)
            cmd = (f"{mkdir} && "
                   f'samtools view -@ {threads} -b -s {frac_str} "{input_paths}" '
                   f'| samtools sort -@ {threads} -o "{out_bam}" && '
                   f'samtools index -@ {threads} "{out_bam}"')
            commands.append((tgt, cmd))

        elif input_type == "cram":
            out_cram = str(sample_dir / f"{giab_id}_{cov_label}.cram")
            frac_str = _samtools_subsample_arg(seed, fraction)
            cmd = (f"{mkdir} && "
                   f'samtools view -@ {threads} -C -s {frac_str} '
                   f'--reference "{reference}" "{input_paths}" '
                   f'| samtools sort -@ {threads} --reference "{reference}" '
                   f'-O cram -o "{out_cram}" && '
                   f'samtools index -@ {threads} "{out_cram}"')
            commands.append((tgt, cmd))

    return commands
| 179 | + |
| 180 | + |
def generate_slurm_script(day_id, pipeline, giab_id, commands, args):
    """Generate a Slurm job script for a single sample.

    Args:
        day_id, pipeline, giab_id: sample identity; embedded in the job name
            and the script's progress messages.
        commands: list of (target_cov, shell_command) tuples, as produced by
            build_downsample_commands(); they run sequentially in one job.
        args: parsed CLI namespace (uses job_dir, slurm_comment,
            slurm_partition, threads, conda_env).

    Returns:
        (script_path, script_content) — the script is NOT written or
        submitted here; the caller does both.
    """
    job_name = f"downsample_{pipeline}_{giab_id}_{day_id}"
    script_path = Path(args.job_dir) / f"{job_name}.sh"
    # %j is expanded by Slurm to the job id; stdout and stderr share one file.
    log_path = Path(args.job_dir) / "logs" / f"{job_name}_%j.out"

    # Header: SBATCH directives, strict shell mode, then conda activation.
    script_content = f"""#!/bin/bash
#SBATCH --comment {args.slurm_comment}
#SBATCH --partition {args.slurm_partition}
#SBATCH --cpus-per-task={args.threads}
#SBATCH --mem=64G
#SBATCH --time=12:00:00
#SBATCH --job-name={job_name}
#SBATCH --output={log_path}
#SBATCH --error={log_path}

set -euo pipefail

echo "=== Downsampling {pipeline}/{giab_id}/{day_id} ==="
echo "Start: $(date)"
echo "Host: $(hostname)"
echo ""

# Activate conda environment
source /home/ubuntu/miniconda3/bin/activate {args.conda_env}

# Run downsampling commands
"""

    # One progress-echo / command / done-echo stanza per target coverage;
    # set -e aborts the job on the first failing command.
    for tgt, cmd in commands:
        script_content += f'\necho "Processing {tgt}x..."\n{cmd}\necho " {tgt}x complete"\n'

    script_content += f"""
echo ""
echo "End: $(date)"
echo "=== All targets complete for {day_id} ==="
"""

    return script_path, script_content
| 220 | + |
| 221 | + |
def main():
    """Drive the downsampling run: parse args, read the table, process rows.

    Two modes:
      - direct (default): run seqkit/samtools locally, one target at a time;
      - --slurm: build all commands per sample, write a job script, and
        submit it with sbatch.
    """
    args = parse_args()
    # CLI --targets overrides the module default; sorted for stable output.
    targets = TARGET_COVERAGES_DEFAULT
    if args.targets:
        targets = sorted(float(x) for x in args.targets.split(","))

    # Optional day_id allow-list from --samples.
    filter_ids = None
    if args.samples:
        filter_ids = set(s.strip() for s in args.samples.split(","))

    # Track processed (pipeline, day_id) pairs to handle duplicate day_ids across pipelines
    processed_pairs = set()

    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    # Load the whole TSV up front so counts can be reported before work starts.
    with open(args.table, newline="") as fh:
        reader = csv.DictReader(fh, delimiter="\t")
        rows = [r for r in reader]

    print(f"Loaded {len(rows)} rows from {args.table}")
    print(f"Target coverages: {targets}")
    if args.slurm:
        print(f"*** SLURM MODE — generating job scripts in {args.job_dir} ***")
        Path(args.job_dir).mkdir(parents=True, exist_ok=True)
        (Path(args.job_dir) / "logs").mkdir(parents=True, exist_ok=True)
    if args.dry_run:
        print("*** DRY RUN — no files will be created ***")
    print()

    skipped = 0          # samples skipped (unknown coverage or no input)
    processed = 0        # individual downsample operations attempted
    jobs_submitted = 0   # Slurm jobs submitted (or would-be, in dry run)

    for row in rows:
        day_id = row["day_id"].strip()
        pipeline = row["pipeline_name"].strip()
        giab_id = row["giab_id"].strip()
        aligned_cov_str = row["aligned_cov"].strip()

        # Skip if already processed this (pipeline, day_id) pair
        pair_key = (pipeline, day_id)
        if pair_key in processed_pairs:
            continue
        processed_pairs.add(pair_key)

        if filter_ids and day_id not in filter_ids:
            continue

        # Rows whose coverage is still unknown cannot be fractioned.
        if aligned_cov_str.upper() in ("TBD", "", "NA"):
            print(f"SKIP {pipeline}/{giab_id} ({day_id}): aligned_cov={aligned_cov_str}")
            skipped += 1
            continue

        aligned_cov = float(aligned_cov_str)
        input_type, input_paths = detect_input_type(row)

        if input_type is None:
            print(f"SKIP {pipeline}/{giab_id} ({day_id}): no input file found")
            skipped += 1
            continue

        print(f"=== {pipeline} / {giab_id} / {day_id} === aligned_cov={aligned_cov}x type={input_type}")

        if args.slurm:
            # Build all commands for this sample
            commands = build_downsample_commands(
                input_type, input_paths, targets, aligned_cov, outdir,
                pipeline, giab_id, day_id, args.reference, args.seed, args.threads
            )

            if not commands:
                print(f" No valid targets (all >= {aligned_cov}x)")
                continue

            # Generate job script
            script_path, script_content = generate_slurm_script(day_id, pipeline, giab_id, commands, args)

            # NOTE: the job script is written even under --dry-run so it can
            # be inspected; only the sbatch submission is suppressed.
            print(f" Writing job script: {script_path}")
            with open(script_path, "w") as f:
                f.write(script_content)
            os.chmod(script_path, 0o755)

            # Submit job
            if not args.dry_run:
                submit_cmd = f"/opt/slurm/bin/sbatch {script_path}"
                print(f" Submitting: {submit_cmd}")
                result = subprocess.run(submit_cmd, shell=True, capture_output=True, text=True)
                if result.returncode == 0:
                    print(f" ✓ {result.stdout.strip()}")
                    jobs_submitted += 1
                else:
                    print(f" ✗ sbatch failed: {result.stderr.strip()}", file=sys.stderr)
            else:
                print(f" [DRY RUN] Would submit: /opt/slurm/bin/sbatch {script_path}")
                jobs_submitted += 1

            processed += len(commands)

        else:
            # Direct execution mode (original behavior)
            for tgt in targets:
                if tgt >= aligned_cov:
                    print(f" {tgt}x: SKIP (>= aligned {aligned_cov}x)")
                    continue

                fraction = tgt / aligned_cov
                # e.g. 0.5 -> "0p5x" so coverage labels have no dots.
                cov_label = f"{tgt}x".replace(".", "p")
                sample_dir = outdir / pipeline / giab_id / day_id / cov_label

                if input_type == "fastq":
                    r1, r2 = input_paths
                    ext = ".fastq.gz"
                    out_r1 = str(sample_dir / f"{giab_id}_{cov_label}_R1{ext}")
                    out_r2 = str(sample_dir / f"{giab_id}_{cov_label}_R2{ext}") if r2 else None
                    print(f" {tgt}x: fraction={fraction:.6f} -> {out_r1}")
                    downsample_fastq(r1, r2, fraction, out_r1, out_r2, args.seed, args.threads, args.dry_run)

                elif input_type == "bam":
                    out_bam = str(sample_dir / f"{giab_id}_{cov_label}.bam")
                    print(f" {tgt}x: fraction={fraction:.6f} -> {out_bam}")
                    downsample_bam(input_paths, fraction, out_bam, args.seed, args.threads, args.dry_run)

                elif input_type == "cram":
                    out_cram = str(sample_dir / f"{giab_id}_{cov_label}.cram")
                    print(f" {tgt}x: fraction={fraction:.6f} -> {out_cram}")
                    downsample_cram(input_paths, fraction, out_cram, args.reference,
                                    args.seed, args.threads, args.dry_run)

                processed += 1

        print()
    if args.slurm:
        print(f"Done. Generated {jobs_submitted} Slurm jobs, {processed} total downsample operations, skipped {skipped} samples.")
    else:
        print(f"Done. Processed {processed} downsample operations, skipped {skipped} samples.")


if __name__ == "__main__":
    main()
| 362 | + |
0 commit comments