Skip to content

Commit 21a6470

Browse files
bentshermanjorgee
andauthored
Fix bugs with workflow output and lineage (#6254)
--------- Signed-off-by: Ben Sherman <[email protected]> Signed-off-by: jorgee <[email protected]> Co-authored-by: jorgee <[email protected]>
1 parent b48c069 commit 21a6470

File tree

6 files changed

+101
-5
lines changed

6 files changed

+101
-5
lines changed

docs/workflow.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,10 @@ output {
572572

573573
Each `>>` specifies a *source file* and *publish target*. The source file should be a file or collection of files, and the publish target should be a directory or file name. If the publish target ends with a slash, it is treated as the directory in which source files are published. Otherwise, it is treated as the target filename of a source file. Only files that are published with the `>>` operator are saved to the output directory.
574574

575+
:::{note}
576+
Files that do not originate from the work directory are not published.
577+
:::
578+
575579
### Index files
576580

577581
Each output can create an index file of the values that were published. An index file preserves the structure of channel values, including metadata, which is simpler than encoding this information with directories and file names. The index file can be a CSV (`.csv`), JSON (`.json`), or YAML (`.yml`, `.yaml`) file. The channel values should be files, lists, or maps.
@@ -628,6 +632,10 @@ This example will produce the following index file:
628632
"3"|"sample 3"|"results/fastq/3a.fastq"|"results/fastq/3b.fastq"
629633
```
630634

635+
:::{note}
636+
Files that do not originate from the work directory are not published, but are included in the index file.
637+
:::
638+
631639
See [Output directives](#output-directives) for the list of available index directives.
632640

633641
### Output directives

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ class PublishOp {
269269
return normalizePath(el, targetResolver)
270270
if( el instanceof Collection<Path> )
271271
return normalizePaths(el, targetResolver)
272+
if( el instanceof Map )
273+
return normalizePaths(el, targetResolver)
272274
return el
273275
}
274276
}
@@ -281,6 +283,8 @@ class PublishOp {
281283
return Map.entry(k, normalizePath(v, targetResolver))
282284
if( v instanceof Collection<Path> )
283285
return Map.entry(k, normalizePaths(v, targetResolver))
286+
if( v instanceof Map )
287+
return Map.entry(k, normalizePaths(v, targetResolver))
284288
return Map.entry(k, v)
285289
}
286290
}

modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,44 @@ class OutputDslTest extends Specification {
135135
root?.deleteDir()
136136
}
137137

138+
def 'should preserve non-task output files in workflow output'() {
139+
given:
140+
def root = Files.createTempDirectory('test')
141+
def outputDir = root.resolve('results')
142+
def workDir = root.resolve('work')
143+
def inputDir = root.resolve('inputs'); Files.createDirectories(inputDir)
144+
def file1 = inputDir.resolve('file1.txt'); file1.text = 'Hello'
145+
def file2 = inputDir.resolve('file2.txt'); file2.text = 'world'
146+
def record = [id: '1', file1: file1, file2: file2]
147+
and:
148+
def session = Mock(Session) {
149+
getOutputs() >> [:]
150+
getConfig() >> [:]
151+
getOutputDir() >> outputDir
152+
getWorkDir() >> workDir
153+
}
154+
Global.session = session
155+
and:
156+
assignOutput(session, 'foo', [ record ])
157+
and:
158+
def dsl = new OutputDsl()
159+
and:
160+
SysEnv.push(NXF_FILE_ROOT: root.toString())
161+
162+
when:
163+
dsl.declare('foo') {
164+
}
165+
dsl.apply(session)
166+
await(dsl)
167+
then:
168+
0 * session.notifyFilePublish(_)
169+
1 * session.notifyWorkflowOutput(new WorkflowOutputEvent('foo', [ record ], null))
170+
171+
cleanup:
172+
SysEnv.pop()
173+
root?.deleteDir()
174+
}
175+
138176
def 'should set publish options in output declaration' () {
139177
when:
140178
def dsl1 = new OutputDsl.DeclareDsl()

modules/nf-commons/src/main/nextflow/serde/gson/GsonEncoder.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package nextflow.serde.gson
1818

1919
import java.lang.reflect.Type
20+
import java.nio.file.Path
2021
import java.time.Instant
2122
import java.time.OffsetDateTime
2223

@@ -80,6 +81,7 @@ abstract class GsonEncoder<T> implements Encoder<T, String> {
8081
builder.registerTypeAdapter(Instant.class, new InstantAdapter())
8182
builder.registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter())
8283
builder.registerTypeAdapter(GStringImpl.class, new GStringSerializer())
84+
builder.registerTypeHierarchyAdapter(Path.class, new PathAdapter())
8385
if( factory )
8486
builder.registerTypeAdapterFactory(factory)
8587
if( prettyPrint )
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2013-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.serde.gson
18+
19+
import com.google.gson.TypeAdapter
20+
import com.google.gson.stream.JsonReader
21+
import com.google.gson.stream.JsonWriter
22+
import nextflow.file.FileHelper
23+
24+
import java.nio.file.Path
25+
26+
class PathAdapter extends TypeAdapter<Path> {
27+
@Override
28+
void write(JsonWriter writer, Path value) throws IOException {
29+
writer.value(value?.toUriString())
30+
}
31+
32+
@Override
33+
Path read(JsonReader reader) throws IOException {
34+
if (reader.peek() == JsonToken.NULL) {
35+
reader.nextNull()
36+
return null
37+
}
38+
return FileHelper.asPath(reader.nextString())
39+
}
40+
}

modules/nf-lineage/src/test/nextflow/lineage/serde/LinEncoderTest.groovy

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package nextflow.lineage.serde
1818

19+
import java.nio.file.Path
1920
import java.time.OffsetDateTime
2021

2122
import nextflow.lineage.model.v1beta1.Checksum
@@ -83,22 +84,25 @@ class LinEncoderTest extends Specification{
8384
result.params.get(0).name == "param1"
8485
}
8586

86-
def 'should encode and decode WorkflowResults'(){
87+
def 'should encode and decode WorkflowOutputs'(){
8788
given:
8889
def encoder = new LinEncoder()
8990
and:
9091
def time = OffsetDateTime.now()
91-
def wfResults = new WorkflowOutput(time, "lid://1234", [new Parameter("String", "a", "A"), new Parameter("String", "b", "B")])
92+
def wfResults = new WorkflowOutput(time, "lid://1234", [
93+
new Parameter("Collection", "a", [id: 'id', file: 'sample.txt' as Path]),
94+
new Parameter("String", "b", "B")
95+
])
96+
9297
when:
9398
def encoded = encoder.encode(wfResults)
9499
def object = encoder.decode(encoded)
95-
96100
then:
97101
object instanceof WorkflowOutput
98102
def result = object as WorkflowOutput
99103
result.createdAt == time
100104
result.workflowRun == "lid://1234"
101-
result.output == [new Parameter("String", "a", "A"), new Parameter("String", "b", "B")]
105+
result.output == [new Parameter("Collection", "a", [id: 'id', file: 'sample.txt']), new Parameter("String", "b", "B")]
102106
}
103107

104108
def 'should encode and decode TaskRun'() {
@@ -133,7 +137,7 @@ class LinEncoderTest extends Specification{
133137
result.binEntries.get(0).checksum.value == "78910"
134138
}
135139

136-
def 'should encode and decode TaskResults'(){
140+
def 'should encode and decode TaskOutputs'(){
137141
given:
138142
def encoder = new LinEncoder()
139143
and:

0 commit comments

Comments
 (0)