Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PublishOp {

private IndexOpts indexOpts

private List indexRecords = []
private List publishedValues = []

private DataflowVariable target

Expand Down Expand Up @@ -106,7 +106,7 @@ class PublishOp {
* @param value
*/
protected void onNext(value) {
log.trace "Publish operator received: $value"
log.trace "Received value for workflow output '${name}': ${value}"

// evaluate dynamic path
final targetResolver = getTargetDir(value)
Expand All @@ -130,10 +130,11 @@ class PublishOp {
publisher.apply(files, sourceDir)
}

// append record to index
final normalized = normalizePaths(value, targetResolver)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized
// publish value to workflow output
final normalizedValue = normalizeValue(value, targetResolver)

log.trace "Published value to workflow output '${name}': ${normalizedValue}"
publishedValues << normalizedValue
}

/**
Expand Down Expand Up @@ -216,37 +217,37 @@ class PublishOp {
*/
protected void onComplete() {
// publish individual record if source is a value channel
final value = CH.isValue(source)
? indexRecords.first()
: indexRecords
final outputValue = CH.isValue(source)
? publishedValues.first()
: publishedValues

// publish workflow output
final indexPath = indexOpts
? session.outputDir.resolve(indexOpts.path)
: null
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath))
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath))

// write value to index file
if( indexOpts ) {
final ext = indexPath.getExtension()
indexPath.parent.mkdirs()
if( ext == 'csv' ) {
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath)
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath)
}
else if( ext == 'json' ) {
indexPath.text = DumpHelper.prettyPrintJson(value)
indexPath.text = DumpHelper.prettyPrintJson(outputValue)
}
else if( ext == 'yaml' || ext == 'yml' ) {
indexPath.text = DumpHelper.prettyPrintYaml(value)
indexPath.text = DumpHelper.prettyPrintYaml(outputValue)
}
else {
throw new ScriptRuntimeException("Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be CSV, JSON, or YAML")
}
session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List))
}

log.trace "Publish operator complete"
target.bind(indexPath ?: value)
log.trace "Completed workflow output '${name}'"
target.bind(indexPath ?: outputValue)
}

/**
Expand Down Expand Up @@ -283,7 +284,7 @@ class PublishOp {
* @param value
* @param targetResolver
*/
protected Object normalizePaths(value, targetResolver) {
protected Object normalizeValue(value, targetResolver) {
if( value instanceof Path ) {
return normalizePath(value, targetResolver)
}
Expand All @@ -293,9 +294,9 @@ class PublishOp {
if( el instanceof Path )
return normalizePath(el, targetResolver)
if( el instanceof Collection<Path> )
return normalizePaths(el, targetResolver)
return normalizeValue(el, targetResolver)
if( el instanceof Map )
return normalizePaths(el, targetResolver)
return normalizeValue(el, targetResolver)
return el
}
}
Expand All @@ -305,9 +306,9 @@ class PublishOp {
if( v instanceof Path )
return [k, normalizePath(v, targetResolver)]
if( v instanceof Collection<Path> )
return [k, normalizePaths(v, targetResolver)]
return [k, normalizeValue(v, targetResolver)]
if( v instanceof Map )
return [k, normalizePaths(v, targetResolver)]
return [k, normalizeValue(v, targetResolver)]
return [k, v]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,19 @@ interface TraceObserverV2 {
default void onFlowError(TaskEvent event) {}

/**
* Invoked when a workflow output is published.
* Invoked when a workflow output is completed.
*
* For a given workflow output, this event is guaranteed
* to be emitted after all files for that output have been
* published via onFilePublish().
*
* @param event
*/
default void onWorkflowOutput(WorkflowOutputEvent event) {}

/**
* Invoked when a file is published, either by a `publishDir` directive
* or a workflow output.
* Invoked when a file is published by a workflow output
* or `publishDir` directive.
*
* @param event
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a file publish event.
* Models a file publish event, which is emitted for each file
* that is published in a workflow output or publishDir.
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a workflow output event.
* Models a workflow output event, which is emitted for each
* workflow output when it is completed.
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
Expand All @@ -34,7 +35,13 @@ class WorkflowOutputEvent {
*/
String name
/**
* The value of the workflow output. It will be null if the index file was specified.
* The value of the workflow output.
*
* If the source is a dataflow channel. this value is an unordered
* collection of the published values from the channel. If the source
* is a dataflow value, this value is the published value.
*
* If the index file was enabled, this value is null.
*/
Object value
/**
Expand Down
Loading