Skip to content

Commit 9a9cb90

Browse files
Add s3 dependencies and state input of start workflow API (#575)
1 parent f545701 commit 9a9cb90

30 files changed

+1781
-52
lines changed

cmd/s3-example/main.go

Lines changed: 441 additions & 0 deletions
Large diffs are not rendered by default.

cmd/server/iwf/iwf.go

Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ import (
2424
"context"
2525
"crypto/tls"
2626
"fmt"
27+
rawLog "log"
28+
"strings"
29+
"sync"
30+
"time"
31+
32+
"github.com/aws/aws-sdk-go-v2/aws"
33+
awsConfig "github.com/aws/aws-sdk-go-v2/config"
34+
"github.com/aws/aws-sdk-go-v2/credentials"
35+
"github.com/aws/aws-sdk-go-v2/service/s3"
2736
"github.com/indeedeng/iwf/config"
2837
cadenceapi "github.com/indeedeng/iwf/service/client/cadence"
2938
temporalapi "github.com/indeedeng/iwf/service/client/temporal"
3039
ggrpc "google.golang.org/grpc"
3140
"google.golang.org/grpc/metadata"
32-
rawLog "log"
33-
"strings"
34-
"sync"
35-
"time"
3641

3742
isvc "github.com/indeedeng/iwf/service"
3843
"github.com/indeedeng/iwf/service/api"
@@ -199,12 +204,17 @@ func launchTemporalService(
199204
svcName string, config config.Config, unifiedClient uclient.UnifiedClient, temporalClient client.Client,
200205
logger log.Logger,
201206
) {
207+
s3Client := CreateS3Client(config, context.Background())
202208
switch svcName {
203209
case serviceAPI:
204-
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
210+
svc := api.NewService(
211+
config, unifiedClient, logger.WithTags(tag.Service(svcName)),
212+
s3Client,
213+
config.Interpreter.Temporal.Namespace+"/",
214+
)
205215
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
206216
case serviceInterpreter:
207-
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient)
217+
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient, s3Client)
208218
interpreter.Start()
209219
default:
210220
rawLog.Fatalf("Invalid service: %v", svcName)
@@ -220,12 +230,17 @@ func launchCadenceService(
220230
closeFunc func(),
221231
logger log.Logger,
222232
) {
233+
s3Client := CreateS3Client(config, context.Background())
223234
switch svcName {
224235
case serviceAPI:
225-
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
236+
svc := api.NewService(
237+
config, unifiedClient, logger.WithTags(tag.Service(svcName)),
238+
s3Client,
239+
config.Interpreter.Cadence.Domain+"/",
240+
)
226241
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
227242
case serviceInterpreter:
228-
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient)
243+
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient, s3Client)
229244
interpreter.Start()
230245
default:
231246
rawLog.Fatalf("Invalid service: %v", svcName)
@@ -340,3 +355,78 @@ func newPrometheusScope(c prometheus.Configuration, logger log.Logger) tally.Sco
340355
logger.Info("prometheus metrics scope created")
341356
return scope
342357
}
358+
359+
func CreateS3Client(cfg config.Config, ctx context.Context) *s3.Client {
360+
361+
if !cfg.ExternalStorage.Enabled {
362+
return nil
363+
}
364+
365+
// get the first active storage
366+
var activeStorage *config.BlobStorageConfig
367+
for _, storage := range cfg.ExternalStorage.SupportedStorages {
368+
if storage.Status == config.StorageStatusActive {
369+
activeStorage = &storage
370+
break
371+
}
372+
}
373+
if activeStorage == nil {
374+
rawLog.Fatal("no active storage found")
375+
}
376+
377+
if activeStorage.StorageType != "s3" {
378+
rawLog.Fatal("only s3 is supported for external storage")
379+
}
380+
381+
// Create custom resolver for MinIO endpoint
382+
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
383+
384+
if service == s3.ServiceID {
385+
return aws.Endpoint{
386+
URL: activeStorage.S3Endpoint,
387+
HostnameImmutable: true,
388+
Source: aws.EndpointSourceCustom,
389+
}, nil
390+
}
391+
return aws.Endpoint{}, fmt.Errorf("unknown endpoint requested")
392+
})
393+
394+
// Load AWS config with custom credentials and endpoint
395+
cfg2, err := awsConfig.LoadDefaultConfig(ctx,
396+
awsConfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(activeStorage.S3AccessKey, activeStorage.S3SecretKey, "")),
397+
awsConfig.WithRegion(activeStorage.S3Region),
398+
awsConfig.WithEndpointResolverWithOptions(customResolver),
399+
)
400+
if err != nil {
401+
rawLog.Fatal("failed to load AWS config", tag.Error(err))
402+
}
403+
404+
// Create S3 client with path-style addressing (required for MinIO)
405+
client := s3.NewFromConfig(cfg2, func(o *s3.Options) {
406+
o.UsePathStyle = true
407+
})
408+
409+
createBucketIfNotExists(ctx, client, activeStorage.S3Bucket)
410+
411+
return client
412+
}
413+
414+
func createBucketIfNotExists(ctx context.Context, client *s3.Client, bucketName string) {
415+
// Check if bucket exists
416+
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
417+
Bucket: aws.String(bucketName),
418+
})
419+
420+
if err != nil {
421+
// Bucket doesn't exist, create it
422+
_, err = client.CreateBucket(ctx, &s3.CreateBucketInput{
423+
Bucket: aws.String(bucketName),
424+
})
425+
if err != nil {
426+
rawLog.Fatal("failed to create bucket", tag.Error(err))
427+
}
428+
rawLog.Printf("bucket created successfully: %s", bucketName)
429+
} else {
430+
rawLog.Printf("bucket already exists: %s", bucketName)
431+
}
432+
}

config/config.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ package config
22

33
import (
44
"fmt"
5+
"log"
6+
"os"
7+
"time"
8+
59
"github.com/indeedeng/iwf/gen/iwfidl"
610
"github.com/uber-go/tally/v4/prometheus"
711
temporalWorker "go.temporal.io/sdk/worker"
812
cadenceWorker "go.uber.org/cadence/worker"
913
"gopkg.in/yaml.v3"
10-
"log"
11-
"os"
12-
"time"
14+
)
15+
16+
const (
17+
StorageStatusActive = "active"
18+
StorageStatusInactive = "inactive"
1319
)
1420

1521
type (
@@ -20,6 +26,41 @@ type (
2026
Api ApiConfig `yaml:"api"`
2127
// Interpreter is the service behind, either Cadence or Temporal is required
2228
Interpreter Interpreter `yaml:"interpreter"`
29+
// ExternalStorage is the external storage config
30+
ExternalStorage ExternalStorageConfig `yaml:"externalStorage"`
31+
}
32+
33+
ExternalStorageConfig struct {
34+
Enabled bool `yaml:"enabled"`
35+
// ThresholdInBytes is the size threshold of encodedObject
36+
// that will be stored by external storage(picking the current active one)
37+
ThresholdInBytes int `yaml:"thresholdInBytes"`
38+
// SupportedStorages is the list of supported storage
39+
// Only one can be active, meaning the one that will be used for writing.
40+
// The non-active ones are for read only.
41+
SupportedStorages []BlobStorageConfig `yaml:"supportedStorages"`
42+
}
43+
44+
StorageStatus string
45+
46+
BlobStorageConfig struct {
47+
// Status means whether this storage is active for writing.
48+
// Only one of the supported storages can be active
49+
Status StorageStatus
50+
// StorageId is the id of the external storage, it's used to identify the external storage in the EncodedObject that is stored in the workflow history
51+
StorageId string `yaml:"storageId"`
52+
// StorageType is the type of the external storage, currently only s3 is supported
53+
StorageType string `yaml:"storageType"`
54+
// S3Endpoint is the endpoint of s3 service
55+
S3Endpoint string `yaml:"s3Endpoint"`
56+
// S3Bucket is the bucket name of the S3 storage
57+
S3Bucket string `yaml:"s3Bucket"`
58+
// S3Region is the region of the S3 storage
59+
S3Region string `yaml:"s3Region"`
60+
// S3AccessKey is the access key of the S3 storage
61+
S3AccessKey string `yaml:"s3AccessKey"`
62+
// S3SecretKey is the secret key of the S3 storage
63+
S3SecretKey string `yaml:"s3SecretKey"`
2364
}
2465

2566
ApiConfig struct {

config/development.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,15 @@ interpreter:
1919
timerType: histogram
2020
defaultWorkflowConfig:
2121
continueAsNewThreshold: 100
22+
externalStorage:
23+
enabled: true
24+
thresholdInBytes: 100000 # 100 KB
25+
supportedStorages:
26+
- status: active
27+
storageId: "local-dev-s3"
28+
storageType: "s3"
29+
s3Endpoint: "http://localhost:9000"
30+
s3Bucket: "iwf-test-bucket"
31+
s3Region: "us-east-1"
32+
s3AccessKey: "minioadmin"
33+
s3SecretKey: "minioadmin"

docker-compose/ci-cadence-dependencies.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,24 @@ services:
9898
volumes:
9999
- ./init-ci-cadence.sh:/etc/cadence/init-ci-cadence.sh
100100
entrypoint: sh -c "/etc/cadence/init-ci-cadence.sh"
101+
minio:
102+
container_name: minio
103+
image: minio/minio:${MINIO_VERSION:-latest}
104+
ports:
105+
- "9000:9000"
106+
- "9001:9001"
107+
environment:
108+
- MINIO_ROOT_USER=minioadmin
109+
- MINIO_ROOT_PASSWORD=minioadmin
110+
- MINIO_DOMAIN=minio
111+
command: server /data --console-address ":9001"
112+
networks:
113+
- testing-network
114+
healthcheck:
115+
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
116+
interval: 30s
117+
timeout: 20s
118+
retries: 3
101119
networks:
102120
testing-network:
103121
driver: bridge

docker-compose/ci-cadence-temporal-dependencies.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,24 @@ services:
147147
volumes:
148148
- ./init-ci-cadence.sh:/etc/cadence/init-ci-cadence.sh
149149
entrypoint: sh -c "/etc/cadence/init-ci-cadence.sh"
150+
minio:
151+
container_name: minio
152+
image: minio/minio:${MINIO_VERSION:-latest}
153+
ports:
154+
- "9000:9000"
155+
- "9001:9001"
156+
environment:
157+
- MINIO_ROOT_USER=minioadmin
158+
- MINIO_ROOT_PASSWORD=minioadmin
159+
- MINIO_DOMAIN=minio
160+
command: server /data --console-address ":9001"
161+
networks:
162+
- testing-network
163+
healthcheck:
164+
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
165+
interval: 30s
166+
timeout: 20s
167+
retries: 3
150168
networks:
151169
testing-network:
152170
driver: bridge

docker-compose/ci-temporal-dependencies.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,24 @@ services:
6666
volumes:
6767
- ./init-ci-temporal.sh:/etc/temporal/init-ci-temporal.sh
6868
entrypoint: sh -c "/etc/temporal/init-ci-temporal.sh"
69+
minio:
70+
container_name: minio
71+
image: minio/minio:${MINIO_VERSION:-latest}
72+
ports:
73+
- "9000:9000"
74+
- "9001:9001"
75+
environment:
76+
- MINIO_ROOT_USER=minioadmin
77+
- MINIO_ROOT_PASSWORD=minioadmin
78+
- MINIO_DOMAIN=minio
79+
command: server /data --console-address ":9001"
80+
networks:
81+
- testing-network
82+
healthcheck:
83+
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
84+
interval: 30s
85+
timeout: 20s
86+
retries: 3
6987
networks:
7088
testing-network:
7189
driver: bridge

docker-compose/integ-dependencies.yml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,27 @@ services:
170170
- testing-network
171171
ports:
172172
- 8233:8080
173+
minio:
174+
container_name: minio
175+
image: minio/minio:${MINIO_VERSION:-latest}
176+
ports:
177+
- "9000:9000"
178+
- "9001:9001"
179+
environment:
180+
- MINIO_ROOT_USER=minioadmin
181+
- MINIO_ROOT_PASSWORD=minioadmin
182+
- MINIO_DOMAIN=minio
183+
command: server /data --console-address ":9001"
184+
networks:
185+
- testing-network
186+
healthcheck:
187+
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
188+
interval: 30s
189+
timeout: 20s
190+
retries: 3
173191
networks:
174192
testing-network:
175193
driver: bridge
176-
name: testing-network
194+
name: testing-network
195+
volumes:
196+
minio-data:

gen/iwfidl/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ Class | Method | HTTP request | Description
8080
*DefaultApi* | [**ApiV1WorkflowConfigUpdatePost**](docs/DefaultApi.md#apiv1workflowconfigupdatepost) | **Post** /api/v1/workflow/config/update | update the config of a workflow
8181
*DefaultApi* | [**ApiV1WorkflowDataobjectsGetPost**](docs/DefaultApi.md#apiv1workflowdataobjectsgetpost) | **Post** /api/v1/workflow/dataobjects/get | get workflow data objects aka data attributes
8282
*DefaultApi* | [**ApiV1WorkflowDataobjectsSetPost**](docs/DefaultApi.md#apiv1workflowdataobjectssetpost) | **Post** /api/v1/workflow/dataobjects/set | set workflow data objects aka data attributes
83+
*DefaultApi* | [**ApiV1WorkflowEncodedobjectLoadPost**](docs/DefaultApi.md#apiv1workflowencodedobjectloadpost) | **Post** /api/v1/workflow/encodedobject/load | load encoded object from storage
8384
*DefaultApi* | [**ApiV1WorkflowGetPost**](docs/DefaultApi.md#apiv1workflowgetpost) | **Post** /api/v1/workflow/get | get a workflow's status and results(if completed & requested)
8485
*DefaultApi* | [**ApiV1WorkflowGetWithWaitPost**](docs/DefaultApi.md#apiv1workflowgetwithwaitpost) | **Post** /api/v1/workflow/getWithWait | get a workflow's status and results(if completed & requested), wait if the workflow is still running
8586
*DefaultApi* | [**ApiV1WorkflowInternalDumpPost**](docs/DefaultApi.md#apiv1workflowinternaldumppost) | **Post** /api/v1/workflow/internal/dump | dump internal info of a workflow

0 commit comments

Comments
 (0)