Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tests/pipelines/cluster/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Snakemake outputs
results/
logs/
.snakemake/

# Cluster job outputs
slurm-*.out
# NOTE(review): the two patterns below are broad — they cover scheduler
# stdout/stderr files (e.g. jobname.o12345 / jobname.e12345) but will also
# match any file containing ".o" or ".e" in its name; confirm that is intended.
*.o*
*.e*
277 changes: 277 additions & 0 deletions tests/pipelines/cluster/Snakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
# Cluster test pipeline for charmer development
# Designed to create many pending jobs and demonstrate failure handling
#
# DAG Structure (~76 jobs):
#   process_sample (12) -> align_sample (12) -> call_variants (48)
#                                                   |
#                                     merge_sample_variants (12)
#                                                   |
#                                          group_analysis (2)
#                                                   |
#                                           final_merge (1)
#                                                   |
#                                         generate_report (1)

configfile: "config/config.yaml"

# Load samples configuration from the path given in the main config.
# The file is expected to contain a top-level "samples" mapping of
# sample name -> per-sample metadata (group, fail_at, exit_code, ...).
import yaml
with open(config["samples_file"]) as f:
    samples_config = yaml.safe_load(f)

SAMPLES = list(samples_config["samples"].keys())
CHROMOSOMES = config["chromosomes"]   # chromosomes fanned out in call_variants
GROUPS = config["groups"]             # group names aggregated by group_analysis
DELAYS = config["delays"]             # per-stage sleep durations (seconds)

# Helper to get sample metadata
def get_sample_meta(sample, key, default=None):
    """Return metadata `key` for `sample`, or `default` when the sample
    or the key is absent from the samples config."""
    return samples_config["samples"].get(sample, {}).get(key, default)

# Get samples that should succeed (no fail_at defined)
SUCCESSFUL_SAMPLES = [s for s in SAMPLES if get_sample_meta(s, "fail_at") is None]

# Get samples by group (excluding failure group)
def get_samples_by_group(group):
    """Return the samples assigned to `group` that are not configured to
    fail at any stage (samples with a `fail_at` entry are excluded)."""
    return [s for s in SAMPLES
            if get_sample_meta(s, "group") == group
            and get_sample_meta(s, "fail_at") is None]

rule all:
    """Default target: the pipeline is complete once the final report exists."""
    input:
        "results/final_report.txt"

rule process_sample:
    """Initial sample processing - first step for all samples."""
    output:
        "results/processed/{sample}.txt"
    params:
        # Simulated work time in seconds, from the config delay table.
        delay=DELAYS["process"],
        # Stage at which this sample is configured to fail; None renders as the
        # literal string "None" in the shell command, so the comparison below
        # simply fails and the rule succeeds.
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        # Exit status used when the configured failure triggers (defaults to 1).
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/process_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting processing for {wildcards.sample}..."
        echo "INFO: Initializing sample processing pipeline"
        echo "INFO: Checking input data integrity"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "process" ]; then
            echo "ERROR: Data validation failed for {wildcards.sample}"
            echo "ERROR: Input file appears corrupted or malformed"
            echo "ERROR: Cannot proceed with processing"
            sleep 5
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Sample {wildcards.sample} processed successfully"
        echo "Processed: {wildcards.sample}" > {output}
        """

rule align_sample:
    """Alignment step - depends on processing."""
    input:
        "results/processed/{sample}.txt"
    output:
        "results/aligned/{sample}.bam"
    params:
        # Simulated alignment duration (seconds) from the config delay table.
        delay=DELAYS["align"],
        # Stage at which this sample is configured to fail (None = succeed).
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        # Exit status used when simulating failure; defaults to 1.
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 2
    resources:
        mem_mb=4000,
        runtime=15
    log:
        "logs/align_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting alignment for {wildcards.sample}..."
        echo "INFO: Loading reference genome"
        sleep 5
        echo "INFO: Indexing reference"
        sleep 3
        echo "INFO: Running aligner on {wildcards.sample}"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "align" ]; then
            echo "ERROR: Out of memory while aligning {wildcards.sample}"
            echo "ERROR: Process killed by OOM killer"
            echo "ERROR: Peak memory usage: 32.5 GB (limit: 16 GB)"
            echo "ERROR: Consider increasing memory allocation"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Alignment complete for {wildcards.sample}"
        echo "INFO: Mapped reads: 45,234,567"
        echo "INFO: Unmapped reads: 1,234,567"
        echo "Aligned: {wildcards.sample}" > {output}
        """

