Skip to content

Commit 24abcb8

Browse files
Refactor s3 to general blobStore (#576)
1 parent 9a9cb90 commit 24abcb8

File tree

14 files changed

+183
-128
lines changed

14 files changed

+183
-128
lines changed

cmd/server/iwf/iwf.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"crypto/tls"
2626
"fmt"
27+
"github.com/indeedeng/iwf/service/common/blobstore"
2728
rawLog "log"
2829
"strings"
2930
"sync"
@@ -205,16 +206,21 @@ func launchTemporalService(
205206
logger log.Logger,
206207
) {
207208
s3Client := CreateS3Client(config, context.Background())
209+
blobStore := blobstore.NewBlobStore(
210+
s3Client,
211+
config.Interpreter.Temporal.Namespace,
212+
config.ExternalStorage,
213+
logger,
214+
)
208215
switch svcName {
209216
case serviceAPI:
210217
svc := api.NewService(
211218
config, unifiedClient, logger.WithTags(tag.Service(svcName)),
212-
s3Client,
213-
config.Interpreter.Temporal.Namespace+"/",
219+
blobStore,
214220
)
215221
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
216222
case serviceInterpreter:
217-
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient, s3Client)
223+
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient, blobStore)
218224
interpreter.Start()
219225
default:
220226
rawLog.Fatalf("Invalid service: %v", svcName)
@@ -231,16 +237,21 @@ func launchCadenceService(
231237
logger log.Logger,
232238
) {
233239
s3Client := CreateS3Client(config, context.Background())
240+
blobStore := blobstore.NewBlobStore(
241+
s3Client,
242+
config.Interpreter.Cadence.Domain,
243+
config.ExternalStorage,
244+
logger,
245+
)
234246
switch svcName {
235247
case serviceAPI:
236248
svc := api.NewService(
237249
config, unifiedClient, logger.WithTags(tag.Service(svcName)),
238-
s3Client,
239-
config.Interpreter.Cadence.Domain+"/",
250+
blobStore,
240251
)
241252
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
242253
case serviceInterpreter:
243-
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient, s3Client)
254+
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient, blobStore)
244255
interpreter.Start()
245256
default:
246257
rawLog.Fatalf("Invalid service: %v", svcName)

config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ const (
1818
StorageStatusInactive = "inactive"
1919
)
2020

21+
const (
22+
StorageTypeS3 = "s3"
23+
)
24+
2125
type (
2226
Config struct {
2327
// Log is the logging config
@@ -42,6 +46,7 @@ type (
4246
}
4347

4448
StorageStatus string
49+
StorageType string
4550

4651
BlobStorageConfig struct {
4752
// Status means whether this storage is active for writing.
@@ -50,7 +55,7 @@ type (
5055
// StorageId is the id of the external storage, it's used to identify the external storage in the EncodedObject that is stored in the workflow history
5156
StorageId string `yaml:"storageId"`
5257
// StorageType is the type of the external storage, currently only s3 is supported
53-
StorageType string `yaml:"storageType"`
58+
StorageType StorageType `yaml:"storageType"`
5459
// S3Endpoint is the endpoint of s3 service
5560
S3Endpoint string `yaml:"s3Endpoint"`
5661
// S3Bucket is the bucket name of the S3 storage

integ/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
3636
SupportedStorages: []config.BlobStorageConfig{
3737
{
3838
Status: config.StorageStatusActive,
39-
StorageId: "s3",
40-
StorageType: "s3",
39+
StorageId: "s3-store-id",
40+
StorageType: config.StorageTypeS3,
4141
S3Endpoint: "http://localhost:9000",
4242
S3Bucket: "iwf-test-bucket",
4343
S3Region: "us-east-1",

integ/s3_wf_start_input_test.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,6 @@ func doTestWorkflowWithS3StartInput(t *testing.T, backendType service.BackendTyp
8282
assertions := assert.New(t)
8383

8484
_, history := wfHandler.GetTestResult()
85-
// assertions.Equalf(map[string]interface{}{
86-
// "S1_start": 1,
87-
// "S1_decide": 1,
88-
// "S1_start_input": iwfidl.EncodedObject{
89-
// Encoding: iwfidl.PtrString("json"),
90-
// Data: iwfidl.PtrString("\"12345678901\""),
91-
// },
92-
// "S1_decide_input": iwfidl.EncodedObject{
93-
// Encoding: iwfidl.PtrString("json"),
94-
// Data: iwfidl.PtrString("\"12345678901\""),
95-
// },
96-
// }, history, "headers test fail, %v", history)
9785

9886
assertions.Equal(history["S1_start_input"], iwfidl.EncodedObject{
9987
Encoding: iwfidl.PtrString("json"),

integ/util.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package integ
33
import (
44
"context"
55
"fmt"
6+
"github.com/indeedeng/iwf/service/common/blobstore"
67
"log"
78
"net/http"
89
"testing"
@@ -124,9 +125,10 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
124125

125126
testCfg := createTestConfig(config)
126127
s3Client := iwf.CreateS3Client(testCfg, context.Background())
128+
store := blobstore.NewBlobStore(s3Client, testNamespace, testCfg.ExternalStorage, logger)
127129

128130
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
129-
iwfService := api.NewService(testCfg, uclient, logger, s3Client, "test/") // TODO pass s3 client for integ test
131+
iwfService := api.NewService(testCfg, uclient, logger, store)
130132
iwfServer := &http.Server{
131133
Addr: ":" + testIwfServerPort,
132134
Handler: iwfService,
@@ -138,7 +140,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
138140
}()
139141

140142
// start iwf interpreter worker
141-
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient, s3Client)
143+
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient, store)
142144
if *disableStickyCache {
143145
interpreter.StartWithStickyCacheDisabledForTest()
144146
} else {
@@ -163,9 +165,10 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
163165

164166
testCfg := createTestConfig(config)
165167
s3Client := iwf.CreateS3Client(testCfg, context.Background())
168+
store := blobstore.NewBlobStore(s3Client, iwf.DefaultCadenceDomain, testCfg.ExternalStorage, logger)
166169

167170
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
168-
iwfService := api.NewService(testCfg, uclient, logger, s3Client, "test/") // pass in for integ tests
171+
iwfService := api.NewService(testCfg, uclient, logger, store)
169172
iwfServer := &http.Server{
170173
Addr: ":" + testIwfServerPort,
171174
Handler: iwfService,
@@ -177,7 +180,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
177180
}()
178181

179182
// start iwf interpreter worker
180-
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient, s3Client)
183+
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient, store)
181184
if *disableStickyCache {
182185
interpreter.StartWithStickyCacheDisabledForTest()
183186
} else {

service/api/handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package api
22

33
import (
4-
"github.com/aws/aws-sdk-go-v2/service/s3"
54
"github.com/indeedeng/iwf/config"
5+
"github.com/indeedeng/iwf/service/common/blobstore"
66
"net/http"
77

88
"github.com/indeedeng/iwf/service"
@@ -21,8 +21,8 @@ type handler struct {
2121
logger log.Logger
2222
}
2323

24-
func newHandler(config config.Config, client uclient.UnifiedClient, logger log.Logger, s3Client *s3.Client, s3PathPrefix string) *handler {
25-
svc, err := NewApiService(config, client, service.TaskQueue, logger, s3Client, s3PathPrefix)
24+
func newHandler(config config.Config, client uclient.UnifiedClient, logger log.Logger, store blobstore.BlobStore) *handler {
25+
svc, err := NewApiService(config, client, service.TaskQueue, logger, store)
2626
if err != nil {
2727
panic(err)
2828
}

service/api/routers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package api
22

33
import (
4-
"github.com/aws/aws-sdk-go-v2/service/s3"
54
"github.com/gin-gonic/gin"
65
"github.com/indeedeng/iwf/config"
76
uclient "github.com/indeedeng/iwf/service/client"
7+
"github.com/indeedeng/iwf/service/common/blobstore"
88
"github.com/indeedeng/iwf/service/common/log"
99
)
1010

@@ -29,10 +29,10 @@ const WorkflowRpcApiPath = "/api/v1/workflow/rpc"
2929
const InfoHealthCheck = "/info/healthcheck"
3030

3131
// NewService returns a new router.
32-
func NewService(config config.Config, client uclient.UnifiedClient, logger log.Logger, s3Client *s3.Client, s3PathPrefix string) *gin.Engine {
32+
func NewService(config config.Config, client uclient.UnifiedClient, logger log.Logger, store blobstore.BlobStore) *gin.Engine {
3333
router := gin.Default()
3434

35-
handler := newHandler(config, client, logger, s3Client, s3PathPrefix)
35+
handler := newHandler(config, client, logger, store)
3636

3737
router.GET("/", handler.index)
3838
router.POST(WorkflowStartApiPath, handler.apiV1WorkflowStart)

service/api/service.go

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ package api
33
import (
44
"context"
55
"fmt"
6+
"github.com/indeedeng/iwf/service/common/blobstore"
67
"net/http"
78
"os"
89
"strings"
910
"time"
1011

11-
"github.com/aws/aws-sdk-go-v2/aws"
12-
"github.com/aws/aws-sdk-go-v2/service/s3"
1312
"github.com/google/uuid"
1413
"github.com/indeedeng/iwf/config"
1514
"github.com/indeedeng/iwf/service/common/event"
@@ -31,39 +30,26 @@ import (
3130
)
3231

3332
type serviceImpl struct {
34-
client uclient.UnifiedClient
35-
s3Client *s3.Client
36-
s3PathPrefix string // it's recommended to be the Temporal namespace or Cadence domain + "/"
37-
activeStorage *config.BlobStorageConfig
38-
taskQueue string
39-
logger log.Logger
40-
config config.Config
33+
client uclient.UnifiedClient
34+
store blobstore.BlobStore
35+
taskQueue string
36+
logger log.Logger
37+
config config.Config
4138
}
4239

4340
func (s *serviceImpl) Close() {
4441
s.client.Close()
4542
}
4643

4744
func NewApiService(
48-
cfg config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger, s3Client *s3.Client, s3PathPrefix string,
45+
cfg config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger, store blobstore.BlobStore,
4946
) (ApiService, error) {
50-
// get the first active storage
51-
var activeStorage *config.BlobStorageConfig
52-
for _, storage := range cfg.ExternalStorage.SupportedStorages {
53-
if storage.Status == config.StorageStatusActive {
54-
activeStorage = &storage
55-
break
56-
}
57-
}
58-
5947
return &serviceImpl{
60-
client: client,
61-
s3Client: s3Client,
62-
s3PathPrefix: s3PathPrefix,
63-
activeStorage: activeStorage,
64-
taskQueue: taskQueue,
65-
logger: logger,
66-
config: cfg,
48+
client: client,
49+
store: store,
50+
taskQueue: taskQueue,
51+
logger: logger,
52+
config: cfg,
6753
}, nil
6854
}
6955

@@ -172,18 +158,15 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
172158
// 2. if it is, upload the input to S3
173159
uuid := uuid.New().String()
174160
yyyymmdd := time.Now().Format("20060102")
175-
// namespace/yymmdd/workflowId/uuid
176-
objectKey := fmt.Sprintf("%s%s/%s/%s", s.s3PathPrefix, yyyymmdd, req.GetWorkflowId(), uuid)
177-
err := putObject(ctx, s.s3Client,
178-
s.activeStorage.S3Bucket,
179-
objectKey,
180-
*input.StateInput.Data)
161+
// yymmdd/workflowId/uuid
162+
objectKey := fmt.Sprintf("%s/%s/%s", yyyymmdd, req.GetWorkflowId(), uuid)
163+
storeId, err := s.store.WriteObject(ctx, objectKey, input.StateInput.GetData())
181164
if err != nil {
182165
return nil, s.handleError(err, WorkflowStartApiPath, req.GetWorkflowId())
183166
}
184167
// 3. replace the input with the S3 object
185168
newStateInput := iwfidl.EncodedObject{
186-
ExtStoreId: iwfidl.PtrString(s.activeStorage.StorageId),
169+
ExtStoreId: iwfidl.PtrString(storeId),
187170
ExtPath: iwfidl.PtrString(objectKey),
188171
Encoding: iwfidl.PtrString(*input.StateInput.Encoding),
189172
}
@@ -225,16 +208,6 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
225208
}, nil
226209
}
227210

228-
func putObject(ctx context.Context, client *s3.Client, bucketName string, key, content string) error {
229-
_, err := client.PutObject(ctx, &s3.PutObjectInput{
230-
Bucket: aws.String(bucketName),
231-
Key: aws.String(key),
232-
Body: strings.NewReader(content),
233-
ContentType: aws.String("application/json"),
234-
})
235-
return err
236-
}
237-
238211
func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig *iwfidl.WorkflowConfig) {
239212
if configOverride.ExecutingStateIdMode != nil {
240213
workflowConfig.ExecutingStateIdMode = configOverride.ExecutingStateIdMode
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package blobstore
2+
3+
import "context"
4+
5+
type BlobStore interface {
6+
// WriteObject will write to the current active store
7+
// returns the active storeId
8+
WriteObject(ctx context.Context, path, data string) (storeId string, err error)
9+
// ReadObject will read from the store by storeId
10+
ReadObject(ctx context.Context, storeId, path string) (string, error)
11+
}

0 commit comments

Comments
 (0)