Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bbc21d8
add fromLineage channel factory
jorgee Apr 24, 2025
fbdbeef
include query factory
jorgee Apr 24, 2025
3aa4f0f
add published files in output, support queries in fromPath
jorgee Apr 25, 2025
cdd9e89
rename fromLinageQuery to queryLineage
jorgee Apr 25, 2025
62a2cfa
lineage API refactor and remove other implementations
jorgee Apr 29, 2025
3e37728
Correct lineage function comment
jorgee Apr 29, 2025
ba309cd
Merge branch 'master' into lineage-factory
jorgee Apr 29, 2025
6abae3c
Convert lineage from operator to function and add documentation
jorgee Apr 29, 2025
00f7f2e
Merge branch 'master' into lineage-factory
jorgee Apr 30, 2025
584c4ee
remove que query in view and fromPath
jorgee Apr 30, 2025
4974bee
improve error message [ci fast]
jorgee Apr 30, 2025
b5a2671
Merge branch 'master' into lineage-factory
bentsherman Apr 30, 2025
117ef30
Update docs
bentsherman Apr 30, 2025
321c02a
cleanup
bentsherman Apr 30, 2025
224d92d
change queryLineage to return file outputs
jorgee May 2, 2025
9136ccc
update docs
jorgee May 2, 2025
6bf2c75
Merge branch 'master' into lineage-factory
jorgee May 2, 2025
689cdcc
fixes from merge
jorgee May 2, 2025
9d6b77a
fix LinPath getFileName bug
jorgee May 2, 2025
e77a9c0
cleanup
bentsherman May 2, 2025
064349a
Update migration notes
bentsherman May 2, 2025
251c06e
Rename queryLineage -> fromLineage
bentsherman May 2, 2025
8b4ca1e
Rnemae "metadata object" -> "lineage record"
bentsherman May 2, 2025
880461c
Merge branch 'master' into lineage-factory
bentsherman May 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package nextflow

import nextflow.extension.LinChannelEx
import nextflow.plugin.Plugins

import static nextflow.util.CheckHelper.*

import java.nio.file.FileSystem
Expand Down Expand Up @@ -657,4 +660,42 @@ class Channel {
fromPath0Future = future.exceptionally(Channel.&handlerException)
}

}
static DataflowWriteChannel fromLineage(String uri) {
final result = CH.create()
if( NF.isDsl2() ) {
session.addIgniter { fromLineage0(result, uri) }
}
else {
fromLineage0(result, uri )
}
return result
}

private static void fromLineage0(DataflowWriteChannel channel, String uri) {
final operation = Plugins.getExtension(LinChannelEx)
if( !operation )
throw new IllegalStateException("Unable to load lineage extensions.")
def future = CompletableFuture.runAsync( { operation.viewLineage(session, channel, new URI(uri)) } as Runnable)
future.exceptionally(this.&handlerException)
}

static DataflowWriteChannel queryLineage(String queryString) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just use a Map here instead of query string. I would apply the same change to the find command as well. There is no need to add the extra complexity of URL encoding

final result = CH.create()
if( NF.isDsl2() ) {
session.addIgniter { queryLineage0(result, queryString) }
}
else {
queryLineage0(result, queryString )
}
return result
}

private static void queryLineage0(DataflowWriteChannel channel, String query) {
final operation = Plugins.getExtension(LinChannelEx)
if( !operation )
throw new IllegalStateException("Unable to load lineage extensions.")
def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, query) } as Runnable)
future.exceptionally(this.&handlerException)
}

}
2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class Session implements ISession {
this.dag = new DAG()

// -- init output dir
this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: 'results')
this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: config.navigate('params.outdir') ?: 'results')

// -- init work dir
this.workDir = FileHelper.toCanonicalPath(config.workDir ?: 'work')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package nextflow.extension

import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Session