rule call_variants:
    """Variant calling per chromosome - creates fan-out pattern."""
    # One job per (sample, chromosome) pair; the {chrom} wildcard is driven
    # by the expand() in merge_sample_variants downstream.
    input:
        bam="results/aligned/{sample}.bam"
    output:
        vcf="results/variants/{sample}_{chrom}.vcf"
    params:
        # Simulated per-chromosome calling duration (seconds).
        delay=DELAYS["variants"],
        # Stage at which this sample is configured to fail (None = succeed).
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        # Exit status used when simulating failure; defaults to 1.
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 2
    resources:
        mem_mb=4000,
        runtime=20
    log:
        "logs/call_variants/{sample}_{chrom}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting variant calling for {wildcards.sample} on {wildcards.chrom}..."
        echo "INFO: Loading BAM file"
        sleep 3
        echo "INFO: Scanning {wildcards.chrom} for variants"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "call_variants" ]; then
            echo "ERROR: Variant calling timed out for {wildcards.sample} on {wildcards.chrom}"
            echo "ERROR: Wall clock limit exceeded"
            echo "ERROR: Job ran for 3600 seconds (limit: 1800 seconds)"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Found 12,345 variants on {wildcards.chrom}"
        echo "INFO: SNPs: 10,234 Indels: 2,111"
        echo "Variants: {wildcards.sample} {wildcards.chrom}" > {output.vcf}
        """

rule merge_sample_variants:
    """Merge variants per sample across chromosomes - fan-in pattern."""
    # Double-braced {{sample}} keeps the sample wildcard while expanding
    # over all configured chromosomes.
    input:
        expand("results/variants/{{sample}}_{chrom}.vcf", chrom=CHROMOSOMES)
    output:
        "results/merged/{sample}_merged.vcf"
    params:
        # Simulated merge duration (seconds).
        delay=DELAYS["merge"],
        # Stage at which this sample is configured to fail (None = succeed).
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        # Exit status used when simulating failure; defaults to 1.
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/merge_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting variant merge for {wildcards.sample}..."
        echo "INFO: Merging VCF files from {CHROMOSOMES}"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "merge" ]; then
            echo "ERROR: Segmentation fault while merging {wildcards.sample}"
            echo "ERROR: Memory corruption detected at 0x7fff5fbff8c0"
            echo "ERROR: Stack trace:"
            echo "ERROR:   #0 merge_vcf_records() at vcfmerge.c:234"
            echo "ERROR:   #1 main() at vcfmerge.c:567"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        cat {input} > {output}
        echo "INFO: Merge complete for {wildcards.sample}"
        echo "INFO: Total variants: 49,380"
        """

