# ============================================================================
# Reconstructed from a whitespace-mangled patch. Six files for a Snakemake
# cluster test pipeline, shown in order with one banner per file.
# ============================================================================

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/.gitignore
# ---------------------------------------------------------------------------
# Snakemake outputs
results/
logs/
.snakemake/

# Cluster job outputs
slurm-*.out
*.o*
*.e*

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/Snakefile
# ---------------------------------------------------------------------------
# Cluster test pipeline for charmer development
# Designed to create many pending jobs and demonstrate failure handling
#
# DAG Structure (~88 jobs):
#   process_sample (12) -> align_sample (12) -> call_variants (48)
#                                                     |
#                                        merge_sample_variants (12)
#                                                     |
#                                            group_analysis (2)
#                                                     |
#                                              final_merge (1)
#                                                     |
#                                            generate_report (1)

configfile: "config/config.yaml"

# Load samples configuration
import yaml
with open(config["samples_file"]) as f:
    samples_config = yaml.safe_load(f)

SAMPLES = list(samples_config["samples"].keys())
CHROMOSOMES = config["chromosomes"]
GROUPS = config["groups"]
DELAYS = config["delays"]

# Helper to get sample metadata
def get_sample_meta(sample, key, default=None):
    return samples_config["samples"].get(sample, {}).get(key, default)

# Get samples that should succeed (no fail_at defined)
SUCCESSFUL_SAMPLES = [s for s in SAMPLES if get_sample_meta(s, "fail_at") is None]

# Get samples by group (excluding failure group)
def get_samples_by_group(group):
    return [s for s in SAMPLES
            if get_sample_meta(s, "group") == group
            and get_sample_meta(s, "fail_at") is None]

rule all:
    input:
        "results/final_report.txt",
        # BUG FIX: the failure-demo samples were unreachable. Both
        # generate_report (via SUCCESSFUL_SAMPLES) and group_analysis
        # (via get_samples_by_group, plus "failure" not being in GROUPS)
        # filter out every sample with fail_at set, so those samples were
        # never scheduled and the pipeline could not demonstrate failure
        # handling. Requesting the merged VCF for ALL samples puts the
        # failing samples (and their upstream jobs) back into the DAG;
        # with keep-going enabled the healthy samples still complete.
        expand("results/merged/{sample}_merged.vcf", sample=SAMPLES),

rule process_sample:
    """Initial sample processing - first step for all samples."""
    output:
        "results/processed/{sample}.txt"
    params:
        delay=DELAYS["process"],
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/process_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting processing for {wildcards.sample}..."
        echo "INFO: Initializing sample processing pipeline"
        echo "INFO: Checking input data integrity"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "process" ]; then
            echo "ERROR: Data validation failed for {wildcards.sample}"
            echo "ERROR: Input file appears corrupted or malformed"
            echo "ERROR: Cannot proceed with processing"
            sleep 5
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Sample {wildcards.sample} processed successfully"
        echo "Processed: {wildcards.sample}" > {output}
        """

rule align_sample:
    """Alignment step - depends on processing."""
    input:
        "results/processed/{sample}.txt"
    output:
        "results/aligned/{sample}.bam"
    params:
        delay=DELAYS["align"],
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 2
    resources:
        mem_mb=4000,
        runtime=15
    log:
        "logs/align_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting alignment for {wildcards.sample}..."
        echo "INFO: Loading reference genome"
        sleep 5
        echo "INFO: Indexing reference"
        sleep 3
        echo "INFO: Running aligner on {wildcards.sample}"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "align" ]; then
            echo "ERROR: Out of memory while aligning {wildcards.sample}"
            echo "ERROR: Process killed by OOM killer"
            echo "ERROR: Peak memory usage: 32.5 GB (limit: 16 GB)"
            echo "ERROR: Consider increasing memory allocation"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Alignment complete for {wildcards.sample}"
        echo "INFO: Mapped reads: 45,234,567"
        echo "INFO: Unmapped reads: 1,234,567"
        echo "Aligned: {wildcards.sample}" > {output}
        """

