Skip to content

Commit 62a1f0f

Browse files
committed
refactor(nf-tower): Replace manual HTTP with tower-java-sdk for dataset upload
Refactor dataset upload implementation to use the official tower-java-sdk instead of manual HTTP multipart encoding, significantly simplifying the code and improving maintainability. Changes: - Add tower-java-sdk dependency (1.43.1) with GitHub Packages repository - Replace manual HTTP implementation with DatasetsApi SDK methods: - createDataset() now uses datasetsApi.createDataset(wspId, request) - uploadIndexToDataset() now uses datasetsApi.uploadDataset(wspId, id, header, file) - Remove ~120 lines of manual HTTP code: - Deleted getUrlDatasets() and getUrlDatasetUpload() URL builders - Deleted uploadFile() multipart HTTP request construction - Deleted createMultipartBody() RFC 2388 multipart encoding - Add comprehensive test coverage: - 7 unit tests with mocked DatasetsApi (initialization, event collection, dataset creation, file upload, exception handling) - 1 integration test with real Platform API (conditional on TOWER_ACCESS_TOKEN) - Manual test workflow in test-dataset-upload/ directory with documentation Testing: - All unit tests passing (BUILD SUCCESSFUL) - Integration test ready (runs when TOWER_ACCESS_TOKEN available) - Test workflow provides end-to-end validation guide Benefits: - Uses official Seqera SDK (same as tower-cli) - Easier to test with mocked API - SDK handles all HTTP/multipart details automatically - Bug fixes in SDK benefit us automatically - Code reduced from ~300 lines to ~100 lines Note: Requires GitHub credentials for tower-java-sdk dependency. Configure github_username and github_access_token in gradle.properties or set GITHUB_USERNAME and GITHUB_TOKEN environment variables. Signed-off-by: Edmund Miller <[email protected]> Signed-off-by: Edmund Miller <[email protected]>
1 parent d9ce038 commit 62a1f0f

File tree

4 files changed

+359
-113
lines changed

4 files changed

+359
-113
lines changed

plugins/nf-tower/build.gradle

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ nextflowPlugin {
3030
]
3131
}
3232

