Queue channel join - conditional process execution #4006
Hi! I'm a beginner Nextflow user and I'm having a bit of trouble with structuring some processes in my pipeline. I have 2 processes that should run one after the other on the same samples, but I want the second process to run only on samples that have passed the filters of the first process.

First I'm creating the first channel with all my samples. After running the first process, I'm outputting a bed file into 2 channels: one which I use later down the line and one with which I'm filtering my samples (because the bed is generated only if all conditions are met).

sv_list = Channel.fromPath(params.input_file, checkIfExists: true).splitCsv(header: true, sep: '\t', strip: true)
.map{ row -> [ row.sample, file(row.sv) ] }.view()

process parse_svs {
tag { sample }
input:
tuple val(sample), file(sv) from sv_list
path mappability
path chr from chr_sizes
output:
tuple val(sample), file("${sample}.sv_snv.ann.bed"), optional: true into(svs_exist, filter_by_sv_snv)
tuple val(sample), file("${sample}.sv.ann.txt"), optional: true into annotate_with_sv_info
shell:
'''
if [[ $(zgrep -v "^#" !{sv} | wc -l) -gt 0 ]]
then
svname=$(bcftools query -l !{sv} | sed -n 2p)
Rscript !{baseDir}/simple-event-annotation.R !{sv} !{sample}
bcftools sort -Oz !{sample}.sv.ann.vcf > !{sample}.sv.ann.vcf.gz
tabix -p vcf !{sample}.sv.ann.vcf.gz
bcftools view -s $svname -f 'PASS' --regions-file !{mappability} !{sample}.sv.ann.vcf.gz | bcftools sort -Oz > !{sample}.sv.ann.filt.vcf.gz
if [[ $(zgrep -v "^#" !{sample}.sv.ann.filt.vcf.gz | wc -l) -gt 0 ]]
then
bcftools query -f '%CHROM\t%POS\t%POS\n' !{sample}.sv.ann.filt.vcf.gz > sv.bed
bcftools query -f '%CHROM\t%POS\t%SVLEN\t%SIMPLE_TYPE\n' !{sample}.sv.ann.filt.vcf.gz > !{sample}.sv.ann.txt
bedtools slop -i sv.bed -g !{chr} -b !{params.closer_value} | sort -k1,1 -k2,2n | bedtools merge > closer.bed
bedtools slop -i sv.bed -g !{chr} -b !{params.close_value} > cluster.bed
bedtools complement -i cluster.bed -g !{chr} | sort -k1,1 -k2,2n | bedtools merge > unclustered.bed
bedtools subtract -a cluster.bed -b closer.bed | sort -k1,1 -k2,2n | bedtools merge > close.bed
awk -v OFS='\t' '{print $1,$2,$3,"CLOSER"}' closer.bed > closer.ann.bed
awk -v OFS='\t' '{print $1,$2,$3,"CLOSE"}' close.bed > close.ann.bed
awk -v OFS='\t' '{print $1,$2,$3,"UNCLUSTERED"}' unclustered.bed > unclustered.ann.bed
cat *ann.bed | sort -k 1,1 -k2,2n > !{sample}.sv_snv.ann.bed
fi
fi
'''
}

Then I create my second channel from the same file and I join it with the beds from the first process:

snv_list = Channel.fromPath(params.input_file, checkIfExists: true).splitCsv(header: true, sep: '\t', strip: true)
.map{ row -> [ row.sample, file(row.snv) ] }.join(svs_exist).view()

process parse_snvs {
tag { sample }
input:
tuple val(sample), file(snv) from snv_list
path mappability
output:
tuple val(sample), file("${sample}.snv.filt.vcf.gz") into snvs_to_randomise
shell:
'''
snvname=$(bcftools query -l !{snv} | sed -n 2p)
tabix -p vcf !{snv}
bcftools view -s $snvname -f 'PASS' --types snps --regions-file !{mappability} !{snv} | bcftools sort -Oz > !{sample}.snv.filt.vcf.gz
tabix -p vcf !{sample}.snv.filt.vcf.gz
'''
}

My first question is: I sometimes get a warning that the …

The second question, which is a bit broader: is there a better way of building this? I tried a few different options, for example outputting a shell variable for pass/fail in the first process, but then I didn't quite know how to use it to form the second queue channel, and I'm not entirely sure that this would be cleaner than the current solution. It just seems a bit clunky outputting a second set of bed files that will not actually be used in the second process.

Thanks in advance,
Replies: 1 comment 1 reply
Hi @maia-munteanu, I converted your issue into a discussion so that it remains open for other users who have similar questions.

When you join the two channels, you will get items that look like [ sample, snv_file, bed_file ]. Since you don't have strict mode enabled, Nextflow only warns you about the tuple mismatch rather than throwing an error. You should be able to fix it by using the map operator as you said:

ch_snv
    .join( ch_bed )
    .map { sample, snv_file, bed_file -> [ sample, snv_file ] }

When mapping over a tuple, you should be able to break out the tuple elements like that, or you could just do it -> [ it[0], it[1] ].

To your second question, this is the correct way to do it. Channels are always the way to define dependencies between processes. It's a bit messy because you have to create two identical channels, but that's mostly an artifact of using DSL1. If you migrate to DSL2, you can reuse the same channel and all of your channel logic goes in a workflow block, so it looks much cleaner. Here's a sketch:

workflow {
    sv_list = // ...
    snv_list = // ...
    (ch_bed, ch_ann) = parse_svs( sv_list, mappability, chr_sizes )
    snv_list_filtered = snv_list.join( ch_bed ) // ...
    ch_snv = parse_snvs( snv_list_filtered, mappability )
}
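
To see the join-then-map filtering in isolation, here is a minimal DSL2 sketch with toy values. The sample names and file names are made up for illustration, plain strings stand in for real files, and ch_bed stands in for the optional bed output of parse_svs. It assumes strict mode is not enabled.

```groovy
// Toy DSL2 script: keep only the samples for which a bed file exists.
// All sample names and file names below are hypothetical placeholders.
nextflow.enable.dsl = 2

workflow {
    // Every sample with its SNV file (strings stand in for file paths)
    snv_list = Channel.of(
        [ 'sampleA', 'sampleA.snv.vcf.gz' ],
        [ 'sampleB', 'sampleB.snv.vcf.gz' ],
        [ 'sampleC', 'sampleC.snv.vcf.gz' ]
    )

    // Stand-in for the optional bed output: sampleB produced no bed file
    ch_bed = Channel.of(
        [ 'sampleA', 'sampleA.sv_snv.ann.bed' ],
        [ 'sampleC', 'sampleC.sv_snv.ann.bed' ]
    )

    snv_list
        .join( ch_bed )   // matches on the first element: [ sample, snv_file, bed_file ]
        .map { sample, snv_file, bed_file -> [ sample, snv_file ] }   // drop the bed again
        .view()
}
```

By default join discards items whose key has no match in the other channel, so sampleB never reaches the map step, and the remaining two-element tuples match what a `tuple val(sample), path(snv)` input expects.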