Skip to content

Commit ffee8a0

Browse files
robsymebentsherman
andauthored
Add onWorkflowPublish event to TraceObserverV2 (#6701)
Co-authored-by: Ben Sherman <bentshermann@gmail.com>
1 parent e517f4c commit ffee8a0

File tree

7 files changed

+111
-31
lines changed

7 files changed

+111
-31
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver
7979
import nextflow.trace.event.FilePublishEvent
8080
import nextflow.trace.event.TaskEvent
8181
import nextflow.trace.event.WorkflowOutputEvent
82+
import nextflow.trace.event.WorkflowPublishEvent
8283
import nextflow.util.Barrier
8384
import nextflow.util.ConfigHelper
8485
import nextflow.util.Duration
@@ -1067,15 +1068,19 @@ class Session implements ISession {
10671068
notifyEvent(observersV2, ob -> ob.onFlowCreate(this))
10681069
}
10691070

1070-
void notifyWorkflowOutput(WorkflowOutputEvent event) {
1071-
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
1072-
}
1073-
10741071
void notifyFilePublish(FilePublishEvent event) {
10751072
notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source))
10761073
notifyEvent(observersV2, ob -> ob.onFilePublish(event))
10771074
}
10781075

1076+
void notifyWorkflowPublish(WorkflowPublishEvent event) {
1077+
notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event))
1078+
}
1079+
1080+
void notifyWorkflowOutput(WorkflowOutputEvent event) {
1081+
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
1082+
}
1083+
10791084
void notifyFlowComplete() {
10801085
notifyEvent(observersV1, ob -> ob.onFlowComplete())
10811086
notifyEvent(observersV2, ob -> ob.onFlowComplete())

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException
2626
import nextflow.processor.PublishDir
2727
import nextflow.trace.event.FilePublishEvent
2828
import nextflow.trace.event.WorkflowOutputEvent
29+
import nextflow.trace.event.WorkflowPublishEvent
2930
import nextflow.util.CsvWriter
3031
/**
3132
* Publish a workflow output.
@@ -51,7 +52,7 @@ class PublishOp {
5152

5253
private IndexOpts indexOpts
5354

54-
private List indexRecords = []
55+
private List publishedValues = []
5556

5657
private volatile boolean complete
5758

@@ -86,7 +87,7 @@ class PublishOp {
8687
* @param value
8788
*/
8889
protected void onNext(value) {
89-
log.trace "Publish operator received: $value"
90+
log.trace "Received value for workflow output '${name}': ${value}"
9091

9192
// evaluate dynamic path
9293
final targetResolver = getTargetDir(value)
@@ -110,10 +111,12 @@ class PublishOp {
110111
publisher.apply(files, sourceDir)
111112
}
112113

113-
// append record to index
114-
final normalized = normalizePaths(value, targetResolver)
115-
log.trace "Normalized record for index file: ${normalized}"
116-
indexRecords << normalized
114+
// publish value to workflow output
115+
final normalizedValue = normalizeValue(value, targetResolver)
116+
117+
log.trace "Published value to workflow output '${name}': ${normalizedValue}"
118+
publishedValues << normalizedValue
119+
session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalizedValue))
117120
}
118121

119122
/**
@@ -195,34 +198,34 @@ class PublishOp {
195198
*/
196199
protected void onComplete(nope) {
197200
// publish individual record if source is a value channel
198-
final value = CH.isValue(source)
199-
? indexRecords.first()
200-
: indexRecords
201+
final outputValue = CH.isValue(source)
202+
? publishedValues.first()
203+
: publishedValues
201204

202205
// publish workflow output
203206
final indexPath = indexOpts ? indexOpts.path : null
204-
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath))
207+
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath))
205208

206209
// write value to index file
207210
if( indexOpts ) {
208211
final ext = indexPath.getExtension()
209212
indexPath.parent.mkdirs()
210213
if( ext == 'csv' ) {
211-
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath)
214+
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath)
212215
}
213216
else if( ext == 'json' ) {
214-
indexPath.text = DumpHelper.prettyPrintJson(value)
217+
indexPath.text = DumpHelper.prettyPrintJson(outputValue)
215218
}
216219
else if( ext == 'yaml' || ext == 'yml' ) {
217-
indexPath.text = DumpHelper.prettyPrintYaml(value)
220+
indexPath.text = DumpHelper.prettyPrintYaml(outputValue)
218221
}
219222
else {
220223
log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be CSV, JSON, or YAML"
221224
}
222225
session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List))
223226
}
224227

225-
log.trace "Publish operator complete"
228+
log.trace "Completed workflow output '${name}'"
226229
this.complete = true
227230
}
228231

