diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index a26c16c58f..38188f09ae 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -52,7 +52,7 @@ class PublishOp { private IndexOpts indexOpts - private List indexRecords = [] + private List publishedValues = [] private DataflowVariable target @@ -106,7 +106,7 @@ class PublishOp { * @param value */ protected void onNext(value) { - log.trace "Publish operator received: $value" + log.trace "Received value for workflow output '${name}': ${value}" // evaluate dynamic path final targetResolver = getTargetDir(value) @@ -130,10 +130,11 @@ class PublishOp { publisher.apply(files, sourceDir) } - // append record to index - final normalized = normalizePaths(value, targetResolver) - log.trace "Normalized record for index file: ${normalized}" - indexRecords << normalized + // publish value to workflow output + final normalizedValue = normalizeValue(value, targetResolver) + + log.trace "Published value to workflow output '${name}': ${normalizedValue}" + publishedValues << normalizedValue } /** @@ -216,28 +217,28 @@ class PublishOp { */ protected void onComplete() { // publish individual record if source is a value channel - final value = CH.isValue(source) - ? indexRecords.first() - : indexRecords + final outputValue = CH.isValue(source) + ? publishedValues.first() + : publishedValues // publish workflow output final indexPath = indexOpts ? session.outputDir.resolve(indexOpts.path) : null - session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath)) + session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath)) // write value to index file if( indexOpts ) { final ext = indexPath.getExtension() indexPath.parent.mkdirs() if( ext == 'csv' ) { - new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath) + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath) } else if( ext == 'json' ) { - indexPath.text = DumpHelper.prettyPrintJson(value) + indexPath.text = DumpHelper.prettyPrintJson(outputValue) } else if( ext == 'yaml' || ext == 'yml' ) { - indexPath.text = DumpHelper.prettyPrintYaml(value) + indexPath.text = DumpHelper.prettyPrintYaml(outputValue) } else { throw new ScriptRuntimeException("Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be CSV, JSON, or YAML") @@ -245,8 +246,8 @@ class PublishOp { session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List)) } - log.trace "Publish operator complete" - target.bind(indexPath ?: value) + log.trace "Completed workflow output '${name}'" + target.bind(indexPath ?: outputValue) } /** @@ -283,7 +284,7 @@ class PublishOp { * @param value * @param targetResolver */ - protected Object normalizePaths(value, targetResolver) { + protected Object normalizeValue(value, targetResolver) { if( value instanceof Path ) { return normalizePath(value, targetResolver) } @@ -293,9 +294,9 @@ class PublishOp { if( el instanceof Path ) return normalizePath(el, targetResolver) if( el instanceof Collection ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) if( el instanceof Map ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) return el } } @@ -305,9 +306,9 @@ class PublishOp { if( v instanceof Path ) return [k, normalizePath(v, targetResolver)] if( v instanceof Collection ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, targetResolver)] if( v instanceof Map ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, targetResolver)] return [k, v] } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy index 82beed1e90..5b27b2324c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy @@ -121,15 +121,19 @@ interface TraceObserverV2 { default void onFlowError(TaskEvent event) {} /** - * Invoked when a workflow output is published. + * Invoked when a workflow output is completed. + * + * For a given workflow output, this event is guaranteed + * to be emitted after all files for that output have been + * published via onFilePublish(). * * @param event */ default void onWorkflowOutput(WorkflowOutputEvent event) {} /** - * Invoked when a file is published, either by a `publishDir` directive - * or a workflow output. + * Invoked when a file is published by a workflow output + * or `publishDir` directive. * * @param event */ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy index 5623c2b3e0..b10240f1c7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a file publish event. + * Models a file publish event, which is emitted for each file + * that is published in a workflow output or publishDir. * * @author Ben Sherman */ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy index c51944db0a..acc58affab 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a workflow output event. + * Models a workflow output event, which is emitted for each + * workflow output when it is completed. * * @author Ben Sherman */ @@ -34,7 +35,13 @@ class WorkflowOutputEvent { */ String name /** - * The value of the workflow output. It will be null if the index file was specified. + * The value of the workflow output. + * + * If the source is a dataflow channel. this value is an unordered + * collection of the published values from the channel. If the source + * is a dataflow value, this value is the published value. + * + * If the index file was enabled, this value is null. */ Object value /**