From 6333b8c913047fc9c5c7add396d33acfba187077 Mon Sep 17 00:00:00 2001 From: Rob Syme Date: Sat, 3 Jan 2026 18:36:33 -0500 Subject: [PATCH 1/3] Add onWorkflowPublish event to TraceObserverV2 Fire a new event during PublishOp::onNext for each value being published to a workflow output. Unlike onWorkflowOutput which fires after completion, this provides real-time notification with the complete channel value as each emission is published. Signed-off-by: Rob Syme --- .../src/main/groovy/nextflow/Session.groovy | 5 +++ .../nextflow/extension/PublishOp.groovy | 4 ++ .../nextflow/trace/TraceObserverV2.groovy | 10 +++++ .../trace/event/WorkflowPublishEvent.groovy | 39 +++++++++++++++++++ .../nextflow/script/OutputDslTest.groovy | 5 +++ 5 files changed, 63 insertions(+) create mode 100644 modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index eb5e55e852..636ff3f2ea 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent import nextflow.util.Barrier import nextflow.util.ConfigHelper import nextflow.util.Duration @@ -1071,6 +1072,10 @@ class Session implements ISession { notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event)) } + void notifyWorkflowPublish(WorkflowPublishEvent event) { + notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event)) + } + void notifyFilePublish(FilePublishEvent event) { notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source)) notifyEvent(observersV2, ob -> ob.onFilePublish(event)) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 4dfa78880e..56d16e114f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException import nextflow.processor.PublishDir import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent import nextflow.util.CsvWriter /** * Publish a workflow output. @@ -88,6 +89,9 @@ class PublishOp { protected void onNext(value) { log.trace "Publish operator received: $value" + // notify observers + session.notifyWorkflowPublish(new WorkflowPublishEvent(name, value)) + // evaluate dynamic path final targetResolver = getTargetDir(value) if( targetResolver == null ) diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy index 82beed1e90..761c4a51cf 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy @@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent /** * Interface for consuming workflow lifecycle events. @@ -127,6 +128,15 @@ interface TraceObserverV2 { */ default void onWorkflowOutput(WorkflowOutputEvent event) {} + /** + * Invoked when a value is being published to a workflow output. + * Unlike {@link #onWorkflowOutput} which fires after all values are + * published, this fires for each value as it is published. + * + * @param event + */ + default void onWorkflowPublish(WorkflowPublishEvent event) {} + /** * Invoked when a file is published, either by a `publishDir` directive * or a workflow output. diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy new file mode 100644 index 0000000000..924ca24618 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy @@ -0,0 +1,39 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace.event + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + +/** + * Models a workflow publish event, fired for each value + * as it is published to a workflow output. + * + * @author Rob Syme + */ +@Canonical +@CompileStatic +class WorkflowPublishEvent { + /** + * The name of the workflow output. + */ + String name + /** + * The value being published. + */ + Object value +} diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy index f398c0ecab..84a7877f2e 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -10,6 +10,7 @@ import nextflow.Session import nextflow.SysEnv import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.WorkflowOutputEvent +import nextflow.trace.event.WorkflowPublishEvent import spock.lang.Specification /** * @@ -86,6 +87,8 @@ class OutputDslTest extends Specification { "${outputDir}/barbar/file2.txt" """.stripIndent() and: + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1)) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', file2)) 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null)) 1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar'])) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null)) @@ -127,6 +130,7 @@ class OutputDslTest extends Specification { then: outputDir.resolve('file1.txt').text == 'Hello' and: + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1)) 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null)) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null)) @@ -165,6 +169,7 @@ class OutputDslTest extends Specification { dsl.apply(session) await(dsl) then: + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record)) 0 * session.notifyFilePublish(_) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null)) From 7e993380f1240821e80f24afb3bb3ca9c5bf9f66 Mon Sep 17 00:00:00 2001 From: Rob Syme Date: Sat, 3 Jan 2026 21:19:30 -0500 Subject: [PATCH 2/3] Move notifyWorkflowPublish call after path normalization in PublishOp.groovy Signed-off-by: Rob Syme --- .../src/main/groovy/nextflow/extension/PublishOp.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 56d16e114f..d7e78c1538 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -89,9 +89,6 @@ class PublishOp { protected void onNext(value) { log.trace "Publish operator received: $value" - // notify observers - session.notifyWorkflowPublish(new WorkflowPublishEvent(name, value)) - // evaluate dynamic path final targetResolver = getTargetDir(value) if( targetResolver == null ) @@ -118,6 +115,9 @@ class PublishOp { final normalized = normalizePaths(value, targetResolver) log.trace "Normalized record for index file: ${normalized}" indexRecords << normalized + + // notify observers + session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalized)) } /** From 6fa20636ec1522cb23455f4cd528e2258262dd71 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 6 Jan 2026 10:13:53 -0600 Subject: [PATCH 3/3] update docs Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/Session.groovy | 10 ++--- .../nextflow/extension/PublishOp.groovy | 41 +++++++++---------- .../nextflow/trace/TraceObserverV2.groovy | 23 +++++++---- .../trace/event/FilePublishEvent.groovy | 3 +- .../trace/event/WorkflowOutputEvent.groovy | 11 ++++- .../trace/event/WorkflowPublishEvent.groovy | 9 ++-- .../nextflow/script/OutputDslTest.groovy | 8 ++-- 7 files changed, 61 insertions(+), 44 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 636ff3f2ea..6c6e8efc2a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -1068,17 +1068,17 @@ class Session implements ISession { notifyEvent(observersV2, ob -> ob.onFlowCreate(this)) } - void notifyWorkflowOutput(WorkflowOutputEvent event) { - notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event)) + void notifyFilePublish(FilePublishEvent event) { + notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source)) + notifyEvent(observersV2, ob -> ob.onFilePublish(event)) } void notifyWorkflowPublish(WorkflowPublishEvent event) { notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event)) } - void notifyFilePublish(FilePublishEvent event) { - notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source)) - notifyEvent(observersV2, ob -> ob.onFilePublish(event)) + void notifyWorkflowOutput(WorkflowOutputEvent event) { + notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event)) } void notifyFlowComplete() { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index d7e78c1538..d9cdfcb409 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -52,7 +52,7 @@ class PublishOp { private IndexOpts indexOpts - private List indexRecords = [] + private List publishedValues = [] private volatile boolean complete @@ -87,7 +87,7 @@ class PublishOp { * @param value */ protected void onNext(value) { - log.trace "Publish operator received: $value" + log.trace "Received value for workflow output '${name}': ${value}" // evaluate dynamic path final targetResolver = getTargetDir(value) @@ -111,13 +111,12 @@ class PublishOp { publisher.apply(files, sourceDir) } - // append record to index - final normalized = normalizePaths(value, targetResolver) - log.trace "Normalized record for index file: ${normalized}" - indexRecords << normalized + // publish value to workflow output + final normalizedValue = normalizeValue(value, targetResolver) - // notify observers - session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalized)) + log.trace "Published value to workflow output '${name}': ${normalizedValue}" + publishedValues << normalizedValue + session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalizedValue)) } /** @@ -199,26 +198,26 @@ class PublishOp { */ protected void onComplete(nope) { // publish individual record if source is a value channel - final value = CH.isValue(source) - ? indexRecords.first() - : indexRecords + final outputValue = CH.isValue(source) + ? publishedValues.first() + : publishedValues // publish workflow output final indexPath = indexOpts ? indexOpts.path : null - session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath)) + session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath)) // write value to index file if( indexOpts ) { final ext = indexPath.getExtension() indexPath.parent.mkdirs() if( ext == 'csv' ) { - new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath) + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath) } else if( ext == 'json' ) { - indexPath.text = DumpHelper.prettyPrintJson(value) + indexPath.text = DumpHelper.prettyPrintJson(outputValue) } else if( ext == 'yaml' || ext == 'yml' ) { - indexPath.text = DumpHelper.prettyPrintYaml(value) + indexPath.text = DumpHelper.prettyPrintYaml(outputValue) } else { log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be CSV, JSON, or YAML" @@ -226,7 +225,7 @@ class PublishOp { session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List)) } - log.trace "Publish operator complete" + log.trace "Completed workflow output '${name}'" this.complete = true } @@ -264,7 +263,7 @@ class PublishOp { * @param value * @param targetResolver */ - protected Object normalizePaths(value, targetResolver) { + protected Object normalizeValue(value, targetResolver) { if( value instanceof Path ) { return normalizePath(value, targetResolver) } @@ -274,9 +273,9 @@ class PublishOp { if( el instanceof Path ) return normalizePath(el, targetResolver) if( el instanceof Collection ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) if( el instanceof Map ) - return normalizePaths(el, targetResolver) + return normalizeValue(el, targetResolver) return el } } @@ -286,9 +285,9 @@ class PublishOp { if( v instanceof Path ) return [k, normalizePath(v, targetResolver)] if( v instanceof Collection ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, targetResolver)] if( v instanceof Map ) - return [k, normalizePaths(v, targetResolver)] + return [k, normalizeValue(v, targetResolver)] return [k, v] } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy index 761c4a51cf..9483c7cbce 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy @@ -122,27 +122,34 @@ interface TraceObserverV2 { default void onFlowError(TaskEvent event) {} /** - * Invoked when a workflow output is published. + * Invoked when a file is published by a workflow output + * or `publishDir` directive. * * @param event */ - default void onWorkflowOutput(WorkflowOutputEvent event) {} + default void onFilePublish(FilePublishEvent event) {} /** - * Invoked when a value is being published to a workflow output. - * Unlike {@link #onWorkflowOutput} which fires after all values are - * published, this fires for each value as it is published. + * Invoked when a workflow output receives a value from + * a dataflow source. + * + * For a given published value, this event is guaranteed + * to be emitted after all files in that value have been + * published via onFilePublish(). * * @param event */ default void onWorkflowPublish(WorkflowPublishEvent event) {} /** - * Invoked when a file is published, either by a `publishDir` directive - * or a workflow output. + * Invoked when a workflow output is completed. + * + * For a given workflow output, this event is guaranteed + * to be emitted after all values for that output have been + * published via onWorkflowPublish(). * * @param event */ - default void onFilePublish(FilePublishEvent event) {} + default void onWorkflowOutput(WorkflowOutputEvent event) {} } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy index 5623c2b3e0..b10240f1c7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a file publish event. + * Models a file publish event, which is emitted for each file + * that is published in a workflow output or publishDir. * * @author Ben Sherman */ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy index c51944db0a..acc58affab 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy @@ -22,7 +22,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a workflow output event. + * Models a workflow output event, which is emitted for each + * workflow output when it is completed. * * @author Ben Sherman */ @@ -34,7 +35,13 @@ class WorkflowOutputEvent { */ String name /** - * The value of the workflow output. It will be null if the index file was specified. + * The value of the workflow output. + * + * If the source is a dataflow channel. this value is an unordered + * collection of the published values from the channel. If the source + * is a dataflow value, this value is the published value. + * + * If the index file was enabled, this value is null. */ Object value /** diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy index 924ca24618..6ab6a0be94 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowPublishEvent.groovy @@ -20,8 +20,8 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic /** - * Models a workflow publish event, fired for each value - * as it is published to a workflow output. + * Models a workflow publish event, which is emitted for each value + * that is published to a workflow output from a dataflow source. * * @author Rob Syme */ @@ -33,7 +33,10 @@ class WorkflowPublishEvent { */ String name /** - * The value being published. + * The published value. + * + * File paths from the work directory are normalized to + * their corresponding path in the output directory. */ Object value } diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy index 84a7877f2e..030a8a9681 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -87,10 +87,10 @@ class OutputDslTest extends Specification { "${outputDir}/barbar/file2.txt" """.stripIndent() and: - 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1)) - 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', file2)) 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null)) 1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar'])) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('foo/file1.txt'))) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', outputDir.resolve('barbar/file2.txt'))) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null)) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('bar', [outputDir.resolve('barbar/file2.txt')], outputDir.resolve('index.csv'))) 1 * session.notifyFilePublish(new FilePublishEvent(null, outputDir.resolve('index.csv'), ['foo', 'bar'])) @@ -130,8 +130,8 @@ class OutputDslTest extends Specification { then: outputDir.resolve('file1.txt').text == 'Hello' and: - 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1)) 1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null)) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('file1.txt'))) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null)) cleanup: @@ -169,8 +169,8 @@ class OutputDslTest extends Specification { dsl.apply(session) await(dsl) then: - 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record)) 0 * session.notifyFilePublish(_) + 1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record)) 1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null)) cleanup: