Skip to content

Commit 9c4db63

Browse files
committed
feat(pipeline): add S3 send step for uploading pipeline output to S3-compatible storage
Implement a new pipeline step that uploads files from the previous step's output to S3-compatible storage (AWS S3, MinIO, Ceph, etc.).

Key features:
- S3Uploader interface with an AWSS3Uploader implementation using AWS SDK v2
- Upload manifest tracking for retry resilience (skips already uploaded files)
- Environment variable expansion for credentials (${VAR} syntax)
- Error classification (transient vs. non-transient) for retry logic
- Support for path-style addressing (required for MinIO)
- Default region set to eu-central-1

New files:
- internal/models/send.go: SendConfig, UploadManifest structs
- internal/services/s3_uploader.go: S3Uploader interface and implementation
- internal/pipeline/send.go: ExecuteSendStep function
- tests/unit/send_step_test.go: Unit tests for send step
- tests/unit/send_config_loading_test.go: Config loading tests

Closes #126
1 parent 462fe4d commit 9c4db63

File tree

13 files changed

+1412
-13
lines changed

13 files changed

+1412
-13
lines changed

cmd/pipeline.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,23 @@ func executeStep(job *models.PipelineJob, stepName models.StepName, config *mode
230230
fmt.Printf("\n✓ Flattening completed\n")
231231
return nil
232232

233+
case models.StepSend:
234+
fmt.Println("Starting S3 send step...")
235+
if err := pipeline.ExecuteSendStep(job, jobDir, logger); err != nil {
236+
failedJob := pipeline.FailJob(job, err.Error())
237+
if saveErr := pipeline.UpdateJob(config.JobsDir, failedJob); saveErr != nil {
238+
logger.Error("Failed to save job state", "error", saveErr)
239+
}
240+
return fmt.Errorf("send step failed: %w", err)
241+
}
242+
243+
if err := pipeline.UpdateJob(config.JobsDir, job); err != nil {
244+
return fmt.Errorf("failed to save job state: %w", err)
245+
}
246+
247+
fmt.Printf("\n✓ S3 send completed\n")
248+
return nil
249+
233250
case models.StepWait:
234251
// Execute wait step - creates empty wait directory and pauses pipeline
235252
stepIndex := -1

config/aether.example.yaml

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,42 @@ services:
9393
# Default: 30m (30 minutes)
9494
timeout: 30m
9595

96+
# S3 Send Service (optional)
97+
# Uploads pipeline output files to S3-compatible storage (AWS S3, MinIO, Ceph, etc.)
98+
# Leave empty to skip send step
99+
send:
100+
# S3 endpoint URL (optional)
101+
# Leave empty for AWS S3, or set for S3-compatible services like MinIO
102+
# Example for MinIO: "http://localhost:9000"
103+
# Example for AWS S3: leave empty (uses default AWS endpoints)
104+
endpoint: "${S3_ENDPOINT}"
105+
106+
# AWS region (default: eu-central-1)
107+
region: "eu-central-1"
108+
109+
# S3 bucket name (required when send step is enabled)
110+
# Supports environment variable expansion: "${S3_BUCKET}"
111+
bucket: "${S3_BUCKET}"
112+
113+
# AWS credentials
114+
# IMPORTANT: Use environment variables for sensitive credentials
115+
# Never commit actual credentials to version control
116+
access_key_id: "${AWS_ACCESS_KEY_ID}"
117+
secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
118+
119+
# Use path-style addressing instead of virtual-hosted-style
120+
# Set to true for MinIO and other S3-compatible services
121+
# Set to false (default) for AWS S3
122+
use_path_style: false
123+
124+
# Upload timeout per operation
125+
# Default: 30m (30 minutes)
126+
timeout: 30m
127+
96128
pipeline:
97129
# List of steps to execute in order
98130
# Import step options (must be first): torch, local_import, http_import
99-
# Other step options: dimp, validation, csv_conversion, parquet_conversion, wait, flattening
131+
# Other step options: dimp, validation, csv_conversion, parquet_conversion, wait, flattening, send
100132
# NOTE: At least one import step must be first in enabled_steps
101133
# NOTE: Enable all the import types you want to support - the system will automatically
102134
# use the correct one based on your input (TORCH URL → torch, local dir → local_import, etc.)
@@ -107,6 +139,13 @@ pipeline:
107139
# - Pipeline pauses until 'aether pipeline continue <job-id>' is run
108140
# - Cannot be first step (needs previous output)
109141
# - Consecutive waits are not allowed
142+
#
143+
# Send step:
144+
# Use 'send' to upload pipeline output to S3-compatible storage.
145+
# - Uploads files from the previous step output
146+
# - S3 key format: {job-id}/{filename}
147+
# - Tracks uploads for retry resilience (skips already uploaded files)
148+
# - Typically used as the last step in the pipeline
110149
enabled_steps:
111150
- torch # TORCH import via CRTDL or direct TORCH URL
112151
- local_import # Import from local directory
@@ -117,6 +156,7 @@ pipeline:
117156
- flattening # Transform FHIR to CSV using fhir-flattener (CRTDL input only)
118157
- csv_conversion
119158
- parquet_conversion
159+
# - send # Uncomment to upload output to S3-compatible storage
120160

121161
retry:
122162
# Maximum number of retry attempts for transient errors (network, 5xx)

go.mod

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.25
44

55
require (
66
github.com/google/uuid v1.6.0
7+
github.com/klauspost/compress v1.18.2
78
github.com/schollz/progressbar/v3 v3.19.0
89
github.com/spf13/cobra v1.10.2
910
github.com/spf13/viper v1.21.0
@@ -12,11 +13,29 @@ require (
1213
)
1314

1415
require (
16+
github.com/aws/aws-sdk-go-v2 v1.41.1 // indirect
17+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
18+
github.com/aws/aws-sdk-go-v2/config v1.32.7 // indirect
19+
github.com/aws/aws-sdk-go-v2/credentials v1.19.7 // indirect
20+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect
21+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect
22+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect
23+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
24+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 // indirect
25+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect
26+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 // indirect
27+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect
28+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 // indirect
29+
github.com/aws/aws-sdk-go-v2/service/s3 v1.95.1 // indirect
30+
github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect
31+
github.com/aws/aws-sdk-go-v2/service/sso v1.30.9 // indirect
32+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 // indirect
33+
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
34+
github.com/aws/smithy-go v1.24.0 // indirect
1535
github.com/davecgh/go-spew v1.1.1 // indirect
1636
github.com/fsnotify/fsnotify v1.9.0 // indirect
1737
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
1838
github.com/inconshreveable/mousetrap v1.1.0 // indirect
19-
github.com/klauspost/compress v1.18.2 // indirect
2039
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
2140
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
2241
github.com/pmezard/go-difflib v1.0.0 // indirect

go.sum

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,41 @@
1+
github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
2+
github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
3+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU=
4+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4/go.mod h1:IOAPF6oT9KCsceNTvvYMNHy0+kMF8akOjeDvPENWxp4=
5+
github.com/aws/aws-sdk-go-v2/config v1.32.7 h1:vxUyWGUwmkQ2g19n7JY/9YL8MfAIl7bTesIUykECXmY=
6+
github.com/aws/aws-sdk-go-v2/config v1.32.7/go.mod h1:2/Qm5vKUU/r7Y+zUk/Ptt2MDAEKAfUtKc1+3U1Mo3oY=
7+
github.com/aws/aws-sdk-go-v2/credentials v1.19.7 h1:tHK47VqqtJxOymRrNtUXN5SP/zUTvZKeLx4tH6PGQc8=
8+
github.com/aws/aws-sdk-go-v2/credentials v1.19.7/go.mod h1:qOZk8sPDrxhf+4Wf4oT2urYJrYt3RejHSzgAquYeppw=
9+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 h1:I0GyV8wiYrP8XpA70g1HBcQO1JlQxCMTW9npl5UbDHY=
10+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17/go.mod h1:tyw7BOl5bBe/oqvoIeECFJjMdzXoa/dfVz3QQ5lgHGA=
11+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U=
12+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ=
13+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik=
14+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM=
15+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
16+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
17+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 h1:JqcdRG//czea7Ppjb+g/n4o8i/R50aTBHkA7vu0lK+k=
18+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17/go.mod h1:CO+WeGmIdj/MlPel2KwID9Gt7CNq4M65HUfBW97liM0=
19+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E=
20+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow=
21+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 h1:Z5EiPIzXKewUQK0QTMkutjiaPVeVYXX7KIqhXu/0fXs=
22+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8/go.mod h1:FsTpJtvC4U1fyDXk7c71XoDv3HlRm8V3NiYLeYLh5YE=
23+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 h1:RuNSMoozM8oXlgLG/n6WLaFGoea7/CddrCfIiSA+xdY=
24+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17/go.mod h1:F2xxQ9TZz5gDWsclCtPQscGpP0VUOc8RqgFM3vDENmU=
25+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 h1:bGeHBsGZx0Dvu/eJC0Lh9adJa3M1xREcndxLNZlve2U=
26+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17/go.mod h1:dcW24lbU0CzHusTE8LLHhRLI42ejmINN8Lcr22bwh/g=
27+
github.com/aws/aws-sdk-go-v2/service/s3 v1.95.1 h1:C2dUPSnEpy4voWFIq3JNd8gN0Y5vYGDo44eUE58a/p8=
28+
github.com/aws/aws-sdk-go-v2/service/s3 v1.95.1/go.mod h1:5jggDlZ2CLQhwJBiZJb4vfk4f0GxWdEDruWKEJ1xOdo=
29+
github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 h1:VrhDvQib/i0lxvr3zqlUwLwJP4fpmpyD9wYG1vfSu+Y=
30+
github.com/aws/aws-sdk-go-v2/service/signin v1.0.5/go.mod h1:k029+U8SY30/3/ras4G/Fnv/b88N4mAfliNn08Dem4M=
31+
github.com/aws/aws-sdk-go-v2/service/sso v1.30.9 h1:v6EiMvhEYBoHABfbGB4alOYmCIrcgyPPiBE1wZAEbqk=
32+
github.com/aws/aws-sdk-go-v2/service/sso v1.30.9/go.mod h1:yifAsgBxgJWn3ggx70A3urX2AN49Y5sJTD1UQFlfqBw=
33+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 h1:gd84Omyu9JLriJVCbGApcLzVR3XtmC4ZDPcAI6Ftvds=
34+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13/go.mod h1:sTGThjphYE4Ohw8vJiRStAcu3rbjtXRsdNB0TvZ5wwo=
35+
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/pJ1jOWYlFDJTjRQ=
36+
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ=
37+
github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
38+
github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
139
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
240
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
341
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
@@ -37,8 +75,6 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f
3775
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
3876
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
3977
github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik=
40-
github.com/schollz/progressbar/v3 v3.18.0 h1:uXdoHABRFmNIjUfte/Ex7WtuyVslrw2wVPQmCN62HpA=
41-
github.com/schollz/progressbar/v3 v3.18.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec=
4278
github.com/schollz/progressbar/v3 v3.19.0 h1:Ea18xuIRQXLAUidVDox3AbwfUhD0/1IvohyTutOIFoc=
4379
github.com/schollz/progressbar/v3 v3.19.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec=
4480
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw=
@@ -47,8 +83,6 @@ github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I=
4783
github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg=
4884
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
4985
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
50-
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
51-
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
5286
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
5387
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
5488
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=

internal/models/config.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ func DefaultCompressionConfig() CompressionConfig {
3131

3232
// ServiceConfig contains connection details for external HTTP services
3333
type ServiceConfig struct {
34-
DIMP DIMPConfig `yaml:"dimp" json:"dimp"`
35-
CSVConversion CSVConversionConfig `yaml:"csv_conversion" json:"csv_conversion"`
36-
ParquetConversion ParquetConversionConfig `yaml:"parquet_conversion" json:"parquet_conversion"`
37-
TORCH TORCHConfig `yaml:"torch" json:"torch"`
38-
Flattening FlatteningConfig `yaml:"flattening" json:"flattening"`
39-
CRTDLPreprocessing CRTDLPreprocessingConfig `yaml:"crtdl_preprocessing" json:"crtdl_preprocessing"`
34+
DIMP DIMPConfig `yaml:"dimp" json:"dimp"`
35+
CSVConversion CSVConversionConfig `yaml:"csv_conversion" json:"csv_conversion"`
36+
ParquetConversion ParquetConversionConfig `yaml:"parquet_conversion" json:"parquet_conversion"`
37+
TORCH TORCHConfig `yaml:"torch" json:"torch"`
38+
Flattening FlatteningConfig `yaml:"flattening" json:"flattening"`
39+
CRTDLPreprocessing CRTDLPreprocessingConfig `yaml:"crtdl_preprocessing" json:"crtdl_preprocessing"`
40+
Send SendConfig `yaml:"send" json:"send"`
4041
}
4142

4243
// DIMPConfig contains DIMP pseudonymization service settings
@@ -101,6 +102,7 @@ func DefaultConfig() ProjectConfig {
101102
},
102103
Flattening: DefaultFlatteningConfig(),
103104
CRTDLPreprocessing: DefaultCRTDLPreprocessingConfig(),
105+
Send: DefaultSendConfig(),
104106
},
105107
Pipeline: PipelineConfig{
106108
EnabledSteps: []StepName{StepLocalImport, StepHttpImport},

0 commit comments

Comments
 (0)