diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index eb5e55e852..6c6e8efc2a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent import nextflow.util.Barrier import nextflow.util.ConfigHelper import nextflow.util.Duration @@ -1067,15 +1068,19 @@ class Session implements ISession { notifyEvent(observersV2, ob -> ob.onFlowCreate(this)) } - void notifyWorkflowOutput(WorkflowOutputEvent event) { - notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event)) - } - void notifyFilePublish(FilePublishEvent event) { notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source)) notifyEvent(observersV2, ob -> ob.onFilePublish(event)) } + void notifyWorkflowPublish(WorkflowPublishEvent event) { + notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event)) + } + + void notifyWorkflowOutput(WorkflowOutputEvent event) { + notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event)) + } + void notifyFlowComplete() { notifyEvent(observersV1, ob -> ob.onFlowComplete()) notifyEvent(observersV2, ob -> ob.onFlowComplete()) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 4dfa78880e..d9cdfcb409 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException import nextflow.processor.PublishDir import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.WorkflowOutputEvent +import 
nextflow.trace.event.WorkflowPublishEvent import nextflow.util.CsvWriter /** * Publish a workflow output. @@ -51,7 +52,7 @@ class PublishOp { private IndexOpts indexOpts - private List indexRecords = [] + private List publishedValues = [] private volatile boolean complete @@ -86,7 +87,7 @@ class PublishOp { * @param value */ protected void onNext(value) { - log.trace "Publish operator received: $value" + log.trace "Received value for workflow output '${name}': ${value}" // evaluate dynamic path final targetResolver = getTargetDir(value) @@ -110,10 +111,12 @@ class PublishOp { publisher.apply(files, sourceDir) } - // append record to index - final normalized = normalizePaths(value, targetResolver) - log.trace "Normalized record for index file: ${normalized}" - indexRecords << normalized + // publish value to workflow output + final normalizedValue = normalizeValue(value, targetResolver) + + log.trace "Published value to workflow output '${name}': ${normalizedValue}" + publishedValues << normalizedValue + session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalizedValue)) } /** @@ -195,26 +198,26 @@ class PublishOp { */ protected void onComplete(nope) { // publish individual record if source is a value channel - final value = CH.isValue(source) - ? indexRecords.first() - : indexRecords + final outputValue = CH.isValue(source) + ? publishedValues.first() + : publishedValues // publish workflow output final indexPath = indexOpts ? 
indexOpts.path : null - session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath)) + session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath)) // write value to index file if( indexOpts ) { final ext = indexPath.getExtension() indexPath.parent.mkdirs() if( ext == 'csv' ) { - new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath) + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath) } else if( ext == 'json' ) { - indexPath.text = DumpHelper.prettyPrintJson(value) + indexPath.text = DumpHelper.prettyPrintJson(outputValue) } else if( ext == 'yaml' || ext == 'yml' ) { - indexPath.text = DumpHelper.prettyPrintYaml(value) + indexPath.text = DumpHelper.prettyPrintYaml(outputValue) } else { log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be CSV, JSON, or YAML" @@ -222,7 +225,7 @@ class PublishOp { session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List)) } - log.trace "Publish operator complete" + log.trace "Completed workflow output '${name}'" this.complete = true } @@ -260,7 +263,7 @@ class PublishOp { * @param value * @param targetResolver */ - protected Object normalizePaths(value, targetResolver) { + protected Object normalizeValue(value, targetResolver) { if( value instanceof Path ) { return normalizePath(value, targetResolver) } @@ -270,9 +273,9 @@ class PublishOp { if( el instanceof Path ) return normalizePath(el, targetResolver) if( el instanceof Collection ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) if( el instanceof Map ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) return el } } @@ -282,9 +285,9 @@ class PublishOp { if( v instanceof Path ) return [k, normalizePath(v, targetResolver)] if( v instanceof Collection ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, 
targetResolver)] if( v instanceof Map ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, targetResolver)] return [k, v] } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy index 82beed1e90..9483c7cbce 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy @@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent /** * Interface for consuming workflow lifecycle events. @@ -121,18 +122,34 @@ interface TraceObserverV2 { default void onFlowError(TaskEvent event) {} /** - * Invoked when a workflow output is published. + * Invoked when a file is published by a workflow output + * or `publishDir` directive. * * @param event */ - default void onWorkflowOutput(WorkflowOutputEvent event) {} + default void onFilePublish(FilePublishEvent event) {} /** - * Invoked when a file is published, either by a `publishDir` directive - * or a workflow output. + * Invoked when a workflow output receives a value from + * a dataflow source. + * + * For a given published value, this event is guaranteed + * to be emitted after all files in that value have been + * published via onFilePublish(). * * @param event */ - default void onFilePublish(FilePublishEvent event) {} + default void onWorkflowPublish(WorkflowPublishEvent event) {} + + /** + * Invoked when a workflow output is completed. + * + * For a given workflow output, this event is guaranteed + * to be emitted after all values for that output have been + * published via onWorkflowPublish(). 
+ * + * @param event + */ + default void onWorkflowOutput(WorkflowOutputEvent event) {} } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy index 5623c2b3e0..b10240f1c7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a file publish event. + * Models a file publish event, which is emitted for each file + * that is published in a workflow output or publishDir. * * @author Ben Sherman */ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy index c51944db0a..acc58affab 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a workflow output event. + * Models a workflow output event, which is emitted for each + * workflow output when it is completed. * * @author Ben Sherman */ @@ -34,7 +35,13 @@ class WorkflowOutputEvent { */ String name /** - * The value of the workflow output. It will be null if the index file was specified. + * The value of the workflow output. + * + * If the source is a dataflow channel, this value is an unordered + * collection of the published values from the channel. If the source + * is a dataflow value, this value is the published value. + * + * If the index file was enabled, this value is null.
*/ Object value /** diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy new file mode 100644 index 0000000000..6ab6a0be94 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy @@ -0,0 +1,42 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace.event + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + +/** + * Models a workflow publish event, which is emitted for each value + * that is published to a workflow output from a dataflow source. + * + * @author Rob Syme + */ +@Canonical +@CompileStatic +class WorkflowPublishEvent { + /** + * The name of the workflow output. + */ + String name + /** + * The published value. + * + * File paths from the work directory are normalized to + * their corresponding path in the output directory. 
+ */ + Object value +} diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy index f398c0ecab..030a8a9681 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -10,6 +10,7 @@ import nextflow.Session import nextflow.SysEnv import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent import spock.lang.Specification /** * @@ -88,6 +89,8 @@ class OutputDslTest extends Specification { and: 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null)) 1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar'])) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('foo/file1.txt'))) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', outputDir.resolve('barbar/file2.txt'))) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null)) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('bar', [outputDir.resolve('barbar/file2.txt')], outputDir.resolve('index.csv'))) 1 * session.notifyFilePublish(new FilePublishEvent(null, outputDir.resolve('index.csv'), ['foo', 'bar'])) @@ -128,6 +131,7 @@ class OutputDslTest extends Specification { outputDir.resolve('file1.txt').text == 'Hello' and: 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null)) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('file1.txt'))) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null)) cleanup: @@ -166,6 +170,7 @@ class OutputDslTest extends Specification { await(dsl) then: 0 * 
session.notifyFilePublish(_) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record)) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null)) cleanup: