Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import nextflow.trace.WorkflowStatsObserver
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.TaskEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import nextflow.util.Barrier
import nextflow.util.ConfigHelper
import nextflow.util.Duration
Expand Down Expand Up @@ -1071,6 +1072,10 @@ class Session implements ISession {
notifyEvent(observersV2, ob -> ob.onWorkflowOutput(event))
}

void notifyWorkflowPublish(WorkflowPublishEvent event) {
notifyEvent(observersV2, ob -> ob.onWorkflowPublish(event))
}

void notifyFilePublish(FilePublishEvent event) {
notifyEvent(observersV1, ob -> ob.onFilePublish(event.target, event.source))
notifyEvent(observersV2, ob -> ob.onFilePublish(event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import nextflow.exception.ScriptRuntimeException
import nextflow.processor.PublishDir
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import nextflow.util.CsvWriter
/**
* Publish a workflow output.
Expand Down Expand Up @@ -114,6 +115,9 @@ class PublishOp {
final normalized = normalizePaths(value, targetResolver)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized

// notify observers
session.notifyWorkflowPublish(new WorkflowPublishEvent(name, normalized))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import nextflow.processor.TaskProcessor
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.TaskEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent

/**
* Interface for consuming workflow lifecycle events.
Expand Down Expand Up @@ -127,6 +128,15 @@ interface TraceObserverV2 {
*/
default void onWorkflowOutput(WorkflowOutputEvent event) {}

/**
* Invoked when a value is being published to a workflow output.
* Unlike {@link #onWorkflowOutput} which fires after all values are
* published, this fires for each value as it is published.
*
* @param event
*/
default void onWorkflowPublish(WorkflowPublishEvent event) {}

/**
* Invoked when a file is published, either by a `publishDir` directive
* or a workflow output.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace.event

import groovy.transform.Canonical
import groovy.transform.CompileStatic

/**
* Models a workflow publish event, fired for each value
* as it is published to a workflow output.
*
* @author Rob Syme <rob.syme@gmail.com>
*/
@Canonical
@CompileStatic
class WorkflowPublishEvent {
/**
* The name of the workflow output.
*/
String name
/**
* The value being published.
*/
Object value
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import nextflow.Session
import nextflow.SysEnv
import nextflow.trace.event.FilePublishEvent
import nextflow.trace.event.WorkflowOutputEvent
import nextflow.trace.event.WorkflowPublishEvent
import spock.lang.Specification
/**
*
Expand Down Expand Up @@ -86,6 +87,8 @@ class OutputDslTest extends Specification {
"${outputDir}/barbar/file2.txt"
""".stripIndent()
and:
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1))
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('bar', file2))
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('foo/file1.txt'), null))
1 * session.notifyFilePublish(new FilePublishEvent(file2, outputDir.resolve('barbar/file2.txt'), ['foo', 'bar']))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('foo/file1.txt')], null))
Expand Down Expand Up @@ -127,6 +130,7 @@ class OutputDslTest extends Specification {
then:
outputDir.resolve('file1.txt').text == 'Hello'
and:
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', file1))
1 * session.notifyFilePublish(new FilePublishEvent(file1, outputDir.resolve('file1.txt'), null))
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [outputDir.resolve('file1.txt')], null))

Expand Down Expand Up @@ -165,6 +169,7 @@ class OutputDslTest extends Specification {
dsl.apply(session)
await(dsl)
then:
1 * session.notifyWorkflowPublish(new WorkflowPublishEvent('foo', record))
0 * session.notifyFilePublish(_)
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null))

Expand Down
Loading