Skip to content

Commit f93f3ab

Browse files
committed
Add fromDataset() operator for Seqera Platform dataset downloads
Implements a new Channel.fromDataset() operator that downloads datasets from Seqera Platform. This feature enables workflows to fetch datasets directly from the platform API.

Changes:
- Add DatasetExplorer class for handling dataset download logic
- Add Channel.fromDataset() factory method with support for dataset ID, version, and fileName parameters
- Add comprehensive unit tests for DatasetExplorer
- Use Tower access token authentication (TOWER_ACCESS_TOKEN env var or tower.accessToken config)

Usage example:

    ch_input = Channel.fromList(
        samplesheetToList(
            fromDataset([fileName: 'samplesheet.csv'], params.input),
            "assets/schema_input.json"
        )
    )

TODOs for future enhancements:
- Support querying multiple datasets using list-datasets API
- Support automatic version detection/latest version
- Auto-detect fileName from dataset metadata

Related to PR nextflow-io#6515 which added dataset upload functionality.
1 parent 5318812 commit f93f3ab

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed

modules/nextflow/src/main/groovy/nextflow/Channel.groovy

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import groovyx.gpars.dataflow.DataflowWriteChannel
3636
import groovyx.gpars.dataflow.operator.ControlMessage
3737
import groovyx.gpars.dataflow.operator.PoisonPill
3838
import nextflow.dag.NodeMarker
39+
import nextflow.datasource.DatasetExplorer
3940
import nextflow.datasource.SraExplorer
4041
import nextflow.exception.AbortOperationException
4142
import nextflow.extension.CH
@@ -608,4 +609,52 @@ class Channel {
608609
fromPath0Future = future.exceptionally(Channel.&handlerException)
609610
}
610611

612+
/**
 * Convenience overload of {@link #fromDataset(Map, String)} that downloads a
 * Seqera Platform dataset without passing any option.
 *
 * NOTE: the download needs a {@code fileName} option, which this overload cannot
 * supply, so at present the call aborts with an error asking for it. Prefer the
 * two-argument form until the file name can be detected automatically.
 *
 * @param datasetId The dataset identifier
 * @return A String containing the dataset content
 */
static String fromDataset(String datasetId) {
    final Map noOptions = Collections.emptyMap()
    return fromDataset(noOptions, datasetId)
}
621+
622+
/**
 * Download a dataset from Seqera Platform and return its content as a String.
 *
 * Supported options:
 * - {@code fileName}: name of the file stored in the dataset (currently required)
 * - {@code version}:  dataset version, defaults to {@code '1'}
 * - {@code endpoint}: Platform API endpoint, defaults to the {@code tower.endpoint}
 *   config setting or {@code https://api.tower.nf}
 *
 * The access token is read from the {@code tower.accessToken} config setting or,
 * when that is not set, from the {@code TOWER_ACCESS_TOKEN} environment variable.
 *
 * Examples:
 * <pre>
 * // Basic usage - requires fileName parameter
 * def csv = Channel.fromDataset([fileName: 'data.csv'], 'ds.123abc')
 *
 * // Specify version and endpoint
 * def csv = Channel.fromDataset([
 *     fileName: 'data.csv',
 *     version: '2',
 *     endpoint: 'https://api.tower.nf'
 * ], 'ds.123abc')
 *
 * // Use with nf-schema for samplesheet parsing
 * ch_input = Channel.fromList(
 *     samplesheetToList(
 *         Channel.fromDataset([fileName: 'samplesheet.csv'], params.input),
 *         "assets/schema_input.json"
 *     )
 * )
 * </pre>
 *
 * TODO: Support querying multiple datasets using list-datasets API
 * TODO: Support automatic version detection/latest version
 *
 * @param opts Optional parameters (endpoint, version, fileName); {@code null} is treated as no options
 * @param datasetId The dataset identifier
 * @return A String containing the dataset content
 * @throws AbortOperationException when the dataset identifier is missing or the download fails
 */
static String fromDataset(Map opts, String datasetId) {
    // Fail fast: a null/empty id would otherwise be interpolated into the request
    // URL (e.g. `.../datasets/null/...`) and only surface as a confusing 404
    if (!datasetId)
        throw new AbortOperationException("Missing dataset identifier for fromDataset() -- Make sure to provide a valid Seqera Platform dataset ID")

    // A null options map means "no options"; normalise it so that the explorer
    // never has to dereference a null map
    final Map options = opts != null ? opts : Collections.emptyMap()
    CheckHelper.checkParams('fromDataset', options, DatasetExplorer.PARAMS)

    final explorer = new DatasetExplorer(datasetId, options)
    return explorer.apply()
}
659+
611660
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.datasource
18+
19+
import groovy.json.JsonSlurper
20+
import groovy.transform.CompileStatic
21+
import groovy.util.logging.Slf4j
22+
import nextflow.Global
23+
import nextflow.Session
24+
import nextflow.exception.AbortOperationException
25+
import nextflow.util.SimpleHttpClient
26+
27+
/**
 * Download datasets from Seqera Platform.
 *
 * Builds the download URL from the endpoint, dataset id, version and file name,
 * performs an authenticated HTTP GET and returns the response body as a String.
 *
 * @author Edmund Miller
 */
