Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,10 @@ class ProcessConfig implements Map<String,Object>, Cloneable {

/// input parameters

InParam _in_val( obj ) {
new ValueInParam(this).bind(obj)
InParam _in_val( Map opts=null, Object obj ) {
new ValueInParam(this)
.setOptions(opts)
.bind(obj)
}

InParam _in_file( obj ) {
Expand Down
48 changes: 34 additions & 14 deletions modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -158,26 +158,46 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {

String getType() { 'process' }

private String missMatchErrMessage(String name, int expected, int actual) {
final ch = expected > 1 ? "channels" : "channel"
return "Process `$name` declares ${expected} input ${ch} but ${actual} were specified"
}

@Override
Object run(Object[] args) {
// initialise process config
initialize()

// get params
final params = ChannelOut.spread(args)
// sanity check
if( params.size() != declaredInputs.size() )
throw new ScriptRuntimeException(missMatchErrMessage(processName, declaredInputs.size(), params.size()))
// separate named args and positional args
def namedArgs = [:]
def indexArgs = ChannelOut.spread(args)
if( !indexArgs.isEmpty() && indexArgs[0] instanceof Map )
namedArgs = indexArgs.remove(0) as Map

log.debug("named args: ${namedArgs}, positional args: ${indexArgs}")

// set named args
def names = namedArgs.keySet().collect()
def remainingInputs = []

for( def input : declaredInputs ) {
final inParam = (BaseInParam)input
final name = inParam.channelTakeName ?: inParam.name

if( name && namedArgs.containsKey(name) ) {
inParam.setFrom(namedArgs[name])
inParam.init()
names.remove(name)
}
else
remainingInputs << inParam
}

if( !names.isEmpty() )
throw new ScriptRuntimeException("Process `$name` was invoked with invalid named arguments: ${names.join(', ')}")

// set positional args
if( indexArgs.size() != remainingInputs.size() )
throw new ScriptRuntimeException("Process `$name` was invoked with ${indexArgs.size()} positional argument(s) but ${remainingInputs.size()} were expected")

// set input channels
for( int i=0; i<params.size(); i++ ) {
final inParam = (declaredInputs[i] as BaseInParam)
inParam.setFrom(params[i])
for( int i = 0; i < indexArgs.size(); i++ ) {
final inParam = (BaseInParam)remainingInputs[i]
inParam.setFrom(indexArgs[i])
inParam.init()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,41 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec


protected void collectInputs(Binding context, Object[] args) {
final params = ChannelOut.spread(args)
if( params.size() != declaredInputs.size() ) {
// separate named args and positional args
def namedArgs = [:]
def indexArgs = ChannelOut.spread(args)
if( !indexArgs.isEmpty() && indexArgs[0] instanceof Map )
namedArgs = indexArgs.remove(0) as Map

log.debug("named args: ${namedArgs}, positional args: ${indexArgs}")

// set named args
def names = namedArgs.keySet().collect()
def remainingInputs = new ArrayList<String>()

for( String name : declaredInputs ) {
if( name && namedArgs.containsKey(name) ) {
context.setProperty( name, namedArgs[name] )
names.remove(name)
}
else
remainingInputs << name
}

if( !names.isEmpty() ) {
final prefix = name ? "Workflow `$name`" : "Main workflow"
throw new IllegalArgumentException("$prefix was invoked with invalid named arguments: ${names.join(', ')}")
}

// set positional args
if( indexArgs.size() != remainingInputs.size() ) {
final prefix = name ? "Workflow `$name`" : "Main workflow"
throw new IllegalArgumentException("$prefix declares ${declaredInputs.size()} input channels but ${params.size()} were given")
throw new IllegalArgumentException("$prefix was invoked with ${indexArgs.size()} positional argument(s) but ${remainingInputs.size()} were expected")
}

// attach declared inputs with the invocation arguments
for( int i=0; i< declaredInputs.size(); i++ ) {
final name = declaredInputs[i]
context.setProperty( name, params[i] )
for( int i = 0; i < indexArgs.size(); i++ ) {
final name = remainingInputs[i]
context.setProperty( name, indexArgs[i] )
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import nextflow.exception.ScriptRuntimeException
import nextflow.extension.CH
import nextflow.script.ProcessConfig
import nextflow.script.TokenVar
import nextflow.util.ConfigHelper
/**
* Model a process generic input parameter
*
Expand All @@ -47,6 +48,8 @@ abstract class BaseInParam extends BaseParam implements InParam {
*/
private inChannel

String channelTakeName

/**
* @return The input channel instance used by this parameter to receive the process inputs
*/
Expand Down Expand Up @@ -236,4 +239,19 @@ abstract class BaseInParam extends BaseParam implements InParam {
return value
}

BaseInParam setTake( value ) {
if( isNestedParam() )
throw new IllegalArgumentException("Input `take` option is not allowed in tuple components")
if( !value )
throw new IllegalArgumentException("Missing input `take` name")
if( !ConfigHelper.isValidIdentifier(value) ) {
final msg = "Input take '$value' is not a valid name -- Make sure it starts with an alphabetic or underscore character and it does not contain any blank, dot or other special characters"
if( NF.strictMode )
throw new IllegalArgumentException(msg)
log.warn(msg)
}
this.channelTakeName = value
return this
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ interface InParam extends Cloneable {

def decodeInputs( List values )

String getChannelTakeName()

}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class ScriptDslTest extends Dsl2Spec {

then:
def err = thrown(ScriptRuntimeException)
err.message == 'Process `bar` declares 1 input channel but 0 were specified'
err.message == 'Process `bar` was invoked with 0 positional argument(s) but 1 were expected'
}

def 'should report error accessing undefined out/a' () {
Expand Down Expand Up @@ -451,7 +451,7 @@ class ScriptDslTest extends Dsl2Spec {

then:
def err = thrown(ScriptRuntimeException)
err.message == "Process `bar` declares 1 input channel but 0 were specified"
err.message == "Process `bar` was invoked with 0 positional argument(s) but 1 were expected"
}

def 'should report error accessing undefined out/e' () {
Expand Down
32 changes: 32 additions & 0 deletions tests/process-named-inputs.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env nextflow

process foo {
input:
val bar
val baz
output:
stdout

script:
"""
echo $bar
echo $baz
"""
}


workflow foo_wrapper {
take:
bar
baz
main:
foo(bar: bar, baz: baz)
emit:
foo.out
}


workflow {
foo_wrapper(bar: 'bar', baz: 'baz')
| view
}