rule group_analysis:
    """Per-group analysis - aggregates samples by group."""
    # Input is computed per group; samples configured to fail are excluded
    # by get_samples_by_group. NOTE(review): a group containing only failing
    # samples yields an empty input list and this rule still runs — confirm
    # that is the intended behavior for the failure-demo scenario.
    input:
        lambda wc: expand("results/merged/{sample}_merged.vcf",
                          sample=get_samples_by_group(wc.group))
    output:
        "results/groups/{group}_analysis.txt"
    params:
        # Simulated analysis duration (seconds).
        delay=DELAYS["group_analysis"]
    threads: 2
    resources:
        mem_mb=4000,
        runtime=15
    log:
        "logs/group_analysis/{group}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting group analysis for {wildcards.group}..."
        echo "INFO: Aggregating samples in group"
        echo "INFO: Input samples: {input}"
        sleep {params.delay}
        echo "Group {wildcards.group} analysis complete" > {output}
        echo "INFO: Analysis complete for {wildcards.group}"
        """

rule final_merge:
    """Final merge of all group analyses."""
    # Single fan-in job: waits for every group's analysis file.
    input:
        expand("results/groups/{group}_analysis.txt", group=GROUPS)
    output:
        "results/final_analysis.txt"
    params:
        # Simulated merge duration (seconds).
        delay=DELAYS["final_merge"]
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/final_merge.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting final merge of all groups..."
        echo "INFO: Combining group analyses"
        sleep {params.delay}
        cat {input} > {output}
        echo "INFO: Final merge complete"
        """

rule generate_report:
    """Generate final pipeline report."""
    # Requires the final analysis plus every successful sample's merged VCF,
    # so the report only runs once all non-failing samples have completed.
    input:
        analysis="results/final_analysis.txt",
        merged=expand("results/merged/{sample}_merged.vcf", sample=SUCCESSFUL_SAMPLES)
    output:
        "results/final_report.txt"
    params:
        # Simulated report-generation duration (seconds).
        delay=DELAYS["report"]
    threads: 1
    resources:
        mem_mb=1000,
        runtime=5
    log:
        "logs/generate_report.log"
    shell:
        # {SUCCESSFUL_SAMPLES}, {GROUPS} and {CHROMOSOMES} are module-level
        # variables from this Snakefile, substituted by Snakemake's shell
        # command formatting (their Python list repr is written to the report).
        """
        exec > {log} 2>&1
        echo "Generating final report..."
        echo "INFO: Collecting pipeline statistics"
        sleep {params.delay}
        echo "=== Pipeline Complete ===" > {output}
        echo "Successful samples: {SUCCESSFUL_SAMPLES}" >> {output}
        echo "Groups analyzed: {GROUPS}" >> {output}
        echo "Chromosomes: {CHROMOSOMES}" >> {output}
        date >> {output}
        echo "INFO: Report generated successfully"
        """
48 changes: 48 additions & 0 deletions tests/pipelines/cluster/cluster/lsf/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Snakemake LSF Executor Profile Configuration
# =============================================
# This profile configures Snakemake to submit jobs to LSF clusters
# Usage: snakemake --profile cluster/lsf

# Executor settings
executor: lsf
jobs: 50  # Maximum concurrent jobs
latency-wait: 15  # Wait time for output files (seconds)

# Default resources for all rules
# Entries are KEY=VALUE expressions; the lsf_* keys are consumed by the
# LSF executor plugin (target queue, accounting project, extra bsub args).
default-resources:
  - "runtime=30"
  - "mem_mb=4000"
  - "lsf_queue=rna"
  - "lsf_project=charmer-test"
  - 'lsf_extra=""'  # no extra bsub arguments by default

# Rule-specific resource overrides
# NOTE(review): these values mirror the per-rule `resources:` blocks in the
# Snakefile — keep both places in sync when tuning.
set-resources:
  - process_sample:runtime=10
  - process_sample:mem_mb=2000

  - align_sample:runtime=15
  - align_sample:mem_mb=4000

  - call_variants:runtime=20
  - call_variants:mem_mb=4000

  - merge_sample_variants:runtime=10
  - merge_sample_variants:mem_mb=2000

  - group_analysis:runtime=15
  - group_analysis:mem_mb=4000

  - final_merge:runtime=10
  - final_merge:mem_mb=2000

  - generate_report:runtime=5
  - generate_report:mem_mb=1000

# Behavior settings
rerun-incomplete: true  # re-run jobs whose outputs are marked incomplete
keep-going: true        # keep scheduling independent jobs after a failure

# Output settings
printshellcmds: true
show-failed-logs: true
Loading