@Slf4j
@CompileStatic
class DatasetExplorer {

    /** Options accepted by {@code fromDataset}, mapped to the type each value must have. */
    static public Map PARAMS = [
        endpoint: String,
        version: String,
        fileName: String
    ]

    private String datasetId
    private String endpoint
    private String version
    private String fileName
    private String accessToken

    /** Creates an unconfigured explorer (no options applied, no dataset id set). */
    DatasetExplorer() {
    }

    /**
     * @param datasetId The dataset identifier
     * @param opts Optional settings (endpoint, version, fileName); may be null
     */
    DatasetExplorer(String datasetId, Map opts) {
        this.datasetId = datasetId
        init(opts)
    }

    /**
     * Sets the dataset identifier.
     *
     * @return this instance, to allow call chaining
     */
    DatasetExplorer setDatasetId(String datasetId) {
        this.datasetId = datasetId
        return this
    }

    /**
     * Applies the user options, falling back to the configured endpoint and to
     * version '1' when they are not given.
     *
     * @param opts Optional settings; a null map is handled like an empty one
     */
    protected void init(Map opts) {
        final String base = (opts?.endpoint as String) ?: getConfigEndpoint()
        // Drop trailing slashes so that the download URL never contains `//datasets`
        this.endpoint = base.replaceAll('/+$', '')
        this.version = (opts?.version as String) ?: '1'
        this.fileName = opts?.fileName as String
    }

    /** Returns the process environment; kept as a separate method so tests can stub it. */
    protected Map getEnv() {
        System.getenv()
    }

    /**
     * @return The `tower.endpoint` setting of the current session config, or
     *         'https://api.tower.nf' when there is no session or no such setting
     */
    protected String getConfigEndpoint() {
        def session = Global.session as Session
        def result = session?.config?.navigate('tower.endpoint')
        if (!result)
            result = 'https://api.tower.nf'
        return result as String
    }

    /**
     * Resolves the access token, looking first at the `tower.accessToken` config
     * setting and then at the TOWER_ACCESS_TOKEN environment variable.
     *
     * @throws AbortOperationException when no token is found in either place
     */
    protected String getAccessToken() {
        def session = Global.session as Session
        def token = session?.config?.navigate('tower.accessToken')
        if (!token)
            token = getEnv().get('TOWER_ACCESS_TOKEN')
        if (!token)
            throw new AbortOperationException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment or tower.accessToken in your config")
        return token as String
    }

    /**
     * Returns the dataset file name given by the user.
     * TODO: Fetch it from the dataset metadata when the list-datasets API is available
     *
     * @throws AbortOperationException when no fileName option was provided
     */
    protected String getDatasetFileName() {
        if (fileName)
            return fileName

        // TODO: In the future, we can query the dataset metadata to get the fileName
        // For now the user is required to provide it
        throw new AbortOperationException("fileName parameter is required for fromDataset(). Future versions will support automatic detection.")
    }

    /**
     * @return The URL `<endpoint>/datasets/<datasetId>/v/<version>/n/<encoded fileName>`
     */
    protected String getDownloadUrl() {
        final name = getDatasetFileName()
        // NOTE(review): URLEncoder applies form encoding, so a blank becomes `+`
        // rather than `%20`; in a URL path `+` is a literal plus sign -- confirm the
        // Platform API resolves such names before relying on file names with spaces
        return "${endpoint}/datasets/${datasetId}/v/${version}/n/${URLEncoder.encode(name, "UTF-8")}"
    }