rule call_variants:
    """Variant calling per chromosome - creates fan-out pattern."""
    input:
        bam="results/aligned/{sample}.bam"
    output:
        vcf="results/variants/{sample}_{chrom}.vcf"
    params:
        delay=DELAYS["variants"],
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 2
    resources:
        mem_mb=4000,
        runtime=20
    log:
        "logs/call_variants/{sample}_{chrom}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting variant calling for {wildcards.sample} on {wildcards.chrom}..."
        echo "INFO: Loading BAM file"
        sleep 3
        echo "INFO: Scanning {wildcards.chrom} for variants"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "call_variants" ]; then
            echo "ERROR: Variant calling timed out for {wildcards.sample} on {wildcards.chrom}"
            echo "ERROR: Wall clock limit exceeded"
            echo "ERROR: Job ran for 3600 seconds (limit: 1800 seconds)"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        echo "INFO: Found 12,345 variants on {wildcards.chrom}"
        echo "INFO: SNPs: 10,234 Indels: 2,111"
        echo "Variants: {wildcards.sample} {wildcards.chrom}" > {output.vcf}
        """

rule merge_sample_variants:
    """Merge variants per sample across chromosomes - fan-in pattern."""
    input:
        expand("results/variants/{{sample}}_{chrom}.vcf", chrom=CHROMOSOMES)
    output:
        "results/merged/{sample}_merged.vcf"
    params:
        delay=DELAYS["merge"],
        fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
        exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/merge_sample/{sample}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting variant merge for {wildcards.sample}..."
        echo "INFO: Merging VCF files from {CHROMOSOMES}"

        # Check if this sample should fail at this stage
        if [ "{params.fail_at}" = "merge" ]; then
            echo "ERROR: Segmentation fault while merging {wildcards.sample}"
            echo "ERROR: Memory corruption detected at 0x7fff5fbff8c0"
            echo "ERROR: Stack trace:"
            echo "ERROR:   #0 merge_vcf_records() at vcfmerge.c:234"
            echo "ERROR:   #1 main() at vcfmerge.c:567"
            sleep 2
            exit {params.exit_code}
        fi

        sleep {params.delay}
        cat {input} > {output}
        echo "INFO: Merge complete for {wildcards.sample}"
        echo "INFO: Total variants: 49,380"
        """

rule group_analysis:
    """Per-group analysis - aggregates samples by group."""
    input:
        lambda wc: expand("results/merged/{sample}_merged.vcf",
                          sample=get_samples_by_group(wc.group))
    output:
        "results/groups/{group}_analysis.txt"
    params:
        delay=DELAYS["group_analysis"]
    threads: 2
    resources:
        mem_mb=4000,
        runtime=15
    log:
        "logs/group_analysis/{group}.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting group analysis for {wildcards.group}..."
        echo "INFO: Aggregating samples in group"
        echo "INFO: Input samples: {input}"
        sleep {params.delay}
        echo "Group {wildcards.group} analysis complete" > {output}
        echo "INFO: Analysis complete for {wildcards.group}"
        """

rule final_merge:
    """Final merge of all group analyses."""
    input:
        expand("results/groups/{group}_analysis.txt", group=GROUPS)
    output:
        "results/final_analysis.txt"
    params:
        delay=DELAYS["final_merge"]
    threads: 1
    resources:
        mem_mb=2000,
        runtime=10
    log:
        "logs/final_merge.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Starting final merge of all groups..."
        echo "INFO: Combining group analyses"
        sleep {params.delay}
        cat {input} > {output}
        echo "INFO: Final merge complete"
        """

rule generate_report:
    """Generate final pipeline report."""
    input:
        analysis="results/final_analysis.txt",
        merged=expand("results/merged/{sample}_merged.vcf", sample=SUCCESSFUL_SAMPLES)
    output:
        "results/final_report.txt"
    params:
        delay=DELAYS["report"]
    threads: 1
    resources:
        mem_mb=1000,
        runtime=5
    log:
        "logs/generate_report.log"
    shell:
        """
        exec > {log} 2>&1
        echo "Generating final report..."
        echo "INFO: Collecting pipeline statistics"
        sleep {params.delay}
        echo "=== Pipeline Complete ===" > {output}
        echo "Successful samples: {SUCCESSFUL_SAMPLES}" >> {output}
        echo "Groups analyzed: {GROUPS}" >> {output}
        echo "Chromosomes: {CHROMOSOMES}" >> {output}
        date >> {output}
        echo "INFO: Report generated successfully"
        """

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/cluster/lsf/config.yaml
# ---------------------------------------------------------------------------
# Snakemake LSF Executor Profile Configuration
# =============================================
# This profile configures Snakemake to submit jobs to LSF clusters
# Usage: snakemake --profile cluster/lsf

# Executor settings
executor: lsf
jobs: 50              # Maximum concurrent jobs
latency-wait: 15      # Wait time for output files (seconds)