33+
repositories {
34+
maven {
35+
url = uri("https://maven.pkg.github.com/seqeralabs/tower-java-sdk")
36+
credentials {
37+
username = project.findProperty('github_username') ?: System.getenv("GITHUB_USERNAME")
38+
password = project.findProperty('github_access_token') ?: System.getenv("GITHUB_TOKEN")
39+
}
40+
}
41+
}
42+
3343
sourceSets {
3444
main.java.srcDirs = []
3545
main.groovy.srcDirs = ['src/main']
@@ -50,6 +60,7 @@ dependencies {
5060
compileOnly 'org.pf4j:pf4j:3.12.0'
5161
compileOnly 'io.seqera:lib-httpx:2.1.0'
5262

63+
api 'io.seqera.tower:tower-java-sdk:1.43.1'
5364
api "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0"
5465
api "com.fasterxml.jackson.core:jackson-databind:2.12.7.1"
5566

plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy

Lines changed: 63 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import groovy.transform.TupleConstructor
3535
import groovy.util.logging.Slf4j
3636
import io.seqera.http.HxClient
3737
import io.seqera.util.trace.TraceUtils
38+
import io.seqera.tower.ApiClient
39+
import io.seqera.tower.ApiException
40+
import io.seqera.tower.api.DatasetsApi
41+
import io.seqera.tower.model.CreateDatasetRequest
42+
import io.seqera.tower.model.CreateDatasetResponse
3843
import nextflow.BuildInfo
3944
import nextflow.Session
4045
import nextflow.container.resolver.ContainerMeta
@@ -101,6 +106,8 @@ class TowerClient implements TraceObserverV2 {
101106

102107
private HxClient httpClient
103108

109+
private DatasetsApi datasetsApi
110+
104111
private JsonGenerator generator
105112

106113
private String workflowId
@@ -157,6 +164,7 @@ class TowerClient implements TraceObserverV2 {
157164
this.schema = loadSchema()
158165
this.generator = TowerJsonGenerator.create(schema)
159166
this.reports = new TowerReports(session)
167+
this.datasetsApi = createDatasetsApi()
160168
}
161169

162170
TowerClient withEnvironment(Map env) {
@@ -169,6 +177,30 @@ class TowerClient implements TraceObserverV2 {
169177
this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP)
170178
}
171179

180+
/**
181+
* Create and configure a DatasetsApi client for Seqera Platform
182+
*
183+
* @return Configured DatasetsApi instance
184+
*/
185+
protected DatasetsApi createDatasetsApi() {
186+
if( !accessToken || !endpoint ) {
187+
return null
188+
}
189+
190+
try {
191+
def apiClient = new ApiClient()
192+
apiClient.setBasePath(endpoint)
193+
apiClient.setBearerToken(accessToken)
194+
apiClient.setUserAgent("Nextflow/$BuildInfo.version")
195+
196+
return new DatasetsApi(apiClient)
197+
}
198+
catch( Exception e ) {
199+
log.warn "Failed to initialize DatasetsApi: ${e.message}"
200+
return null
201+
}
202+
}
203+
172204
@Override
173205
boolean enableMetrics() { true }
174206

@@ -266,18 +298,6 @@ class TowerClient implements TraceObserverV2 {
266298
return result
267299
}
268300

269-
protected String getUrlDatasets() {
270-
if( workspaceId )
271-
return "$endpoint/workspaces/$workspaceId/datasets/"
272-
return "$endpoint/datasets/"
273-
}
274-
275-
protected String getUrlDatasetUpload(String datasetId) {
276-
if( workspaceId )
277-
return "$endpoint/workspaces/$workspaceId/datasets/$datasetId/upload"
278-
return "$endpoint/datasets/$datasetId/upload"
279-
}
280-
281301
/**
282302
* On workflow start, submit a message with some basic
283303
* information, like Id, activity and an ISO 8601 formatted
@@ -959,7 +979,7 @@ class TowerClient implements TraceObserverV2 {
959979
}
960980

961981
/**
962-
* Create a new dataset in Seqera Platform
982+
* Create a new dataset in Seqera Platform using tower-java-sdk
963983
*
964984
* @param name The name for the new dataset
965985
* @param description The description for the new dataset
@@ -968,40 +988,39 @@ class TowerClient implements TraceObserverV2 {
968988
private String createDataset(String name, String description) {
969989
log.info "Creating new dataset: ${name}"
970990

991+
if( !datasetsApi ) {
992+
log.warn "DatasetsApi not initialized, cannot create dataset"
993+
return null
994+
}
995+
971996
try {
972-
final payload = [
973-
name: name,
974-
description: "Workflow output: ${description}",
975-
header: true
976-
]
977-
978-
final url = getUrlDatasets()
979-
final resp = sendHttpMessage(url, payload, 'POST')
980-
981-
if( resp.isError() ) {
982-
log.warn "Failed to create dataset '${name}': ${resp.message}"
983-
return null
984-
}
997+
def request = new CreateDatasetRequest()
998+
request.setName(name)
999+
request.setDescription("Workflow output: ${description}")
9851000

986-
// Parse the response to extract dataset ID
987-
final json = new JsonSlurper().parseText(resp.message) as Map
988-
final dataset = json.dataset as Map
989-
final datasetId = dataset?.id as String
1001+
def wspId = workspaceId ? Long.valueOf(workspaceId) : null
1002+
CreateDatasetResponse response = datasetsApi.createDataset(wspId, request)
1003+
1004+
def datasetId = response.dataset?.id?.toString()
9901005

9911006
if( datasetId ) {
9921007
log.info "Created dataset '${name}' with ID: ${datasetId}"
9931008
}
9941009

9951010
return datasetId
9961011
}
1012+
catch( ApiException e ) {
1013+
log.warn "Failed to create dataset '${name}': ${e.message} (status: ${e.code})", e
1014+
return null
1015+
}
9971016
catch( Exception e ) {
9981017
log.warn "Failed to create dataset '${name}': ${e.message}", e
9991018
return null
10001019
}
10011020
}
10021021

10031022
/**
1004-
* Upload an index file to a dataset
1023+
* Upload an index file to a dataset using tower-java-sdk
10051024
*
10061025
* @param datasetId The ID of the dataset
10071026
* @param indexPath The path to the index file
@@ -1013,96 +1032,27 @@ class TowerClient implements TraceObserverV2 {
10131032
return
10141033
}
10151034

1016-
log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}"
1017-
1018-
try {
1019-
// Build URL with header parameter
1020-
def url = getUrlDatasetUpload(datasetId)
1021-
// Workflow output index files always have headers
1022-
url += "?header=true"
1023-
1024-
// Upload file using multipart form data
1025-
final resp = uploadFile(url, indexPath.toFile())
1026-
1027-
if( resp.isError() ) {
1028-
log.warn "Failed to upload index file for output '${outputName}': ${resp.message}"
1029-
} else {
1030-
log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}"
1031-
}
1032-
}
1033-
catch( Exception e ) {
1034-
log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e
1035+
if( !datasetsApi ) {
1036+
log.warn "DatasetsApi not initialized, cannot upload index file"
1037+
return
10351038
}
1036-
}
10371039

1038-
/**
1039-
* Upload a file to Seqera Platform using multipart/form-data
1040-
*
1041-
* @param url The upload URL
1042-
* @param file The file to upload
1043-
* @return Response object
1044-
*/
1045-
protected Response uploadFile(String url, File file) {
1046-
log.trace "HTTP multipart upload: url=$url; file=${file.name}"
1040+
log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}"
10471041

10481042
try {
1049-
// Create multipart body
1050-
final boundary = "----TowerNextflowBoundary" + System.currentTimeMillis()
1051-
final body = createMultipartBody(file, boundary)
1052-
1053-
// Build request
1054-
final request = HttpRequest.newBuilder(URI.create(url))
1055-
.header('Content-Type', "multipart/form-data; boundary=$boundary")
1056-
.header('User-Agent', "Nextflow/$BuildInfo.version")
1057-
.header('Traceparent', TraceUtils.rndTrace())
1058-
.POST(HttpRequest.BodyPublishers.ofByteArray(body))
1059-
.build()
1060-
1061-
final resp = httpClient.sendAsString(request)
1062-
final status = resp.statusCode()
1043+
def wspId = workspaceId ? Long.valueOf(workspaceId) : null
1044+
def header = Boolean.TRUE // Workflow output index files always have headers
10631045

1064-
if( status == 401 ) {
1065-
return new Response(status, 'Unauthorized Seqera Platform API access')
1066-
}
1067-
if( status >= 400 ) {
1068-
final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url"
1069-
return new Response(status, msg as String)
1070-
}
1046+
datasetsApi.uploadDataset(wspId, datasetId, header, indexPath.toFile())
10711047

1072-
return new Response(status, resp.body())
1048+
log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}"
10731049
}
1074-
catch( IOException e ) {
1075-
return new Response(0, "Unable to connect to Seqera Platform API: ${getHostUrl(url)}")
1050+
catch( ApiException e ) {
1051+
log.warn "Failed to upload index file for output '${outputName}': ${e.message} (status: ${e.code})", e
1052+
}
1053+
catch( Exception e ) {
1054+
log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e
10761055
}
1077-
}
1078-
1079-
/**
1080-
* Create a multipart/form-data request body
1081-
*
1082-
* @param file The file to include in the request
1083-
* @param boundary The multipart boundary string
1084-
* @return Byte array containing the multipart body
1085-
*/
1086-
private byte[] createMultipartBody(File file, String boundary) {
1087-
final baos = new ByteArrayOutputStream()
1088-
final writer = new PrintWriter(new OutputStreamWriter(baos, 'UTF-8'), true)
1089-
1090-
// Write file part
1091-
writer.append("--${boundary}\r\n")
1092-
writer.append("Content-Disposition: form-data; name=\"file\"; filename=\"${file.name}\"\r\n")
1093-
writer.append("Content-Type: text/csv\r\n")
1094-
writer.append("\r\n")
1095-
writer.flush()
1096-
1097-
// Write file content
1098-
baos.write(file.bytes)
1099-
1100-
// Write closing boundary
1101-
writer.append("\r\n")
1102-
writer.append("--${boundary}--\r\n")
1103-
writer.flush()
1104-
1105-
return baos.toByteArray()
11061056
}
11071057

11081058
}

0 commit comments

Comments
 (0)