@@ -260,7 +263,7 @@ class PublishOp {
260263
* @param value
261264
* @param targetResolver
262265
*/
263-
protected Object normalizePaths(value, targetResolver) {
266+
protected Object normalizeValue(value, targetResolver) {
264267
if( value instanceof Path ) {
265268
return normalizePath(value, targetResolver)
266269
}
@@ -270,9 +273,9 @@ class PublishOp {
270273
if( el instanceof Path )
271274
return normalizePath(el, targetResolver)
272275
if( el instanceof Collection<Path> )
273-
return normalizePaths(el, targetResolver)
276+
return normalizeValue(el, targetResolver)
274277
if( el instanceof Map )
275-
return normalizePaths(el, targetResolver)
278+
return normalizeValue(el, targetResolver)
276279
return el
277280
}
278281
}
@@ -282,9 +285,9 @@ class PublishOp {
282285
if( v instanceof Path )
283286
return [k, normalizePath(v, targetResolver)]
284287
if( v instanceof Collection<Path> )
285-
return [k, normalizePaths(v, targetResolver)]
288+
return [k, normalizeValue(v, targetResolver)]
286289
if( v instanceof Map )
287-
return [k, normalizePaths(v, targetResolver)]
290+
return [k, normalizeValue(v, targetResolver)]
288291
return [k, v]
289292
}
290293
}

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor
2323
import nextflow.trace.event.FilePublishEvent
2424
import nextflow.trace.event.TaskEvent
2525
import nextflow.trace.event.WorkflowOutputEvent
26+
import nextflow.trace.event.WorkflowPublishEvent
2627

2728
/**
2829
* Interface for consuming workflow lifecycle events.
@@ -121,18 +122,34 @@ interface TraceObserverV2 {
121122
default void onFlowError(TaskEvent event) {}
122123

123124
/**
124-
* Invoked when a workflow output is published.
125+
* Invoked when a file is published by a workflow output
126+
* or `publishDir` directive.
125127
*
126128
* @param event
127129
*/
128-
default void onWorkflowOutput(WorkflowOutputEvent event) {}
130+
default void onFilePublish(FilePublishEvent event) {}
129131

130132
/**
131-
* Invoked when a file is published, either by a `publishDir` directive
132-
* or a workflow output.
133+
* Invoked when a workflow output receives a value from
134+
* a dataflow source.
135+
*
136+
* For a given published value, this event is guaranteed
137+
* to be emitted after all files in that value have been
138+
* published via onFilePublish().
133139
*
134140
* @param event
135141
*/
136-
default void onFilePublish(FilePublishEvent event) {}
142+
default void onWorkflowPublish(WorkflowPublishEvent event) {}
143+
144+
/**
145+
* Invoked when a workflow output is completed.
146+
*
147+
* For a given workflow output, this event is guaranteed
148+
* to be emitted after all values for that output have been
149+
* published via onWorkflowPublish().
150+
*
151+
* @param event
152+
*/
153+
default void onWorkflowOutput(WorkflowOutputEvent event) {}
137154

138155
}

modules/nextflow/src/main/groovy/nextflow/trace/event/FilePublishEvent.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import groovy.transform.Canonical
2222
import groovy.transform.CompileStatic
2323

2424
/**
25-
* Models a file publish event.
25+
* Models a file publish event, which is emitted for each file
26+
* that is published in a workflow output or publishDir.
2627
*
2728
* @author Ben Sherman <bentshermann@gmail.com>
2829
*/

modules/nextflow/src/main/groovy/nextflow/trace/event/WorkflowOutputEvent.groovy

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import groovy.transform.Canonical
2222
import groovy.transform.CompileStatic
2323

2424
/**
25-
* Models a workflow output event.
25+
* Models a workflow output event, which is emitted for each
26+
* workflow output when it is completed.
2627
*
2728
* @author Ben Sherman <bentshermann@gmail.com>
2829
*/
@@ -34,7 +35,13 @@ class WorkflowOutputEvent {
3435
*/
3536
String name
3637
/**
37-
* The value of the workflow output. It will be null if the index file was specified.
38+
* The value of the workflow output.
39+
*
40+
* If the source is a dataflow channel. this value is an unordered
41+
* collection of the published values from the channel. If the source
42+
* is a dataflow value, this value is the published value.
43+
*
44+
* If the index file was enabled, this value is null.
3845
*/
3946
Object value
4047
/**
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.trace.event
18+
19+
import groovy.transform.Canonical
20+
import groovy.transform.CompileStatic
21+
22+
/**
23+
* Models a workflow publish event, which is emitted for each value
24+
* that is published to a workflow output from a dataflow source.
25+
*
26+
* @author Rob Syme <rob.syme@gmail.com>
27+
*/
28+
@Canonical
29+
@CompileStatic
30+
class WorkflowPublishEvent {
31+
/**
32+
* The name of the workflow output.
33+
*/
34+
String name
35+
/**
36+
* The published value.
37+
*
38+
* File paths from the work directory are normalized to
39+
* their corresponding path in the output directory.
40+
*/
41+
Object value
42+
}

modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import nextflow.Session
1010
import nextflow.SysEnv
1111
import nextflow.trace.event.FilePublishEvent
1212
import nextflow.trace.event.WorkflowOutputEvent
13+
import nextflow.trace.event.WorkflowPublishEvent
1314
import spock.lang.Specification
1415
/**
1516
*
@@ -88,6 +89,8 @@ class OutputDslTest extends Specification {
8889
and:
8990
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null))
9091
1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar']))
92+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('foo/file1.txt')))
93+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', outputDir.resolve('barbar/file2.txt')))
9194
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null))
9295
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('bar', [outputDir.resolve('barbar/file2.txt')], outputDir.resolve('index.csv')))
9396
1 * session.notifyFilePublish(new FilePublishEvent(null, outputDir.resolve('index.csv'), ['foo', 'bar']))
@@ -128,6 +131,7 @@ class OutputDslTest extends Specification {
128131
outputDir.resolve('file1.txt').text == 'Hello'
129132
and:
130133
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null))
134+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('file1.txt')))
131135
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null))
132136

133137
cleanup:
@@ -166,6 +170,7 @@ class OutputDslTest extends Specification {
166170
await(dsl)
167171
then:
168172
0 * session.notifyFilePublish(_)
173+
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record))
169174
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null))
170175

171176
cleanup:

0 commit comments

Comments
 (0)