Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.TaskEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import nextflow.util.Barrier
import nextflow.util.ConfigHelper
import nextflow.util.Duration
Expand Down Expand Up @@ -1067,15 +1068,19 @@ class Session implements ISession {
notifyEvent(observersV2, ob -> ob.onFlowCreate(this))
}

void notifyWorkflowOutput(WorkflowOutputEvent event) {
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
}

void notifyFilePublish(FilePublishEvent event) {
notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source))
notifyEvent(observersV2, ob -> ob.onFilePublish(event))
}

void notifyWorkflowPublish(WorkflowPublishEvent event) {
notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event))
}

void notifyWorkflowOutput(WorkflowOutputEvent event) {
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
}

void notifyFlowComplete() {
notifyEvent(observersV1, ob -> ob.onFlowComplete())
notifyEvent(observersV2, ob -> ob.onFlowComplete())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException
import nextflow.processor.PublishDir
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import nextflow.util.CsvWriter
/**
* Publish a workflow output.
Expand All @@ -51,7 +52,7 @@ class PublishOp {

private IndexOpts indexOpts

private List indexRecords = []
private List publishedValues = []

private volatile boolean complete

Expand Down Expand Up @@ -86,7 +87,7 @@ class PublishOp {
* @param value
*/
protected void onNext(value) {
log.trace "Publish operator received: $value"
log.trace "Received value for workflow output '${name}': ${value}"

// evaluate dynamic path
final targetResolver = getTargetDir(value)
Expand All @@ -110,10 +111,12 @@ class PublishOp {
publisher.apply(files, sourceDir)
}

// append record to index
final normalized = normalizePaths(value, targetResolver)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized
// publish value to workflow output
final normalizedValue = normalizeValue(value, targetResolver)

log.trace "Published value to workflow output '${name}': ${normalizedValue}"
publishedValues << normalizedValue
session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalizedValue))
}

/**
Expand Down Expand Up @@ -195,34 +198,34 @@ class PublishOp {
*/
protected void onComplete(nope) {
// publish individual record if source is a value channel
final value = CH.isValue(source)
? indexRecords.first()
: indexRecords
final outputValue = CH.isValue(source)
? publishedValues.first()
: publishedValues

// publish workflow output
final indexPath = indexOpts ? indexOpts.path : null
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, value, indexPath))
session.notifyWorkflowOutput(new WorkflowOutputEvent(name, outputValue, indexPath))

// write value to index file
if( indexOpts ) {
final ext = indexPath.getExtension()
indexPath.parent.mkdirs()
if( ext == 'csv' ) {
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath)
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(publishedValues, indexPath)
}
else if( ext == 'json' ) {
indexPath.text = DumpHelper.prettyPrintJson(value)
indexPath.text = DumpHelper.prettyPrintJson(outputValue)
}
else if( ext == 'yaml' || ext == 'yml' ) {
indexPath.text = DumpHelper.prettyPrintYaml(value)
indexPath.text = DumpHelper.prettyPrintYaml(outputValue)
}
else {
log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be CSV, JSON, or YAML"
}
session.notifyFilePublish(new FilePublishEvent(null, indexPath, publishOpts.labels as List))
}

log.trace "Publish operator complete"
log.trace "Completed workflow output '${name}'"
this.complete = true
}