# Default resources for all rules
default-resources:
  - "runtime=30"
  - "mem_mb=4000"
  - "lsf_queue=rna"
  - "lsf_project=charmer-test"
  - 'lsf_extra=""'

# Rule-specific resource overrides
set-resources:
  - process_sample:runtime=10
  - process_sample:mem_mb=2000

  - align_sample:runtime=15
  - align_sample:mem_mb=4000

  - call_variants:runtime=20
  - call_variants:mem_mb=4000

  - merge_sample_variants:runtime=10
  - merge_sample_variants:mem_mb=2000

  - group_analysis:runtime=15
  - group_analysis:mem_mb=4000

  - final_merge:runtime=10
  - final_merge:mem_mb=2000

  - generate_report:runtime=5
  - generate_report:mem_mb=1000

# Behavior settings
rerun-incomplete: true
keep-going: true

# Output settings
printshellcmds: true
show-failed-logs: true

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/cluster/slurm/config.yaml
# ---------------------------------------------------------------------------
# Snakemake SLURM Executor Profile Configuration
# ================================================
# This profile configures Snakemake to submit jobs to SLURM clusters
# Usage: snakemake --profile cluster/slurm

# Executor settings
executor: slurm
jobs: 50              # Maximum concurrent jobs
latency-wait: 60      # Wait time for output files (seconds)

# Default resources for all rules
default-resources:
  slurm_partition: "amilan"      # Default CPU partition (Alpine)
  slurm_account: "amc-general"   # Alpine allocation account
  slurm_qos: "normal"            # Default QoS
  runtime: 30                    # Default walltime in minutes
  mem_mb: 4000                   # Default memory in MB (4GB)
  cpus_per_task: 1               # Default CPUs per task

# Rule-specific resource overrides
set-resources:
  process_sample:
    runtime: 10
    mem_mb: 2000
    cpus_per_task: 1

  align_sample:
    runtime: 15
    mem_mb: 4000
    cpus_per_task: 2

  call_variants:
    runtime: 20
    mem_mb: 4000
    cpus_per_task: 2

  merge_sample_variants:
    runtime: 10
    mem_mb: 2000
    cpus_per_task: 1

  group_analysis:
    runtime: 15
    mem_mb: 4000
    cpus_per_task: 2

  final_merge:
    runtime: 10
    mem_mb: 2000
    cpus_per_task: 1

  generate_report:
    runtime: 5
    mem_mb: 1000
    cpus_per_task: 1

# Behavior settings
rerun-incomplete: true
keep-going: true

# Output settings
printshellcmds: true
show-failed-logs: true

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/config/config.yaml
# ---------------------------------------------------------------------------
# Pipeline configuration for cluster test pipeline

project: "charmer-test"
samples_file: "config/samples.yaml"
output_dir: "results"

# Chromosomes for variant calling (creates fan-out)
chromosomes:
  - chr1
  - chr2
  - chr3
  - chr4

# Groups for analysis
groups:
  - control
  - treatment

# Job timing (seconds) - short for testing
delays:
  process: 30
  align: 60
  variants: 45
  merge: 20
  group_analysis: 30
  final_merge: 15
  report: 10

# ---------------------------------------------------------------------------
# File: tests/pipelines/cluster/config/samples.yaml
# ---------------------------------------------------------------------------
# Sample configuration for cluster test pipeline
# 12 samples: 8 successful + 4 designed to fail at different stages

samples:
  # Control group - 4 samples
  ctrl-rep1:
    group: control
    replicate: 1

  ctrl-rep2:
    group: control
    replicate: 2

  ctrl-rep3:
    group: control
    replicate: 3

  ctrl-rep4:
    group: control
    replicate: 4

  # Treatment group - 4 samples
  treat-rep1:
    group: treatment
    replicate: 1

  treat-rep2:
    group: treatment
    replicate: 2

  treat-rep3:
    group: treatment
    replicate: 3

  treat-rep4:
    group: treatment
    replicate: 4

  # Samples designed to FAIL with different errors
  fail-oom:
    group: failure
    replicate: 1
    fail_at: align          # Fails during alignment
    exit_code: 137          # OOM kill signal

  fail-timeout:
    group: failure
    replicate: 2
    fail_at: call_variants  # Fails during variant calling
    exit_code: 124          # Timeout signal

  fail-error:
    group: failure
    replicate: 3
    fail_at: process        # Fails during initial processing
    exit_code: 1            # Generic error

  fail-segfault:
    group: failure
    replicate: 4
    fail_at: merge          # Fails during merge
    exit_code: 139          # Segfault signal