Skip to content

Commit d9ce038

Browse files
committed
feat(nf-tower): Add workflow output dataset upload to Seqera Platform
Implement automatic upload of Nextflow workflow output index files to Seqera Platform datasets when workflows complete, enabling seamless integration between Nextflow's output syntax and Platform's dataset management. Changes: - Add DatasetConfig class for dataset upload configuration - Support auto-create or use existing datasets - Customizable dataset name patterns with variable substitution - Per-output configuration overrides - Update TowerConfig to include datasets configuration scope - Implement dataset upload in TowerClient: - Collect workflow outputs via onWorkflowOutput() callback - Upload index files on workflow completion (onFlowComplete) - Create datasets via Platform API with proper workspace URLs - Use multipart/form-data for file uploads (matches tower-cli) - Add URL builders for dataset API endpoints - Add comprehensive unit tests for DatasetConfig API Implementation: - Create dataset: POST /workspaces/{id}/datasets/ - Upload file: POST /workspaces/{id}/datasets/{id}/upload - Proper multipart/form-data format with file field - Workspace ID in URL path (not query param) - Header detection via ?header=true query parameter Configuration example: tower { datasets { enabled = true createMode = 'auto' namePattern = '${workflow.runName}-outputs' perOutput { 'results' { datasetId = 'existing-id' } } } } Based on research of tower-cli (v0.15.0) and Seqera Platform API documentation to ensure correct endpoint structure and payload format. Signed-off-by: Edmund Miller <[email protected]> Signed-off-by: Edmund Miller <[email protected]>
1 parent bd6a7b4 commit d9ce038

File tree

3 files changed

+283
-77
lines changed

3 files changed

+283
-77
lines changed

CODE-OF-CONDUCT.md

Lines changed: 0 additions & 77 deletions
This file was deleted.

plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import nextflow.trace.TraceObserverV2
4747
import nextflow.trace.TraceRecord
4848
import nextflow.trace.event.FilePublishEvent
4949
import nextflow.trace.event.TaskEvent
50+
import nextflow.trace.event.WorkflowOutputEvent
5051
import nextflow.util.Duration
5152
import nextflow.util.LoggerHelper
5253
import nextflow.util.ProcessHelper
@@ -142,12 +143,17 @@ class TowerClient implements TraceObserverV2 {
142143

143144
private Map<String,Boolean> allContainers = new ConcurrentHashMap<>()
144145

146+
private List<WorkflowOutputEvent> workflowOutputs = []
147+
148+
private DatasetConfig datasetConfig
149+
145150
TowerClient(Session session, TowerConfig config) {
146151
this.session = session
147152
this.endpoint = checkUrl(config.endpoint)
148153
this.accessToken = config.accessToken
149154
this.workspaceId = config.workspaceId
150155
this.retryPolicy = config.retryPolicy
156+
this.datasetConfig = config.datasets
151157
this.schema = loadSchema()
152158
this.generator = TowerJsonGenerator.create(schema)
153159
this.reports = new TowerReports(session)
@@ -260,6 +266,18 @@ class TowerClient implements TraceObserverV2 {
260266
return result
261267
}
262268

269+
protected String getUrlDatasets() {
270+
if( workspaceId )
271+
return "$endpoint/workspaces/$workspaceId/datasets/"
272+
return "$endpoint/datasets/"
273+
}
274+
275+
protected String getUrlDatasetUpload(String datasetId) {
276+
if( workspaceId )
277+
return "$endpoint/workspaces/$workspaceId/datasets/$datasetId/upload"
278+
return "$endpoint/datasets/$datasetId/upload"
279+
}
280+
263281
/**
264282
* On workflow start, submit a message with some basic
265283
* information, like Id, activity and an ISO 8601 formatted
@@ -409,6 +427,10 @@ class TowerClient implements TraceObserverV2 {
409427
}
410428
// wait and flush reports content
411429
reports.flowComplete()
430+
// upload workflow outputs to datasets
431+
if( shouldUploadDatasets() ) {
432+
uploadWorkflowOutputsToDatasets()
433+
}
412434
// notify the workflow completion
413435
if( workflowId ) {
414436
final req = makeCompleteReq(session)
@@ -495,6 +517,19 @@ class TowerClient implements TraceObserverV2 {
495517
reports.filePublish(event.target)
496518
}
497519

520+
/**
521+
* Collect workflow output events for later upload to datasets
522+
*
523+
* @param event The workflow output event
524+
*/
525+
@Override
526+
void onWorkflowOutput(WorkflowOutputEvent event) {
527+
log.debug "Workflow output published: ${event.name} -> ${event.index}"
528+
if( event.index ) {
529+
workflowOutputs << event
530+
}
531+
}
532+
498533
/**
499534
* Little helper method that sends a HTTP POST message as JSON with
500535
* the current run status, ISO 8601 UTC timestamp, run name and the TraceRecord
@@ -829,4 +864,245 @@ class TowerClient implements TraceObserverV2 {
829864
}
830865
}
831866

867+
/**
868+
* Check if dataset uploads should be performed
869+
*
870+
* @return true if dataset uploads are enabled and there are outputs to upload
871+
*/
872+
private boolean shouldUploadDatasets() {
873+
if( !datasetConfig?.enabled )
874+
return false
875+
if( !workflowId )
876+
return false
877+
if( workflowOutputs.isEmpty() )
878+
return false
879+
return true
880+
}
881+
882+
/**
883+
* Upload workflow outputs to Seqera Platform datasets
884+
*/
885+
private void uploadWorkflowOutputsToDatasets() {
886+
log.info "Uploading workflow outputs to Seqera Platform datasets"
887+
888+
for( final output : workflowOutputs ) {
889+
try {
890+
if( !datasetConfig.isEnabledForOutput(output.name) ) {
891+
log.debug "Dataset upload disabled for output: ${output.name}"
892+
continue
893+
}
894+
895+
uploadOutputToDataset(output)
896+
}
897+
catch( Exception e ) {
898+
log.warn "Failed to upload workflow output '${output.name}' to dataset: ${e.message}", e
899+
}
900+
}
901+
}
902+
903+
/**
904+
* Upload a single workflow output to a dataset
905+
*
906+
* @param output The workflow output event to upload
907+
*/
908+
private void uploadOutputToDataset(WorkflowOutputEvent output) {
909+
// Resolve dataset ID
910+
final datasetId = resolveDatasetId(output)
911+
if( !datasetId ) {
912+
log.warn "Could not determine dataset ID for output: ${output.name}"
913+
return
914+
}
915+
916+
// Upload index file
917+
uploadIndexToDataset(datasetId, output.index, output.name)
918+
}
919+
920+
/**
921+
* Resolve the dataset ID for a workflow output
922+
*
923+
* @param output The workflow output event
924+
* @return The dataset ID, or null if it could not be determined
925+
*/
926+
private String resolveDatasetId(WorkflowOutputEvent output) {
927+
// First check if a dataset ID is explicitly configured for this output
928+
final configuredId = datasetConfig.getDatasetId(output.name)
929+
if( configuredId ) {
930+
log.debug "Using configured dataset ID for output '${output.name}': ${configuredId}"
931+
return configuredId
932+
}
933+
934+
// If auto-create is enabled, create a new dataset
935+
if( datasetConfig.isAutoCreateEnabled() ) {
936+
final datasetName = resolveDatasetName(output.name)
937+
return createDataset(datasetName, output.name)
938+
}
939+
940+
log.warn "No dataset ID configured for output '${output.name}' and auto-create is disabled"
941+
return null
942+
}
943+
944+
/**
945+
* Resolve the dataset name using the configured pattern
946+
*
947+
* @param outputName The name of the workflow output
948+
* @return The resolved dataset name
949+
*/
950+
private String resolveDatasetName(String outputName) {
951+
def name = datasetConfig.namePattern
952+
953+
// Replace variables in the pattern
954+
name = name.replace('${workflow.runName}', session.runName ?: 'unknown')
955+
name = name.replace('${workflow.sessionId}', session.uniqueId?.toString() ?: 'unknown')
956+
name = name.replace('${output.name}', outputName)
957+
958+
return name
959+
}
960+
961+
/**
962+
* Create a new dataset in Seqera Platform
963+
*
964+
* @param name The name for the new dataset
965+
* @param description The description for the new dataset
966+
* @return The ID of the created dataset, or null if creation failed
967+
*/
968+
private String createDataset(String name, String description) {
969+
log.info "Creating new dataset: ${name}"
970+
971+
try {
972+
final payload = [
973+
name: name,
974+
description: "Workflow output: ${description}",
975+
header: true
976+
]
977+
978+
final url = getUrlDatasets()
979+
final resp = sendHttpMessage(url, payload, 'POST')
980+
981+
if( resp.isError() ) {
982+
log.warn "Failed to create dataset '${name}': ${resp.message}"
983+
return null
984+
}
985+
986+
// Parse the response to extract dataset ID
987+
final json = new JsonSlurper().parseText(resp.message) as Map
988+
final dataset = json.dataset as Map
989+
final datasetId = dataset?.id as String
990+
991+
if( datasetId ) {
992+
log.info "Created dataset '${name}' with ID: ${datasetId}"
993+
}
994+
995+
return datasetId
996+
}
997+
catch( Exception e ) {
998+
log.warn "Failed to create dataset '${name}': ${e.message}", e
999+
return null
1000+
}
1001+
}
1002+
1003+
/**
1004+
* Upload an index file to a dataset
1005+
*
1006+
* @param datasetId The ID of the dataset
1007+
* @param indexPath The path to the index file
1008+
* @param outputName The name of the workflow output (for logging)
1009+
*/
1010+
private void uploadIndexToDataset(String datasetId, java.nio.file.Path indexPath, String outputName) {
1011+
if( !indexPath || !java.nio.file.Files.exists(indexPath) ) {
1012+
log.warn "Index file does not exist for output '${outputName}': ${indexPath}"
1013+
return
1014+
}
1015+
1016+
log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}"
1017+
1018+
try {
1019+
// Build URL with header parameter
1020+
def url = getUrlDatasetUpload(datasetId)
1021+
// Workflow output index files always have headers
1022+
url += "?header=true"
1023+
1024+
// Upload file using multipart form data
1025+
final resp = uploadFile(url, indexPath.toFile())
1026+
1027+
if( resp.isError() ) {
1028+
log.warn "Failed to upload index file for output '${outputName}': ${resp.message}"
1029+
} else {
1030+
log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}"
1031+
}
1032+
}
1033+
catch( Exception e ) {
1034+
log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e
1035+
}
1036+
}
1037+
1038+
/**
1039+
* Upload a file to Seqera Platform using multipart/form-data
1040+
*
1041+
* @param url The upload URL
1042+
* @param file The file to upload
1043+
* @return Response object
1044+
*/
1045+
protected Response uploadFile(String url, File file) {
1046+
log.trace "HTTP multipart upload: url=$url; file=${file.name}"
1047+
1048+
try {
1049+
// Create multipart body
1050+
final boundary = "----TowerNextflowBoundary" + System.currentTimeMillis()
1051+
final body = createMultipartBody(file, boundary)
1052+
1053+
// Build request
1054+
final request = HttpRequest.newBuilder(URI.create(url))
1055+
.header('Content-Type', "multipart/form-data; boundary=$boundary")
1056+
.header('User-Agent', "Nextflow/$BuildInfo.version")
1057+
.header('Traceparent', TraceUtils.rndTrace())
1058+
.POST(HttpRequest.BodyPublishers.ofByteArray(body))
1059+
.build()
1060+
1061+
final resp = httpClient.sendAsString(request)
1062+
final status = resp.statusCode()
1063+
1064+
if( status == 401 ) {
1065+
return new Response(status, 'Unauthorized Seqera Platform API access')
1066+
}
1067+
if( status >= 400 ) {
1068+
final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url"
1069+
return new Response(status, msg as String)
1070+
}
1071+
1072+
return new Response(status, resp.body())
1073+
}
1074+
catch( IOException e ) {
1075+
return new Response(0, "Unable to connect to Seqera Platform API: ${getHostUrl(url)}")
1076+
}
1077+
}
1078+
1079+
/**
1080+
* Create a multipart/form-data request body
1081+
*
1082+
* @param file The file to include in the request
1083+
* @param boundary The multipart boundary string
1084+
* @return Byte array containing the multipart body
1085+
*/
1086+
private byte[] createMultipartBody(File file, String boundary) {
1087+
final baos = new ByteArrayOutputStream()
1088+
final writer = new PrintWriter(new OutputStreamWriter(baos, 'UTF-8'), true)
1089+
1090+
// Write file part
1091+
writer.append("--${boundary}\r\n")
1092+
writer.append("Content-Disposition: form-data; name=\"file\"; filename=\"${file.name}\"\r\n")
1093+
writer.append("Content-Type: text/csv\r\n")
1094+
writer.append("\r\n")
1095+
writer.flush()
1096+
1097+
// Write file content
1098+
baos.write(file.bytes)
1099+
1100+
// Write closing boundary
1101+
writer.append("\r\n")
1102+
writer.append("--${boundary}--\r\n")
1103+
writer.flush()
1104+
1105+
return baos.toByteArray()
1106+
}
1107+
8321108
}

0 commit comments

Comments
 (0)