Skip to content

Commit 2e84d7e

Browse files
feat: Add cluster test pipeline for SLURM and LSF (#2)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4ad3460 commit 2e84d7e

File tree

6 files changed

+486
-0
lines changed

6 files changed

+486
-0
lines changed

tests/pipelines/cluster/.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Snakemake outputs
2+
results/
3+
logs/
4+
.snakemake/
5+
6+
# Cluster job outputs
7+
slurm-*.out
8+
*.o*
9+
*.e*

tests/pipelines/cluster/Snakefile

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
# Cluster test pipeline for charmer development
2+
# Designed to create many pending jobs and demonstrate failure handling
3+
#
4+
# DAG Structure (~76 jobs):
5+
# process_sample (12) -> align_sample (12) -> call_variants (48)
6+
# |
7+
# merge_sample_variants (12)
8+
# |
9+
# group_analysis (2)
10+
# |
11+
# final_merge (1)
12+
# |
13+
# generate_report (1)
14+
15+
configfile: "config/config.yaml"
16+
17+
# Load samples configuration
18+
import yaml
19+
with open(config["samples_file"]) as f:
20+
samples_config = yaml.safe_load(f)
21+
22+
SAMPLES = list(samples_config["samples"].keys())
23+
CHROMOSOMES = config["chromosomes"]
24+
GROUPS = config["groups"]
25+
DELAYS = config["delays"]
26+
27+
# Helper to get sample metadata
28+
def get_sample_meta(sample, key, default=None):
29+
return samples_config["samples"].get(sample, {}).get(key, default)
30+
31+
# Get samples that should succeed (no fail_at defined)
32+
SUCCESSFUL_SAMPLES = [s for s in SAMPLES if get_sample_meta(s, "fail_at") is None]
33+
34+
# Get samples by group (excluding failure group)
35+
def get_samples_by_group(group):
36+
return [s for s in SAMPLES
37+
if get_sample_meta(s, "group") == group
38+
and get_sample_meta(s, "fail_at") is None]
39+
40+
rule all:
41+
input:
42+
"results/final_report.txt"
43+
44+
rule process_sample:
45+
"""Initial sample processing - first step for all samples."""
46+
output:
47+
"results/processed/{sample}.txt"
48+
params:
49+
delay=DELAYS["process"],
50+
fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
51+
exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
52+
threads: 1
53+
resources:
54+
mem_mb=2000,
55+
runtime=10
56+
log:
57+
"logs/process_sample/{sample}.log"
58+
shell:
59+
"""
60+
exec > {log} 2>&1
61+
echo "Starting processing for {wildcards.sample}..."
62+
echo "INFO: Initializing sample processing pipeline"
63+
echo "INFO: Checking input data integrity"
64+
65+
# Check if this sample should fail at this stage
66+
if [ "{params.fail_at}" = "process" ]; then
67+
echo "ERROR: Data validation failed for {wildcards.sample}"
68+
echo "ERROR: Input file appears corrupted or malformed"
69+
echo "ERROR: Cannot proceed with processing"
70+
sleep 5
71+
exit {params.exit_code}
72+
fi
73+
74+
sleep {params.delay}
75+
echo "INFO: Sample {wildcards.sample} processed successfully"
76+
echo "Processed: {wildcards.sample}" > {output}
77+
"""
78+
79+
rule align_sample:
80+
"""Alignment step - depends on processing."""
81+
input:
82+
"results/processed/{sample}.txt"
83+
output:
84+
"results/aligned/{sample}.bam"
85+
params:
86+
delay=DELAYS["align"],
87+
fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
88+
exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
89+
threads: 2
90+
resources:
91+
mem_mb=4000,
92+
runtime=15
93+
log:
94+
"logs/align_sample/{sample}.log"
95+
shell:
96+
"""
97+
exec > {log} 2>&1
98+
echo "Starting alignment for {wildcards.sample}..."
99+
echo "INFO: Loading reference genome"
100+
sleep 5
101+
echo "INFO: Indexing reference"
102+
sleep 3
103+
echo "INFO: Running aligner on {wildcards.sample}"
104+
105+
# Check if this sample should fail at this stage
106+
if [ "{params.fail_at}" = "align" ]; then
107+
echo "ERROR: Out of memory while aligning {wildcards.sample}"
108+
echo "ERROR: Process killed by OOM killer"
109+
echo "ERROR: Peak memory usage: 32.5 GB (limit: 16 GB)"
110+
echo "ERROR: Consider increasing memory allocation"
111+
sleep 2
112+
exit {params.exit_code}
113+
fi
114+
115+
sleep {params.delay}
116+
echo "INFO: Alignment complete for {wildcards.sample}"
117+
echo "INFO: Mapped reads: 45,234,567"
118+
echo "INFO: Unmapped reads: 1,234,567"
119+
echo "Aligned: {wildcards.sample}" > {output}
120+
"""
121+
122+
rule call_variants:
123+
"""Variant calling per chromosome - creates fan-out pattern."""
124+
input:
125+
bam="results/aligned/{sample}.bam"
126+
output:
127+
vcf="results/variants/{sample}_{chrom}.vcf"
128+
params:
129+
delay=DELAYS["variants"],
130+
fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
131+
exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
132+
threads: 2
133+
resources:
134+
mem_mb=4000,
135+
runtime=20
136+
log:
137+
"logs/call_variants/{sample}_{chrom}.log"
138+
shell:
139+
"""
140+
exec > {log} 2>&1
141+
echo "Starting variant calling for {wildcards.sample} on {wildcards.chrom}..."
142+
echo "INFO: Loading BAM file"
143+
sleep 3
144+
echo "INFO: Scanning {wildcards.chrom} for variants"
145+
146+
# Check if this sample should fail at this stage
147+
if [ "{params.fail_at}" = "call_variants" ]; then
148+
echo "ERROR: Variant calling timed out for {wildcards.sample} on {wildcards.chrom}"
149+
echo "ERROR: Wall clock limit exceeded"
150+
echo "ERROR: Job ran for 3600 seconds (limit: 1800 seconds)"
151+
sleep 2
152+
exit {params.exit_code}
153+
fi
154+
155+
sleep {params.delay}
156+
echo "INFO: Found 12,345 variants on {wildcards.chrom}"
157+
echo "INFO: SNPs: 10,234 Indels: 2,111"
158+
echo "Variants: {wildcards.sample} {wildcards.chrom}" > {output.vcf}
159+
"""
160+
161+
rule merge_sample_variants:
162+
"""Merge variants per sample across chromosomes - fan-in pattern."""
163+
input:
164+
expand("results/variants/{{sample}}_{chrom}.vcf", chrom=CHROMOSOMES)
165+
output:
166+
"results/merged/{sample}_merged.vcf"
167+
params:
168+
delay=DELAYS["merge"],
169+
fail_at=lambda wc: get_sample_meta(wc.sample, "fail_at"),
170+
exit_code=lambda wc: get_sample_meta(wc.sample, "exit_code", 1)
171+
threads: 1
172+
resources:
173+
mem_mb=2000,
174+
runtime=10
175+
log:
176+
"logs/merge_sample/{sample}.log"
177+
shell:
178+
"""
179+
exec > {log} 2>&1
180+
echo "Starting variant merge for {wildcards.sample}..."
181+
echo "INFO: Merging VCF files from {CHROMOSOMES}"
182+
183+
# Check if this sample should fail at this stage
184+
if [ "{params.fail_at}" = "merge" ]; then
185+
echo "ERROR: Segmentation fault while merging {wildcards.sample}"
186+
echo "ERROR: Memory corruption detected at 0x7fff5fbff8c0"
187+
echo "ERROR: Stack trace:"
188+
echo "ERROR: #0 merge_vcf_records() at vcfmerge.c:234"
189+
echo "ERROR: #1 main() at vcfmerge.c:567"
190+
sleep 2
191+
exit {params.exit_code}
192+
fi
193+
194+
sleep {params.delay}
195+
cat {input} > {output}
196+
echo "INFO: Merge complete for {wildcards.sample}"
197+
echo "INFO: Total variants: 49,380"
198+
"""
199+
200+
rule group_analysis:
201+
"""Per-group analysis - aggregates samples by group."""
202+
input:
203+
lambda wc: expand("results/merged/{sample}_merged.vcf",
204+
sample=get_samples_by_group(wc.group))
205+
output:
206+
"results/groups/{group}_analysis.txt"
207+
params:
208+
delay=DELAYS["group_analysis"]
209+
threads: 2
210+
resources:
211+
mem_mb=4000,
212+
runtime=15
213+
log:
214+
"logs/group_analysis/{group}.log"
215+
shell:
216+
"""
217+
exec > {log} 2>&1
218+
echo "Starting group analysis for {wildcards.group}..."
219+
echo "INFO: Aggregating samples in group"
220+
echo "INFO: Input samples: {input}"
221+
sleep {params.delay}
222+
echo "Group {wildcards.group} analysis complete" > {output}
223+
echo "INFO: Analysis complete for {wildcards.group}"
224+
"""
225+
226+
rule final_merge:
227+
"""Final merge of all group analyses."""
228+
input:
229+
expand("results/groups/{group}_analysis.txt", group=GROUPS)
230+
output:
231+
"results/final_analysis.txt"
232+
params:
233+
delay=DELAYS["final_merge"]
234+
threads: 1
235+
resources:
236+
mem_mb=2000,
237+
runtime=10
238+
log:
239+
"logs/final_merge.log"
240+
shell:
241+
"""
242+
exec > {log} 2>&1
243+
echo "Starting final merge of all groups..."
244+
echo "INFO: Combining group analyses"
245+
sleep {params.delay}
246+
cat {input} > {output}
247+
echo "INFO: Final merge complete"
248+
"""
249+
250+
rule generate_report:
251+
"""Generate final pipeline report."""
252+
input:
253+
analysis="results/final_analysis.txt",
254+
merged=expand("results/merged/{sample}_merged.vcf", sample=SUCCESSFUL_SAMPLES)
255+
output:
256+
"results/final_report.txt"
257+
params:
258+
delay=DELAYS["report"]
259+
threads: 1
260+
resources:
261+
mem_mb=1000,
262+
runtime=5
263+
log:
264+
"logs/generate_report.log"
265+
shell:
266+
"""
267+
exec > {log} 2>&1
268+
echo "Generating final report..."
269+
echo "INFO: Collecting pipeline statistics"
270+
sleep {params.delay}
271+
echo "=== Pipeline Complete ===" > {output}
272+
echo "Successful samples: {SUCCESSFUL_SAMPLES}" >> {output}
273+
echo "Groups analyzed: {GROUPS}" >> {output}
274+
echo "Chromosomes: {CHROMOSOMES}" >> {output}
275+
date >> {output}
276+
echo "INFO: Report generated successfully"
277+
"""
(NOTE: filename header missing from this page capture — based on the profile's "Usage: snakemake --profile cluster/lsf" comment, this is likely tests/pipelines/cluster/lsf/config.yaml; verify against the commit's file tree)

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Snakemake LSF Executor Profile Configuration
2+
# =============================================
3+
# This profile configures Snakemake to submit jobs to LSF clusters
4+
# Usage: snakemake --profile cluster/lsf
5+
6+
# Executor settings
7+
executor: lsf
8+
jobs: 50 # Maximum concurrent jobs
9+
latency-wait: 15 # Wait time for output files (seconds)
10+
11+
# Default resources for all rules
12+
default-resources:
13+
- "runtime=30"
14+
- "mem_mb=4000"
15+
- "lsf_queue=rna"
16+
- "lsf_project=charmer-test"
17+
- 'lsf_extra=""'
18+
19+
# Rule-specific resource overrides
20+
set-resources:
21+
- process_sample:runtime=10
22+
- process_sample:mem_mb=2000
23+
24+
- align_sample:runtime=15
25+
- align_sample:mem_mb=4000
26+
27+
- call_variants:runtime=20
28+
- call_variants:mem_mb=4000
29+
30+
- merge_sample_variants:runtime=10
31+
- merge_sample_variants:mem_mb=2000
32+
33+
- group_analysis:runtime=15
34+
- group_analysis:mem_mb=4000
35+
36+
- final_merge:runtime=10
37+
- final_merge:mem_mb=2000
38+
39+
- generate_report:runtime=5
40+
- generate_report:mem_mb=1000
41+
42+
# Behavior settings
43+
rerun-incomplete: true
44+
keep-going: true
45+
46+
# Output settings
47+
printshellcmds: true
48+
show-failed-logs: true

0 commit comments

Comments (0)