interface LinChannelEx {
void viewLineage(Session session, DataflowWriteChannel channel, URI uri)

void queryLineage(Session session, DataflowWriteChannel channel, String query)
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class PathVisitor {
applyRegexPattern0(filePattern)

else if( filePattern != null )
applyGlobPattern0(filePattern as Path)
applyPathPattern0(filePattern as Path)

else
throw new IllegalArgumentException("Missing file pattern argument")
Expand Down Expand Up @@ -103,6 +103,27 @@ class PathVisitor {
target.bind(STOP)
}

private void applyPathPattern0(Path filePattern) {
if( isQuery(filePattern) )
applyQueryablePath0(filePattern as QueryablePath)
else
applyGlobPattern0(filePattern)
}

private static boolean isQuery(Path filePattern) {
log.debug("Checking if query: $filePattern.class ")
return filePattern instanceof QueryablePath && (filePattern as QueryablePath).hasQuery()
}

private boolean applyQueryablePath0(QueryablePath path) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to add query support to fromPath because it will already be supported through queryLineage. Besides I would like to get away from using URI query params everywhere.

The queryLineage factory should return a channel of metadata objects, so you can chain it with a map operator to extract the actual files

Copy link
Contributor Author

@jorgee jorgee Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems one of the use cases that would be nice to support is to get outputs annotated with some metadata. This is only possible to do with a query, so I implemented it within fromPath and queryLineage to see what's the best option.

I had doubts about what queryLineage should return. Command find is returning the lids, not the objects. I returned the LinPaths thinking in the mentioned use case. If the LinPath is a FileOutput, it accesses the real file, checking the integrity. If we return the FileOutput description, users must get the path and check the integrity.

Maybe the best is returning the lid, letting the user do what they want according to the query. If it is cast to Path, it will be converted to a LinPath, and if it is passed to the lineage function, it will be converted to the object.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the best is returning the lid, letting the user do what they want according to the query.

This is exactly it. The lineage function and queryLineage factory should return just the metadata descriptions. Users can compose these with other operators to do things like get the actual files.

Meanwhile, fromPath should only be used to get individual files.

final paths = path.resolveQuery()
if( !paths )
throw new FileNotFoundException("No files found for ${path}")

paths.forEach { emit0(it) }
close0()
}

private void applyGlobPattern0(Path filePattern) {

final glob = opts?.containsKey('glob') ? opts.glob as boolean : true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package nextflow.file

import java.nio.file.Path

/**
* Interface to indicate a Path could contain a query that is resolved to several real paths.
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io>
*/
interface QueryablePath {
boolean hasQuery();
List<Path> resolveQuery();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.lineage

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.extension.LinChannelEx
import nextflow.lineage.fs.LinPath
import nextflow.lineage.fs.LinPathFactory
import nextflow.lineage.serde.LinSerializable

/**
* Lineage channel extensions
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io>
*/
@CompileStatic
@Slf4j
class LinChanneExImpl implements LinChannelEx{

void viewLineage(Session session, DataflowWriteChannel channel, URI uri) {
final store = getStore(session)
emitResults(channel, LinUtils.query(store, uri))
channel.bind(Channel.STOP)
}

void queryLineage(Session session, DataflowWriteChannel channel, String query) {
final store = getStore(session)
emitSearchResults(channel, store.search(query))
channel.bind(Channel.STOP)
}


protected LinStore getStore(Session session){
final store = LinStoreFactory.getOrCreate(session)
if( !store ) {
throw new Exception("Lineage store not found - Check Nextflow configuration")
}
return store
}

private static void emitResults(DataflowWriteChannel channel, Collection results){
if( !results ) {
return
}
// Remove nested collections of a single element
if( results.size() == 1 ) {
final entry = results[0]
if( entry instanceof Collection ) {
emitResults(channel, entry)
} else {
channel.bind(LinUtils.encodeSearchOutputs(entry))
}
} else
results.forEach { channel.bind(LinUtils.encodeSearchOutputs(it)) }
}

private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
if( !results ) {
return
}
results.keySet().forEach { channel.bind(LinPathFactory.create(LinPath.LID_PROT + it)) }
}
}
11 changes: 9 additions & 2 deletions modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class LinObserver implements TraceObserver {
private Session session
private WorkflowOutput workflowOutput
private Map<String,String> outputsStoreDirLid = new HashMap<String,String>(10)
private Set<String> publishedFiles = new HashSet<String>()
private PathNormalizer normalizer

LinObserver(Session session, LinStore store){
Expand Down Expand Up @@ -124,6 +125,10 @@ class LinObserver implements TraceObserver {
@Override
void onFlowComplete(){
if (this.workflowOutput){
//Add publishedFiles
for (String path: publishedFiles){
workflowOutput.output.add(new Parameter(Path.simpleName, null, path))
}
workflowOutput.createdAt = OffsetDateTime.now()
final key = executionHash + '#output'
this.store.save(key, workflowOutput)
Expand Down Expand Up @@ -360,6 +365,7 @@ class LinObserver implements TraceObserver {
LinUtils.toDate(attrs?.lastModifiedTime()),
convertAnnotations(annotations))
store.save(key, value)
publishedFiles.add(asUriString(key))
} catch (Throwable e) {
log.warn("Unexpected error storing published file '${destination.toUriString()}' for workflow '${executionHash}'", e)
}
Expand Down Expand Up @@ -411,8 +417,9 @@ class LinObserver implements TraceObserver {
private Object convertPathsToLidReferences(Object value){
if( value instanceof Path ) {
try {
final key = getWorkflowOutputKey(value)
return asUriString(key)
final key = asUriString(getWorkflowOutputKey(value))
publishedFiles.remove(key)
return key
} catch (Throwable e){
//Workflow output key not found
return value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class LinUtils {
* @param output Output to encode
* @return Output encoded as a JSON string
*/
static String encodeSearchOutputs(Object output, boolean prettyPrint) {
static String encodeSearchOutputs(Object output, boolean prettyPrint = false) {
if (output instanceof LinSerializable) {
return new LinEncoder().withPrettyPrint(prettyPrint).encode(output)
} else {
Expand Down
Loading
Loading