Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/snippets/pipe-with-closure.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
process foo {
input:
val message
val suffix

output:
val result, emit: suffixed

exec:
result = "${message}${suffix}"
}

workflow {
suffix = ' world!'
channel.from('Hello','Hola','Ciao')
| map { it.toUpperCase() }
| { _ -> foo(_, suffix) }
| view
}
3 changes: 3 additions & 0 deletions docs/snippets/pipe-with-closure.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
HELLO world!
HOLA world!
CIAO world!
14 changes: 14 additions & 0 deletions docs/snippets/pipe.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
process foo {
input:
val message

output:
val result

exec:
result = "$message world"
}

workflow {
channel.from('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view
}
3 changes: 3 additions & 0 deletions docs/snippets/pipe.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
HELLO WORLD
HOLA WORLD
CIAO WORLD
18 changes: 18 additions & 0 deletions docs/snippets/process-named-stdout.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
process sayHello {
input:
val cheers

output:
stdout emit: verbiage

script:
"""
echo -n $cheers
"""
}

workflow {
things = channel.of('Hello world!', 'Yo, dude!', 'Duck!')
sayHello(things)
sayHello.out.verbiage.view()
}
3 changes: 3 additions & 0 deletions docs/snippets/process-named-stdout.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Hello world!
Yo, dude!
Duck!
49 changes: 14 additions & 35 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,8 @@ See {ref}`process-multiple-outputs` for more details.

The `emit` option can also be used to name a `stdout` output:

```groovy
process sayHello {
input:
val cheers

output:
stdout emit: verbiage

script:
"""
echo -n $cheers
"""
}

workflow {
things = channel.of('Hello world!', 'Yo, dude!', 'Duck!')
sayHello(things)
sayHello.out.verbiage.view()
}
```{literalinclude} snippets/process-named-stdout.nf
:language: groovy
```

:::{note}
Expand Down Expand Up @@ -334,26 +317,12 @@ The fully qualified process name can be used as a {ref}`process selector <config

The `|` *pipe* operator can be used to compose Nextflow processes and operators. For example:

```groovy
process foo {
input:
val data

output:
val result

exec:
result = "$data world"
}

workflow {
channel.from('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view
}
```{literalinclude} snippets/pipe.nf
:language: groovy
```

The above snippet defines a process named `foo` and invokes it with the `data` channel. The result is then piped to the {ref}`operator-map` operator, which converts each string to uppercase, and finally to the {ref}`operator-view` operator which prints it.

:::{tip}
Statements can also be split across multiple lines for better readability:

```groovy
Expand All @@ -364,8 +333,18 @@ workflow {
| view
}
```

:::{versionadded} 23.12.0-edge
:::

When using the pipe operator, the right operand can also be a closure that receives the output of the left operand and returns the result of a process, workflow, or operator invocation. This form is a useful way to define a custom mapping between the left-hand outputs and right-hand inputs, including the use of additional input channels aside from the left-hand outputs. For example:

```{literalinclude} snippets/pipe-with-closure.nf
:language: groovy
```

When the left operand is a process, the closure argument is equivalent to the `.out` of that process, and the output channels can be accessed by index or by name as described in [Process invocation](#process-invocation). For example, the `view` operation in the above example can be rewritten as `{ _ -> _.suffixed.view() }` to access the `suffixed` output of process `foo`.

### And `&`

The `&` *and* operator can be used to feed multiple processes with the same channel(s). For example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,62 +106,87 @@ class ChannelEx {
}

/**
* Implements pipe operation between a channel and a process or a sub-workflow
* Pipe a channel INTO a process or workflow.
*
* @param left A dataflow channel instance
* @param right A {@link ChainableDef} object eg. a nextflow process
* @return The channel resulting the pipe operation
* @param left
* @param right
*/
static Object or(DataflowWriteChannel left, ChainableDef right) {
checkContext('or', right)
return right.invoke_o(left)
}

/**
* Implements pipe operation between a channel WITH a operator
* Pipe a channel INTO an operator.
*
* @param left A {@code DataflowWriteChannel} channel as left operand
* @param right A {@code OpCall} object representing a operator call as right operand
* @return The resulting channel object
* @param left
* @param right
*/
static Object or(DataflowWriteChannel left, OpCall right) {
checkContext('or', right)
return right.setSource(left).call()
}

/**
* Implements pipe operation between a multi-channels WITH a process or a sub-workflow
* Pipe a channel INTO a closure that defines a custom
* invocation of a process, workflow, or operator.
*
* @param left A {@code ChannelOut} multi-channel object as left operand
* @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand
* @return The resulting channel object
* @param left
* @param right
*/
static Object or(DataflowWriteChannel left, Closure right) {
def out = right.call(left)
if( out instanceof DataflowWriteChannel || out instanceof ChannelOut )
return out
throw new ScriptRuntimeException("Closure component did not return a channel")
}

/**
* Pipe a multi-channel INTO a process or workflow.
*
* @param left
* @param right
*/
static Object or(ChannelOut left, ChainableDef right) {
checkContext('or', right)
return right.invoke_o(left)
}

/**
* Implements pipe operation between a multi-channels WITH a operator
* Pipe a multi-channel INTO an operator.
*
* @param left A {@code ChannelOut} multi-channel object as left operand
* @param right A {@code OpCall} object representing a operator call as right operand
* @return The resulting channel object
* @param left
* @param right
*/
static Object or(ChannelOut left, OpCall right) {
checkContext('or', right)
right.setSource(left).call()
}

/**
* Implements pipe operation between a process or sub-workflow WITH a operator
* Pipe a multi-channel INTO a closure that defines a custom
* invocation of a process, workflow, or operator.
*
* @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand
* @param right A {@code OpCall} object representing a operator call as right operand
* @return The resulting channel object
* @param left
* @param right
*/
static Object or(ChainableDef left, OpCall right) {
static Object or(ChannelOut left, Closure right) {
def out = right.call(left)
if( out instanceof DataflowWriteChannel || out instanceof ChannelOut )
return out
throw new ScriptRuntimeException("Closure component did not return a channel")
}

/**
* Pipe a process or workflow INTO another process or workflow.
*
* @param left
* @param right
*/
static Object or(ChainableDef left, ChainableDef right) {
checkContext('or', left)
checkContext('or', right)

def out = left.invoke_a(InvokerHelper.EMPTY_ARGS)

if( out instanceof DataflowWriteChannel )
Expand All @@ -174,16 +199,33 @@ class ChannelEx {
}

/**
* Implements pipe operation between a process or sub-workflow WITH another process or sub-workflow
* Pipe a process or workflow INTO an operator.
*
* @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand
* @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand
* @return
* @param left
* @param right
*/
static Object or(ChainableDef left, ChainableDef right) {
static Object or(ChainableDef left, OpCall right) {
checkContext('or', left)
checkContext('or', right)
def out = left.invoke_a(InvokerHelper.EMPTY_ARGS)

if( out instanceof DataflowWriteChannel )
return or((DataflowWriteChannel)out, right)

if( out instanceof ChannelOut )
return or((ChannelOut)out, right)

throw new ScriptRuntimeException("Cannot pipe ${fmtType(out)} with ${fmtType(right)}")
}

/**
* Pipe a process or workflow INTO a closure that defines a custom
* invocation of a process, workflow, or operator.
*
* @param left
* @param right
*/
static Object or(ChainableDef left, Closure right) {
checkContext('or', left)
def out = left.invoke_a(InvokerHelper.EMPTY_ARGS)

if( out instanceof DataflowWriteChannel )
Expand Down