Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions subworkflows/local/concatall.nf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ workflow CONCAT_ALL {
}

ch_input_reads_merged = CAT_FASTQ ( ch_input_singlefq.cat ).reads
.map { meta, reads -> [ meta, [reads].flatten() ] }
.mix( ch_input_singlefq.skip )
emit:
ch_input_reads_merged
Expand Down
31 changes: 28 additions & 3 deletions subworkflows/local/profiling.nf
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,20 @@ def prepareInputs(pairedreads, databases, singleFqTool=False){
unknown: true
}
} else {
// PE-aware tool path: reads preserves original meta.single_end.
// For SE samples: meta.single_end == true, reads == [R1]
// For PE samples: meta.single_end == false, reads == [R1, R2]
// Every tool in this branch MUST use meta.single_end to switch
// between single-file and paired-file CLI arguments.
return reads_with_dbs
.map { meta, reads, db_meta, db ->
def flat_reads = [reads].flatten()
def expected = meta.single_end ? 1 : 2
if ( flat_reads.size() != expected ) {
error("PE-aware tool '${db_meta.tool}': expected ${expected} read file(s) for sample ${meta.id} (single_end=${meta.single_end}), got ${flat_reads.size()}")
}
[ meta, flat_reads, db_meta, db ]
}
.branch { meta, reads, db_meta, db ->
rgi: db_meta.tool == 'rgi'
unknown: true
Expand Down Expand Up @@ -150,7 +163,11 @@ workflow PROFILING {
meta, reads, db_meta, db ->
def new_meta = meta + db_meta
new_meta.db_params = db[0]["db_params"]
reads: [ new_meta, [reads].flatten() ]
def flat_reads = [reads].flatten()
if ( flat_reads.size() != 1 ) {
error("fmhfunprofiler requires exactly one (concatenated) input FASTQ, got ${flat_reads.size()} files for sample ${meta.id}")
}
reads: [ new_meta, flat_reads ]
db: db[0].db_path
}
FMHFUNPROFILER ( ch_input_for_fmhfunprofiler.reads, ch_input_for_fmhfunprofiler.db )
Expand All @@ -169,7 +186,11 @@ workflow PROFILING {
def new_meta = meta + db_meta
//TODO add the params in
// new_meta.db_params = Channel.fromList(db).map{ t -> t.db_params}.collect().flatten() // [0]["db_params"]
reads: [ new_meta, [reads].flatten() ]
def flat_reads = [reads].flatten()
if ( flat_reads.size() != 1 ) {
error("humann_v3 requires exactly one (concatenated) input FASTQ, got ${flat_reads.size()} files for sample ${meta.id}")
}
reads: [ new_meta, flat_reads ]
mpa_db: db.findAll { it.db_entity == "humann_metaphlan" }.first().db_path
nuc_db: db.findAll { it.db_entity == "humann_nucleotide" }.first().db_path
prot_db: db.findAll { it.db_entity == "humann_protein" }.first().db_path
Expand Down Expand Up @@ -198,7 +219,11 @@ workflow PROFILING {
def new_meta = meta + db_meta
//TODO add the params in
// new_meta.db_params = Channel.fromList(db).map{ t -> t.db_params}.collect().flatten() // [0]["db_params"]
reads: [ new_meta, [reads].flatten() ]
def flat_reads = [reads].flatten()
if ( flat_reads.size() != 1 ) {
error("humann_v4 requires exactly one (concatenated) input FASTQ, got ${flat_reads.size()} files for sample ${meta.id}")
}
reads: [ new_meta, flat_reads ]
mpa_db: db.findAll { it.db_entity == "humann_metaphlan" }.first().db_path
nuc_db: db.findAll { it.db_entity == "humann_nucleotide" }.first().db_path
prot_db: db.findAll { it.db_entity == "humann_protein" }.first().db_path
Expand Down
139 changes: 139 additions & 0 deletions subworkflows/local/tests/concatall.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// nf-test suite for the CONCAT_ALL subworkflow (../concatall.nf).
// Contract under test: the emitted channel ch_input_reads_merged carries
// [ meta, reads ] tuples where reads is ALWAYS a List containing exactly one
// FASTQ file — either the untouched single input (skip branch) or the
// concatenation of all run/pair files (cat branch).
// NOTE(review): layout below is flat (indentation was lost upstream); Groovy
// is brace-delimited so this does not affect test behavior.
nextflow_workflow {

name "Test Subworkflow CONCAT_ALL"
script "../concatall.nf"
workflow "CONCAT_ALL"
tag "subworkflows"
tag "concatall"

// SE single-run: only 1 file → goes through the skip branch (no cat needed).
// Output must still be a list wrapping that single file, not a bare path.
test("single-end single-run produces [meta, [file]] (skip branch)") {
when {
workflow {
"""
input[0] = Channel.of([
[ id:'test_se', single_end:true, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true) ]
])
"""
}
}
then {
assert workflow.success
def out = workflow.out.ch_input_reads_merged
assert out.size() == 1
// reads element must be a List (not a bare Path)
assert out[0][1] instanceof List
// exactly one file
assert out[0][1].size() == 1
// single_end forced to true by CONCAT_ALL
// NOTE(review): asserts on CONCAT_ALL internals not visible here — confirm
// the subworkflow rewrites meta.single_end for merged output.
assert out[0][0].single_end == true
}
}

// PE single-run: R1 + R2 → cat branch concatenates both into a single merged file.
// Output must be a list wrapping the single concatenated file.
test("paired-end single-run produces [meta, [merged]] (cat branch)") {
when {
workflow {
"""
input[0] = Channel.of([
[ id:'test_pe', single_end:false, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true),
file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_2.fastq.gz', checkIfExists: true) ]
])
"""
}
}
then {
assert workflow.success
def out = workflow.out.ch_input_reads_merged
assert out.size() == 1
assert out[0][1] instanceof List
// R1 and R2 must have been concatenated into exactly one file
assert out[0][1].size() == 1
// assumes CAT_FASTQ names its output '<id>.merged.fastq.gz' — confirm
// against the cat/fastq module used by CONCAT_ALL
assert out[0][1][0].endsWith('.merged.fastq.gz')
}
}

// Multi-run SE: two runs of the same sample → cat branch merges both run files.
// run_accession is dropped when grouping, so both entries collapse into one sample.
test("single-end multi-run produces one merged file per sample (cat branch)") {
when {
workflow {
"""
input[0] = Channel.of(
[ [ id:'test_se', single_end:true, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true) ] ],
[ [ id:'test_se', single_end:true, run_accession:'run2' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_2.fastq.gz', checkIfExists: true) ] ]
)
"""
}
}
then {
assert workflow.success
def out = workflow.out.ch_input_reads_merged
// Two runs collapse to one sample
assert out.size() == 1
assert out[0][1] instanceof List
assert out[0][1].size() == 1
assert out[0][1][0].endsWith('.merged.fastq.gz')
}
}

// Multi-run PE: two paired runs of the same sample → all four files (R1a, R2a, R1b, R2b)
// concatenated into a single merged file for tools that only take one input.
test("paired-end multi-run produces one merged file per sample (cat branch)") {
when {
workflow {
"""
input[0] = Channel.of(
[ [ id:'test_pe', single_end:false, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true),
file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_2.fastq.gz', checkIfExists: true) ] ],
[ [ id:'test_pe', single_end:false, run_accession:'run2' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test2_1.fastq.gz', checkIfExists: true),
file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test2_2.fastq.gz', checkIfExists: true) ] ]
)
"""
}
}
then {
assert workflow.success
def out = workflow.out.ch_input_reads_merged
assert out.size() == 1
assert out[0][1] instanceof List
assert out[0][1].size() == 1
assert out[0][1][0].endsWith('.merged.fastq.gz')
}
}

// Multiple samples must remain independent — each produces its own output element.
test("multiple samples each produce independent output elements") {
when {
workflow {
"""
input[0] = Channel.of(
[ [ id:'sample_a', single_end:true, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true) ] ],
[ [ id:'sample_b', single_end:false, run_accession:'run1' ],
[ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test2_1.fastq.gz', checkIfExists: true),
file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test2_2.fastq.gz', checkIfExists: true) ] ]
)
"""
}
}
then {
assert workflow.success
def out = workflow.out.ch_input_reads_merged
assert out.size() == 2
// Both outputs must be lists with exactly one file
// (closure destructures each [meta, reads] tuple)
out.each { meta, reads ->
assert reads instanceof List
assert reads.size() == 1
}
}
}
}
Loading