Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.script

import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.util.logging.Slf4j

/**
* Store the workflow platform-related metadata
*
* @author Jorge Ejarque <[email protected]>
*/
@Slf4j
@CompileStatic
@ToString(includeNames = true, includePackage = false)
@EqualsAndHashCode
class PlatformMetadata {

@Canonical
static class User {
String id
String userName
String email
String firstName
String lastName
String organization

User(Map opts) {
id = opts.id as String
userName = opts.userName as String
email = opts.email as String
firstName = opts.firstName as String
lastName = opts.lastName as String
organization = opts.organization as String
}
}

@Canonical
static class Workspace {
String id
String name
String fullName
String organization

Workspace(Map opts) {
id = opts.workspaceId as String
name = opts.workspaceName as String
fullName = opts.workspaceFullName
organization = opts.orgName as String
}
}

@Canonical
static class ComputeEnv {
String id
String name
String platform

ComputeEnv(Map opts) {
id = opts.id as String
name = opts.name as String
platform = opts.platform as String
}
}

@Canonical
static class Pipeline {
String id
String name
String revision
String commitId

Pipeline(Map opts) {
id = opts.id as String
name = opts.name as String
revision = opts.revision as String
commitId = opts.commitId as String
}
}

String workflowId
User user
Workspace workspace
ComputeEnv computeEnv
Pipeline pipeline
List labels

PlatformMetadata(String id, User user, Workspace workspace, ComputeEnv computeEnv, Pipeline pipeline, List labels) {
this.workflowId = id
this.user = user
this.workspace = workspace
this.computeEnv = computeEnv
this.pipeline = pipeline
this.labels = labels
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ class WorkflowMetadata {
*/
FusionMetadata fusion

/**
* Workflow metadata associated to the Seqera Platform execution.
*/
PlatformMetadata seqeraPlatform

/**
* The list of files that concurred to create the config object
*/
Expand Down
24 changes: 23 additions & 1 deletion modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.lineage

import nextflow.NextflowMeta
import nextflow.extension.FilesEx
import nextflow.lineage.exception.OutputRelativePathException

import static nextflow.lineage.fs.LinPath.*
Expand Down Expand Up @@ -71,6 +73,9 @@ import nextflow.util.TestOnly
@Slf4j
@CompileStatic
class LinObserver implements TraceObserverV2 {
private static List<String> workflowMetadataPropertiesToRemove = [
"completed", "duration", "exitStatus", "errorMessage", "errorReport", "stats", "success" // Only existing at the end
]
private static Map<Class<? extends BaseParam>, String> taskParamToValue = [
(StdOutParam) : "stdout",
(StdInParam) : "stdin",
Expand Down Expand Up @@ -163,7 +168,8 @@ class LinObserver implements TraceObserverV2 {
session.uniqueId.toString(),
session.runName,
getNormalizedParams(session.params, normalizer),
SecretHelper.hideSecrets(session.config.deepClone()) as Map
SecretHelper.hideSecrets(session.config.deepClone()) as Map,
addOtherMetadata(normalizer)
)
final executionHash = CacheHelper.hasher(value).hash().toString()
store.save(executionHash, value)
Expand Down Expand Up @@ -474,4 +480,20 @@ class LinObserver implements TraceObserverV2 {
}
return paths
}

private Map addOtherMetadata(PathNormalizer normalizer) {
try {
def metadata = session.workflowMetadata.toMap()
.collectEntries { it.value instanceof Path ? [it.key, FilesEx.toUriString(it.value as Path) ] : [it.key, it.value] }
metadata.removeAll {it.key.toString() in workflowMetadataPropertiesToRemove }
if( metadata.containsKey("nextflow") )
metadata["nextflow"] = (metadata["nextflow"] as NextflowMeta).toJsonMap()
if( metadata.containsKey("configFiles") )
metadata["configFiles"] = (metadata["configFiles"] as List<Path>).collect {normalizer.normalizePath(it)}
return metadata
}catch( Throwable e){
log.debug("Error creating metadata", e)
return [:]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ class WorkflowRun implements LinSerializable {
* Resolved Configuration
*/
Map config
/**
* Raw metadata
*/
Map metadata

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package nextflow.lineage

import nextflow.extension.FilesEx
import nextflow.lineage.exception.OutputRelativePathException

import java.nio.file.Files
Expand Down Expand Up @@ -165,13 +166,26 @@ class LinObserverTest extends Specification {
def store = new DefaultLinStore();
def uniqueId = UUID.randomUUID()
def scriptFile = folder.resolve("main.nf")
def map = [
repository: "https://nextflow.io/nf-test/",
commitId: "123456",
scriptId: "78910",
scriptFile: scriptFile,
projectDir: folder.resolve("projectDir"),
revision: "main",
projectName: "nextflow.io/nf-test",
workDir: folder.resolve("workDir")
]
def metadata = Mock(WorkflowMetadata){
getRepository() >> "https://nextflow.io/nf-test/"
getCommitId() >> "123456"
getScriptId() >> "78910"
getScriptFile() >> scriptFile
getProjectDir() >> folder.resolve("projectDir")
getWorkDir() >> folder.resolve("workDir")
getRepository() >> map.repository
getCommitId() >> map.commitId
getScriptId() >> map.scriptId
getScriptFile() >> map.scriptFile
getProjectDir() >> map.projectDir
getRevision() >> map.revision
getProjectName() >> map.projectName
getWorkDir() >> map.workDir
toMap() >> map
}
def session = Mock(Session) {
getConfig() >> config
Expand All @@ -183,8 +197,8 @@ class LinObserverTest extends Specification {
store.open(LineageConfig.create(session))
def observer = new LinObserver(session, store)
def mainScript = new DataPath("file://${scriptFile.toString()}", new Checksum("78910", "nextflow", "standard"))
def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" )
def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config)
def workflow = new Workflow([mainScript], map.repository, map.commitId)
def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config, map)
when:
observer.onFlowCreate(session)
observer.onFlowBegin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,40 @@ class LinEncoderTest extends Specification{
given:
def encoder = new LinEncoder()
and:
def config = [
process: [
container : "quay.io/nextflow/bash",
executor : "local",
resourceLabels: ["owner": "xxx"],
scratch : false
]
]
def metadata = [
runName : "big_kare",
start : "2025-11-06T13:35:42.049135334Z",
container : "quay.io/nextflow/bash",
commandLine : "nextflow run 'https://github.com/nextflow-io/hello' -name big_kare -with-tower -r master",
nextflow : [version: "25.10.0", enable: [dsl: 2.0]],
containerEngine: "docker",
wave : [enabled: true],
fusion : [enabled: true, version: "2.4"],
seqeraPlatform : [
workflowId: "wf1234",
user : [id: "xxx", userName: "john-smith", email: "[email protected]", firstName: "John", lastName: "Smith", organization: "acme"],
workspace : [id: "1234", name: "test-workspace", fullName: "Test workspace", organization: "acme"],
computeEnv: [id: "ce3456", name: "test-ce", platform: "aws-cloud"],
pipeline : [id: "pipe294", name: "https://github.com/nextflow-io/hello", revision: "master", commitId: null],
labels : []
],
failOnIgnore : false
]
def uniqueId = UUID.randomUUID()
def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard"))
def workflow = new Workflow([mainScript], "https://nextflow.io/nf-test/", "123456")
def wfRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")])
def wfRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run",
[new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")],
config, metadata
)

when:
def encoded = encoder.encode(wfRun)
Expand All @@ -82,6 +112,68 @@ class LinEncoderTest extends Specification{
result.name == "test_run"
result.params.size() == 2
result.params.get(0).name == "param1"
result.config.process.container == "quay.io/nextflow/bash"
result.config.process.executor == "local"
result.metadata.seqeraPlatform.workflowId == "wf1234"

}

def 'should decode WorkflowRuns without metadata'(){
given:
def encoder = new LinEncoder()
def wfRunStr = '''
{
"version": "lineage/v1beta1",
"kind": "WorkflowRun",
"spec": {
"workflow": {
"scriptFiles": [
{
"path": "https://github.com/nextflow-io/hello/main.nf",
"checksum": {
"value": "78910",
"algorithm": "nextflow",
"mode": "standard"
}
}
],
"repository": "https://github.com/nextflow-io/hello",
"commitId": "2ce0b0e2943449188092a0e25102f0dadc70cb0a"
},
"sessionId": "4f02559e-9ebd-41d8-8ee2-a8d1e4f09c67",
"name": "test_run",
"params": [],
"config": {
"process": {
"container": "quay.io/nextflow/bash",
"executor": "local",
"resourceLabels": {
"owner": "xxx"
},
"scratch": false
}
}
}
}
'''
when:
def object = encoder.decode(wfRunStr)

then:
object instanceof WorkflowRun
def result = object as WorkflowRun
result.workflow instanceof Workflow
result.workflow.scriptFiles.first instanceof DataPath
result.workflow.scriptFiles.first.path == "https://github.com/nextflow-io/hello/main.nf"
result.workflow.scriptFiles.first.checksum instanceof Checksum
result.workflow.scriptFiles.first.checksum.value == "78910"
result.workflow.commitId == "2ce0b0e2943449188092a0e25102f0dadc70cb0a"
result.sessionId == "4f02559e-9ebd-41d8-8ee2-a8d1e4f09c67"
result.name == "test_run"
result.params.size() == 0
result.config.process.container == "quay.io/nextflow/bash"
result.config.process.executor == "local"
result.metadata == null
}

def 'should encode and decode WorkflowOutputs'(){
Expand Down
Loading
Loading