Skip to content

Commit 7a1409b

Browse files
committed
Add Channel.fromDataset() operator for Seqera Platform integration
This commit adds a new `fromDataset()` operator to the nf-tower plugin that allows downloading datasets from Seqera Platform. Key Features: - Downloads dataset files from Seqera Platform via the API - Supports version specification (defaults to version 1) - Supports custom file names (defaults to data.csv) - Returns dataset content as a String for further processing - Integrates seamlessly with nf-schema and other tools Usage Examples: ```groovy // Basic usage - download default file from dataset def content = Channel.fromDataset('my-dataset-id') // With nf-schema integration ch_input = Channel.fromList( samplesheetToList(Channel.fromDataset(params.input), "assets/schema_input.json") ) // Specify version and filename def dataset = Channel.fromDataset( datasetId: 'my-dataset-id', version: '2', fileName: 'samples.csv' ) ``` Implementation Details: - DatasetHelper: Handles API communication with Seqera Platform - TowerChannelExtension: Provides the Channel extension method - Uses Groovy extension module mechanism for seamless integration - Properly handles authentication via TOWER_ACCESS_TOKEN - Comprehensive error handling for HTTP errors (404, 403, 500, etc.) TODOs for future enhancements: - Add support for listing datasets (using /datasets API endpoint) - Auto-detect latest version when not specified - Query dataset metadata to determine actual filename Related to PR nextflow-io#6515 (dataset upload functionality) Signed-off-by: Edmund Miller <[email protected]>
1 parent 5318812 commit 7a1409b

File tree

5 files changed

+538
-0
lines changed

5 files changed

+538
-0
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright (c) 2019, Seqera Labs.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* This Source Code Form is "Incompatible With Secondary Licenses", as
9+
* defined by the Mozilla Public License, v. 2.0.
10+
*/
11+
12+
package io.seqera.tower.plugin
13+
14+
import groovy.json.JsonSlurper
15+
import groovy.transform.CompileStatic
16+
import groovy.util.logging.Slf4j
17+
import nextflow.Global
18+
import nextflow.Session
19+
import nextflow.exception.AbortOperationException
20+
import nextflow.util.SimpleHttpClient
21+
22+
/**
 * Helper class to download datasets from Seqera Platform.
 *
 * The endpoint and access token are either supplied explicitly or resolved
 * from the current session configuration ({@code tower.endpoint},
 * {@code tower.accessToken}) with a fallback to the {@code TOWER_ACCESS_TOKEN}
 * environment variable. Requests are sent through {@link SimpleHttpClient}.
 *
 * @author Edmund Miller
 */
@Slf4j
@CompileStatic
class DatasetHelper {

    static private final String TOKEN_PREFIX = '@token:'

    /** Dataset version requested when the caller does not provide one */
    static private final String DEFAULT_VERSION = '1'

    /** Dataset file name requested when the caller does not provide one */
    static private final String DEFAULT_FILE_NAME = 'data.csv'

    private String endpoint
    private String accessToken
    private SimpleHttpClient httpClient

    /**
     * Create a helper using the endpoint and access token resolved by
     * {@link #getEndpoint()} and {@link #getAccessToken()}.
     *
     * @throws AbortOperationException when no access token can be found
     */
    DatasetHelper() {
        this.endpoint = getEndpoint()
        this.accessToken = getAccessToken()
        this.httpClient = new SimpleHttpClient().setAuthToken(TOKEN_PREFIX + accessToken)
    }

    /**
     * Create a helper for an explicit endpoint and access token.
     *
     * @param endpoint The Platform API endpoint URL
     * @param accessToken The access token used to authenticate requests
     */
    DatasetHelper(String endpoint, String accessToken) {
        this.endpoint = endpoint
        this.accessToken = accessToken
        this.httpClient = new SimpleHttpClient().setAuthToken(TOKEN_PREFIX + accessToken)
    }

    /**
     * Get the Tower endpoint URL from the session config, falling back to
     * {@code TowerClient.DEF_ENDPOINT_URL} when it is missing or set to {@code '-'}
     */
    protected String getEndpoint() {
        def session = Global.session as Session
        def endpoint = session?.config?.navigate('tower.endpoint') as String
        if( !endpoint || endpoint == '-' )
            endpoint = TowerClient.DEF_ENDPOINT_URL
        return endpoint
    }

    /**
     * Get the Tower access token from the session config or, when not set there,
     * from the {@code TOWER_ACCESS_TOKEN} environment variable
     *
     * @throws AbortOperationException when no token is found in either place
     */
    protected String getAccessToken() {
        def session = Global.session as Session
        def token = session?.config?.navigate('tower.accessToken')
        if( !token ) {
            def env = System.getenv()
            token = env.get('TOWER_ACCESS_TOKEN')
        }
        if( !token )
            throw new AbortOperationException("Missing Nextflow Tower access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
        return token as String
    }

    /**
     * Download a dataset from Seqera Platform
     *
     * @param datasetId The dataset ID to download
     * @param version The version of the dataset (defaults to '1')
     * @param fileName The name of the file in the dataset (defaults to 'data.csv')
     * @return The content of the dataset file as a String
     * @throws IllegalArgumentException when {@code datasetId} is null or empty
     * @throws AbortOperationException when the request fails or returns a non-2xx status
     */
    String downloadDataset(String datasetId, String version = null, String fileName = null) {
        if( !datasetId )
            throw new IllegalArgumentException("Dataset ID cannot be null or empty")

        // TODO: When version is not specified, we should query the latest version
        final versionStr = version ?: DEFAULT_VERSION

        // TODO: In the future, we should query the dataset metadata to get the actual filename
        final fileNameStr = fileName ?: DEFAULT_FILE_NAME

        final url = buildDownloadUrl(datasetId, versionStr, fileNameStr)
        log.debug "Downloading dataset from: $url"

        try {
            httpClient.sendHttpMessage(url, null, 'GET')
            final responseCode = httpClient.responseCode

            if( responseCode >= 200 && responseCode < 300 ) {
                return httpClient.getResponse()
            } else if( responseCode == 404 ) {
                throw new AbortOperationException("Dataset not found: $datasetId (version: $versionStr, file: $fileNameStr)")
            } else if( responseCode == 401 ) {
                // an invalid or expired token is reported separately from a permission problem
                throw new AbortOperationException("Unauthorized request for dataset: $datasetId -- Check that your Tower access token is valid")
            } else if( responseCode == 403 ) {
                throw new AbortOperationException("Access denied to dataset: $datasetId -- Check your Tower access token permissions")
            } else {
                throw new AbortOperationException("Failed to download dataset: $datasetId -- HTTP status: $responseCode")
            }
        } catch( IOException e ) {
            throw new AbortOperationException("Failed to download dataset: $datasetId -- ${e.message}", e)
        }
    }

    /**
     * Build the download URL for a dataset
     *
     * Trailing slashes on the endpoint are dropped and each path component is
     * percent-encoded, so identifiers or file names containing spaces or
     * reserved characters still produce a valid URL.
     *
     * NOTE(review): no {@code workspaceId} query parameter is added -- confirm
     * whether datasets outside the personal workspace require it.
     *
     * @param datasetId The dataset ID
     * @param version The dataset version
     * @param fileName The file name
     * @return The complete download URL
     */
    protected String buildDownloadUrl(String datasetId, String version, String fileName) {
        final base = endpoint?.replaceAll('/+$', '')
        return "${base}/datasets/${encodePathSegment(datasetId)}/v/${encodePathSegment(version)}/n/${encodePathSegment(fileName)}"
    }

    /**
     * Percent-encode a single URL path segment
     */
    private static String encodePathSegment(String segment) {
        // URLEncoder implements form encoding, where a space becomes '+';
        // inside a URL path the space has to be written as '%20' instead
        return URLEncoder.encode(String.valueOf(segment), 'UTF-8').replace('+', '%20')
    }

    // TODO: List all datasets in a workspace using the /datasets API endpoint,
    // taking an optional workspace ID and returning the list of available datasets.
    //
    // Sketch of the future implementation:
    // List<Map> listDatasets(String workspaceId = null) {
    //     final url = workspaceId
    //         ? "${endpoint}/datasets?workspaceId=${workspaceId}"
    //         : "${endpoint}/datasets"
    //
    //     httpClient.sendHttpMessage(url, null, 'GET')
    //     if( httpClient.responseCode >= 200 && httpClient.responseCode < 300 ) {
    //         def json = new JsonSlurper().parseText(httpClient.response)
    //         return json.datasets as List<Map>
    //     }
    //     return []
    // }

}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright (c) 2019, Seqera Labs.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* This Source Code Form is "Incompatible With Secondary Licenses", as
9+
* defined by the Mozilla Public License, v. 2.0.
10+
*/
11+
12+
package io.seqera.tower.plugin
13+
14+
import groovy.transform.CompileStatic
15+
import groovy.util.logging.Slf4j
16+
import groovyx.gpars.dataflow.DataflowWriteChannel
17+
import nextflow.Channel
18+
import nextflow.Global
19+
import nextflow.NF
20+
import nextflow.Session
21+
import nextflow.dag.NodeMarker
22+
import nextflow.extension.CH
23+
24+
/**
 * Channel extension methods for Tower/Seqera Platform integration
 *
 * @author Edmund Miller
 */
@Slf4j
@CompileStatic
class TowerChannelExtension {

    /**
     * Download a dataset from Seqera Platform and return its content as a String
     *
     * This function downloads a dataset file from Seqera Platform, requesting
     * version '1' of the file 'data.csv'. Use the Map variant to request a
     * different version or file name.
     *
     * Example usage:
     * <pre>
     * // Basic usage - returns the dataset content as a string
     * def dataset = Channel.fromDataset('my-dataset-id')
     *
     * // With nf-schema integration
     * ch_input = Channel.fromList(
     *     samplesheetToList(Channel.fromDataset(params.input), "assets/schema_input.json")
     * )
     *
     * // Specify version and filename
     * def dataset = Channel.fromDataset(
     *     datasetId: 'my-dataset-id',
     *     version: '2',
     *     fileName: 'samples.csv'
     * )
     * </pre>
     *
     * @param self The extended {@code Channel} type (not used)
     * @param datasetId The dataset ID (when called with a string)
     * @return The content of the dataset file as a String
     */
    static String fromDataset(Channel self, String datasetId) {
        return fromDataset(self, [datasetId: datasetId])
    }

    /**
     * Download a dataset from Seqera Platform with options
     *
     * @param self The extended {@code Channel} type (not used)
     * @param opts Map with options:
     *   - datasetId: (required) The dataset ID to download
     *   - version: (optional) The version of the dataset (defaults to '1')
     *   - fileName: (optional) The name of the file in the dataset (defaults to 'data.csv')
     * @return The content of the dataset file as a String
     * @throws IllegalArgumentException when {@code opts} is null or has no 'datasetId'
     */
    static String fromDataset(Channel self, Map opts) {
        // safe navigation so that a null options map is reported as a missing
        // 'datasetId' instead of failing with a NullPointerException
        final datasetId = opts?.datasetId as String
        final version = opts?.version as String
        final fileName = opts?.fileName as String

        if( !datasetId )
            throw new IllegalArgumentException("fromDataset requires 'datasetId' parameter")

        // Check if Tower is configured
        checkTowerEnabled()

        // report the defaults that DatasetHelper actually applies ('1' and 'data.csv')
        log.debug "Fetching dataset: $datasetId (version: ${version ?: '1'}, file: ${fileName ?: 'data.csv'})"

        final helper = new DatasetHelper()
        final content = helper.downloadDataset(datasetId, version, fileName)

        log.trace "Dataset content retrieved: ${content?.length() ?: 0} characters"

        return content
    }

    /**
     * Check if Tower is properly configured
     *
     * This check never fails: it only logs a warning when the session is not
     * initialized and a debug message when no access token is found in the
     * session config or in the {@code TOWER_ACCESS_TOKEN} environment variable.
     */
    protected static void checkTowerEnabled() {
        def session = Global.session as Session
        if( !session ) {
            log.warn "Session not initialized - Tower configuration cannot be validated"
            return
        }

        // We don't require tower.enabled=true for fromDataset to work
        // as long as the access token is available
        def token = session.config?.navigate('tower.accessToken')
        if( !token ) {
            token = System.getenv('TOWER_ACCESS_TOKEN')
        }

        if( !token ) {
            log.debug "Tower access token not found - fromDataset may fail"
        }
    }

}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Groovy extension module descriptor for the nf-tower plugin.
# fromDataset() is invoked statically as Channel.fromDataset(...), so the
# extension class must be registered under staticExtensionClasses;
# extensionClasses would only add the methods to Channel instances.
moduleName=nf-tower
moduleVersion=1.0
staticExtensionClasses=io.seqera.tower.plugin.TowerChannelExtension

0 commit comments

Comments
 (0)