Skip to content

Commit b5a155d

Browse files
committed
Add onWorkflowPublish event to TraceObserverV2
Fire a new event during PublishOp::onNext for each value being published to a workflow output. Unlike onWorkflowOutput which fires after completion, this provides real-time notification with the complete channel value as each emission is published. Signed-off-by: Rob Syme <[email protected]>
1 parent 38c3910 commit b5a155d

File tree

5 files changed

+63
-0
lines changed

5 files changed

+63
-0
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver
7979
import nextflow.trace.event.FilePublishEvent
8080
import nextflow.trace.event.TaskEvent
8181
import nextflow.trace.event.WorkflowOutputEvent
82+
import nextflow.trace.event.WorkflowPublishEvent
8283
import nextflow.util.Barrier
8384
import nextflow.util.ConfigHelper
8485
import nextflow.util.Duration
@@ -1071,6 +1072,10 @@ class Session implements ISession {
10711072
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
10721073
}
10731074

1075+
void notifyWorkflowPublish(WorkflowPublishEvent event) {
1076+
notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event))
1077+
}
1078+
10741079
void notifyFilePublish(FilePublishEvent event) {
10751080
notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source))
10761081
notifyEvent(observersV2, ob -> ob.onFilePublish(event))

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException
2626
import nextflow.processor.PublishDir
2727
import nextflow.trace.event.FilePublishEvent
2828
import nextflow.trace.event.WorkflowOutputEvent
29+
import nextflow.trace.event.WorkflowPublishEvent
2930
import nextflow.util.CsvWriter
3031
/**
3132
* Publish a workflow output.
@@ -88,6 +89,9 @@ class PublishOp {
8889
protected void onNext(value) {
8990
log.trace "Publish operator received: $value"
9091

92+
// notify observers
93+
session.notifyWorkflowPublish(new WorkflowPublishEvent(name, value))
94+
9195
// evaluate dynamic path
9296
final targetResolver = getTargetDir(value)
9397
if( targetResolver == null )

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor
2323
import nextflow.trace.event.FilePublishEvent
2424
import nextflow.trace.event.TaskEvent
2525
import nextflow.trace.event.WorkflowOutputEvent
26+
import nextflow.trace.event.WorkflowPublishEvent
2627

2728
/**
2829
* Interface for consuming workflow lifecycle events.
@@ -127,6 +128,15 @@ interface TraceObserverV2 {
127128
*/
128129
default void onWorkflowOutput(WorkflowOutputEvent event) {}
129130

131+
/**
132+
* Invoked when a value is being published to a workflow output.
133+
* Unlike {@link #onWorkflowOutput} which fires after all values are
134+
* published, this fires for each value as it is published.
135+
*
136+
* @param event
137+
*/
138+
default void onWorkflowPublish(WorkflowPublishEvent event) {}
139+
130140
/**
131141
* Invoked when a file is published, either by a `publishDir` directive
132142
* or a workflow output.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.trace.event
18+
19+
import groovy.transform.Canonical
20+
import groovy.transform.CompileStatic
21+
22+
/**
23+
* Models a workflow publish event, fired for each value
24+
* as it is published to a workflow output.
25+
*
26+
* @author Ben Sherman <[email protected]>
27+
*/
28+
@Canonical
29+
@CompileStatic
30+
class WorkflowPublishEvent {
31+
/**
32+
* The name of the workflow output.
33+
*/
34+
String name
35+
/**
36+
* The value being published.
37+
*/
38+
Object value
39+
}

modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import nextflow.Session
1010
import nextflow.SysEnv
1111
import nextflow.trace.event.FilePublishEvent
1212
import nextflow.trace.event.WorkflowOutputEvent
13+
import nextflow.trace.event.WorkflowPublishEvent
1314
import spock.lang.Specification
1415
/**
1516
*
@@ -86,6 +87,8 @@ class OutputDslTest extends Specification {
8687
"${outputDir}/barbar/file2.txt"
8788
""".stripIndent()
8889
and:
90+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1))
91+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', file2))
8992
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null))
9093
1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar']))
9194
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null))
@@ -127,6 +130,7 @@ class OutputDslTest extends Specification {
127130
then:
128131
outputDir.resolve('file1.txt').text == 'Hello'
129132
and:
133+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1))
130134
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null))
131135
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null))
132136

@@ -165,6 +169,7 @@ class OutputDslTest extends Specification {
165169
dsl.apply(session)
166170
await(dsl)
167171
then:
172+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record))
168173
0 * session.notifyFilePublish(_)
169174
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null))
170175

0 commit comments

Comments
 (0)