Skip to content

Commit 383538a

Browse files
committed
Improve documentation of workflow outputs
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
1 parent 1ee59b3 commit 383538a

File tree

4 files changed

+39
-26
lines changed

4 files changed

+39
-26
lines changed

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class PublishOp {
5252

5353
private IndexOpts indexOpts
5454

55-
private List indexRecords = []
55+
private List publishedValues = []
5656

5757
private DataflowVariable target
5858

@@ -106,7 +106,7 @@ class PublishOp {
106106
* @param value
107107
*/
108108
protected void onNext(value) {
109-
log.trace "Publish operator received: $value"
109+
log.trace "Received value for workflow output '${name}': ${value}"
110110

111111
// evaluate dynamic path
112112
final targetResolver = getTargetDir(value)
@@ -130,10 +130,11 @@ class PublishOp {
130130
publisher.apply(files, sourceDir)
131131
}
132132

133-
// append record to index
134-
final normalized = normalizePaths(value, targetResolver)
135-
log.trace "Normalized record for index file: ${normalized}"
136-
indexRecords << normalized
133+
// publish value to workflow output
134+
final normalizedValue = normalizeValue(value, targetResolver)
135+
136+
log.trace "Published value to workflow output '${name}': ${normalizedValue}"
137+
publishedValues << normalizedValue
137138
}
138139

139140
/**
@@ -216,37 +217,37 @@ class PublishOp {
216217
*/
217218
protected void onComplete() {
218219
// publish individual record if source is a value channel
219-
final value = CH.isValue(source)
220-
? indexRecords.first()
221-
: indexRecords
220+
final outputValue = CH.isValue(source)
221+
? publishedValues.first()
222+
: publishedValues
222223

223224
// publish workflow output
224225
final indexPath = indexOpts
225226
? session.outputDir.resolve(indexOpts.path)
226227
: null
227-
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath))
228+
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath))
228229

229230
// write value to index file
230231
if( indexOpts ) {
231232
final ext = indexPath.getExtension()
232233
indexPath.parent.mkdirs()
233234
if( ext == 'csv' ) {
234-
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath)
235+
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath)
235236
}
236237
else if( ext == 'json' ) {
237-
indexPath.text = DumpHelper.prettyPrintJson(value)
238+
indexPath.text = DumpHelper.prettyPrintJson(outputValue)
238239
}
239240
else if( ext == 'yaml' || ext == 'yml' ) {
240-
indexPath.text = DumpHelper.prettyPrintYaml(value)
241+
indexPath.text = DumpHelper.prettyPrintYaml(outputValue)
241242
}
242243
else {
243244
throw new ScriptRuntimeException("Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be CSV, JSON, or YAML")
244245
}
245246
session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List))
246247
}
247248

248-
log.trace "Publish operator complete"
249-
target.bind(indexPath ?: value)
249+
log.trace "Completed workflow output '${name}'"
250+
target.bind(indexPath ?: outputValue)
250251
}
251252

252253
/**
@@ -283,7 +284,7 @@ class PublishOp {
283284
* @param value
284285
* @param targetResolver
285286
*/
286-
protected Object normalizePaths(value, targetResolver) {
287+
protected Object normalizeValue(value, targetResolver) {
287288
if( value instanceof Path ) {
288289
return normalizePath(value, targetResolver)
289290
}
@@ -293,9 +294,9 @@ class PublishOp {
293294
if( el instanceof Path )
294295
return normalizePath(el, targetResolver)
295296
if( el instanceof Collection<Path> )
296-
return normalizePaths(el, targetResolver)
297+
return normalizeValue(el, targetResolver)
297298
if( el instanceof Map )
298-
return normalizePaths(el, targetResolver)
299+
return normalizeValue(el, targetResolver)
299300
return el
300301
}
301302
}
@@ -305,9 +306,9 @@ class PublishOp {
305306
if( v instanceof Path )
306307
return [k, normalizePath(v, targetResolver)]
307308
if( v instanceof Collection<Path> )
308-
return [k, normalizePaths(v, targetResolver)]
309+
return [k, normalizeValue(v, targetResolver)]
309310
if( v instanceof Map )
310-
return [k, normalizePaths(v, targetResolver)]
311+
return [k, normalizeValue(v, targetResolver)]
311312
return [k, v]
312313
}
313314
}

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,19 @@ interface TraceObserverV2 {
121121
default void onFlowError(TaskEvent event) {}
122122

123123
/**
124-
* Invoked when a workflow output is published.
124+
* Invoked when a workflow output is completed.
125+
*
126+
* For a given workflow output, this event is guaranteed
127+
* to be emitted after all files for that output have been
128+
* published via onFilePublish().
125129
*
126130
* @param event
127131
*/
128132
default void onWorkflowOutput(WorkflowOutputEvent event) {}
129133

130134
/**
131-
* Invoked when a file is published, either by a `publishDir` directive
132-
* or a workflow output.
135+
* Invoked when a file is published by a workflow output
136+
* or `publishDir` directive.
133137
*
134138
* @param event
135139
*/

modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import groovy.transform.Canonical
2222
import groovy.transform.CompileStatic
2323

2424
/**
25-
* Models a file publish event.
25+
* Models a file publish event, which is emitted for each file
26+
* that is published in a workflow output or publishDir.
2627
*
2728
* @author Ben Sherman <bentshermann@gmail.com>
2829
*/

modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import groovy.transform.Canonical
2222
import groovy.transform.CompileStatic
2323

2424
/**
25-
* Models a workflow output event.
25+
* Models a workflow output event, which is emitted for each
26+
* workflow output when it is completed.
2627
*
2728
* @author Ben Sherman <bentshermann@gmail.com>
2829
*/
@@ -34,7 +35,13 @@ class WorkflowOutputEvent {
3435
*/
3536
String name
3637
/**
37-
* The value of the workflow output. It will be null if the index file was specified.
38+
* The value of the workflow output.
39+
*
40+
* If the source is a dataflow channel. this value is an unordered
41+
* collection of the published values from the channel. If the source
42+
* is a dataflow value, this value is the published value.
43+
*
44+
* If the index file was enabled, this value is null.
3845
*/
3946
Object value
4047
/**

0 commit comments

Comments
 (0)