    /**
     * Download the dataset and return its content.
     *
     * @return The response body decoded as UTF-8 text
     * @throws AbortOperationException when the token or fileName is missing, the
     *         server replies with a non-200 status, or the request fails
     */
    String apply() {
        if (!accessToken)
            accessToken = getAccessToken()

        final url = getDownloadUrl()
        log.debug "Fetching dataset from: $url"

        HttpURLConnection connection = null
        try {
            // Make HTTP GET request
            connection = new URL(url).openConnection() as HttpURLConnection
            connection.setRequestMethod('GET')
            connection.setRequestProperty('Authorization', "Bearer ${accessToken}")
            connection.setRequestProperty('Accept', 'text/csv, text/plain, */*')

            final responseCode = connection.responseCode

            if (responseCode == 200) {
                // Decode explicitly as UTF-8 so the result does not depend on the
                // platform default charset (assumes the dataset is UTF-8 text)
                final content = connection.inputStream.getText('UTF-8')
                log.trace "Dataset content received:\n${content?.take(500)}"
                return content
            }
            else if (responseCode == 401) {
                throw new AbortOperationException("Unauthorized access to dataset ${datasetId} -- Check that your Seqera Platform access token is valid")
            }
            else if (responseCode == 403) {
                throw new AbortOperationException("Access forbidden to dataset ${datasetId} -- Check your permissions and access token")
            }
            else if (responseCode == 404) {
                throw new AbortOperationException("Dataset ${datasetId} not found -- Check the dataset ID, version, and fileName")
            }
            else {
                final errorMsg = connection.errorStream?.getText('UTF-8') ?: "HTTP ${responseCode}"
                throw new AbortOperationException("Failed to download dataset ${datasetId}: ${errorMsg}")
            }
        }
        catch (AbortOperationException e) {
            // Already carries a user-facing message; do not wrap it again
            throw e
        }
        catch (Exception e) {
            throw new AbortOperationException("Error downloading dataset ${datasetId}: ${e.message}", e)
        }
        finally {
            connection?.disconnect()
        }
    }
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.datasource
18+
19+
import nextflow.exception.AbortOperationException
20+
import spock.lang.Specification
21+
22+
/**
 * Test DatasetExplorer functionality
 *
 * NOTE: the endpoint and access token features rely on there being no active
 * session (`Global.session`) carrying `tower.*` settings while they run.
 *
 * @author Edmund Miller
 */
class DatasetExplorerTest extends Specification {

    def 'should return download url' () {
        given:
        def explorer = Spy(DatasetExplorer)
        explorer.datasetId = DATASET_ID
        explorer.endpoint = ENDPOINT
        explorer.version = VERSION

        when:
        explorer.fileName = FILENAME
        def result = explorer.getDownloadUrl()

        then:
        result == EXPECTED

        where:
        DATASET_ID  | ENDPOINT                 | VERSION | FILENAME     | EXPECTED
        'ds.123abc' | 'https://api.tower.nf'   | '1'     | 'data.csv'   | 'https://api.tower.nf/datasets/ds.123abc/v/1/n/data.csv'
        'ds.456def' | 'https://api.tower.nf'   | '2'     | 'sample.tsv' | 'https://api.tower.nf/datasets/ds.456def/v/2/n/sample.tsv'
        'ds.789ghi' | 'https://custom.api.com' | '1'     | 'test.csv'   | 'https://custom.api.com/datasets/ds.789ghi/v/1/n/test.csv'
    }

    def 'should return download url with encoded filename' () {
        given:
        def explorer = Spy(DatasetExplorer)
        explorer.datasetId = 'ds.123'
        explorer.endpoint = 'https://api.tower.nf'
        explorer.version = '1'
        explorer.fileName = 'my file.csv'

        when:
        def result = explorer.getDownloadUrl()

        then:
        // URLEncoder form-encodes the blank as `+`
        result == 'https://api.tower.nf/datasets/ds.123/v/1/n/my+file.csv'
    }

    def 'should use default endpoint' () {
        given:
        def explorer = Spy(DatasetExplorer)

        when:
        def result = explorer.getConfigEndpoint()

        then:
        // No `getEnv()` interaction is expected here: the endpoint is resolved from
        // the session config only, so requiring one call made this feature fail
        result == 'https://api.tower.nf'
    }

    def 'should retrieve access token from environment' () {
        given:
        def explorer = Spy(DatasetExplorer)

        when:
        def result = explorer.getAccessToken()

        then:
        1 * explorer.getEnv() >> [TOWER_ACCESS_TOKEN: 'test_token_123']
        result == 'test_token_123'
    }

    def 'should throw error when access token is missing' () {
        given:
        def explorer = Spy(DatasetExplorer)

        when:
        explorer.getAccessToken()

        then:
        1 * explorer.getEnv() >> [:]
        thrown(AbortOperationException)
    }

    def 'should throw error when fileName is missing' () {
        given:
        def explorer = new DatasetExplorer('ds.123', [:])

        when:
        explorer.getDatasetFileName()

        then:
        thrown(AbortOperationException)
    }

    def 'should use provided fileName' () {
        given:
        def explorer = new DatasetExplorer('ds.123', [fileName: 'test.csv'])

        when:
        def result = explorer.getDatasetFileName()

        then:
        result == 'test.csv'
    }

    def 'should initialize with options' () {
        given:
        def opts = [
            endpoint: 'https://custom.api.com',
            version: '2',
            fileName: 'data.csv'
        ]

        when:
        def explorer = new DatasetExplorer('ds.123', opts)

        then:
        explorer.datasetId == 'ds.123'
        explorer.endpoint == 'https://custom.api.com'
        explorer.version == '2'
        explorer.fileName == 'data.csv'
    }

    def 'should use default version' () {
        given:
        def explorer = new DatasetExplorer('ds.123', [:])

        expect:
        explorer.version == '1'
    }
}

0 commit comments

Comments
 (0)