Expand Down Expand Up @@ -260,7 +263,7 @@ class PublishOp {
* @param value
* @param targetResolver
*/
protected Object normalizePaths(value, targetResolver) {
protected Object normalizeValue(value, targetResolver) {
if( value instanceof Path ) {
return normalizePath(value, targetResolver)
}
Expand All @@ -270,9 +273,9 @@ class PublishOp {
if( el instanceof Path )
return normalizePath(el, targetResolver)
if( el instanceof Collection<Path> )
return normalizePaths(el, targetResolver)
return normalizeValue(el, targetResolver)
if( el instanceof Map )
return normalizePaths(el, targetResolver)
return normalizeValue(el, targetResolver)
return el
}
}
Expand All @@ -282,9 +285,9 @@ class PublishOp {
if( v instanceof Path )
return [k, normalizePath(v, targetResolver)]
if( v instanceof Collection<Path> )
return [k, normalizePaths(v, targetResolver)]
return [k, normalizeValue(v, targetResolver)]
if( v instanceof Map )
return [k, normalizePaths(v, targetResolver)]
return [k, normalizeValue(v, targetResolver)]
return [k, v]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.TaskEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent

/**
* Interface for consuming workflow lifecycle events.
Expand Down Expand Up @@ -121,18 +122,34 @@ interface TraceObserverV2 {
default void onFlowError(TaskEvent event) {}

/**
* Invoked when a workflow output is published.
* Invoked when a file is published by a workflow output
* or `publishDir` directive.
*
* @param event
*/
default void onWorkflowOutput(WorkflowOutputEvent event) {}
default void onFilePublish(FilePublishEvent event) {}

/**
* Invoked when a file is published, either by a `publishDir` directive
* or a workflow output.
* Invoked when a workflow output receives a value from
* a dataflow source.
*
* For a given published value, this event is guaranteed
* to be emitted after all files in that value have been
* published via onFilePublish().
*
* @param event
*/
default void onFilePublish(FilePublishEvent event) {}
default void onWorkflowPublish(WorkflowPublishEvent event) {}

/**
* Invoked when a workflow output is completed.
*
* For a given workflow output, this event is guaranteed
* to be emitted after all values for that output have been
* published via onWorkflowPublish().
*
* @param event
*/
default void onWorkflowOutput(WorkflowOutputEvent event) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a file publish event.
* Models a file publish event, which is emitted for each file
* that is published in a workflow output or publishDir.
*
* @author Ben Sherman <[email protected]>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a workflow output event.
* Models a workflow output event, which is emitted for each
* workflow output when it is completed.
*
* @author Ben Sherman <[email protected]>
*/
Expand All @@ -34,7 +35,13 @@ class WorkflowOutputEvent {
*/
String name
/**
* The value of the workflow output. It will be null if the index file was specified.
* The value of the workflow output.
*
* If the source is a dataflow channel. this value is an unordered
* collection of the published values from the channel. If the source
* is a dataflow value, this value is the published value.
*
* If the index file was enabled, this value is null.
*/
Object value
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace.event

import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a workflow publish event, which is emitted for each value
* that is published to a workflow output from a dataflow source.
*
* @author Rob Syme <[email protected]>
*/
@Canonical
@CompileStatic
class WorkflowPublishEvent {
/**
* The name of the workflow output.
*/
String name
/**
* The published value.
*
* File paths from the work directory are normalized to
* their corresponding path in the output directory.
*/
Object value
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import nextflow.Session
import nextflow.SysEnv
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import spock.lang.Specification
/**
*
Expand Down Expand Up @@ -88,6 +89,8 @@ class OutputDslTest extends Specification {
and:
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null))
1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar']))
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('foo/file1.txt')))
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', outputDir.resolve('barbar/file2.txt')))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('bar', [outputDir.resolve('barbar/file2.txt')], outputDir.resolve('index.csv')))
1 * session.notifyFilePublish(new FilePublishEvent(null, outputDir.resolve('index.csv'), ['foo', 'bar']))
Expand Down Expand Up @@ -128,6 +131,7 @@ class OutputDslTest extends Specification {
outputDir.resolve('file1.txt').text == 'Hello'
and:
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null))
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', outputDir.resolve('file1.txt')))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null))

cleanup:
Expand Down Expand Up @@ -166,6 +170,7 @@ class OutputDslTest extends Specification {
await(dsl)
then:
0 * session.notifyFilePublish(_)
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null))

cleanup:
Expand Down
Loading