Skip to content

Commit e72a41f

Browse files
Add S3 count API for testing verification (#579)
1 parent 994950b commit e72a41f

File tree

5 files changed

+45
-10
lines changed

5 files changed

+45
-10
lines changed

integ/s3_wf_init_data_attributes_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func doTestWorkflowWithS3InitDataAttributes(t *testing.T, backendType service.Ba
7676

7777
wfInput := &iwfidl.EncodedObject{
7878
Encoding: iwfidl.PtrString("json"),
79-
Data: iwfidl.PtrString("\"test-input\""),
79+
Data: iwfidl.PtrString("test"),
8080
}
8181

8282
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
@@ -131,4 +131,8 @@ func doTestWorkflowWithS3InitDataAttributes(t *testing.T, backendType service.Ba
131131
assertions.Equal(history["S2_decide_attr2_data"], *s3_init_data_attributes.TestDataAttributeVal2.Data, "S2_decide attr2 data should match initial value")
132132
assertions.Equal(history["S2_decide_attr3_data"], *s3_init_data_attributes.TestDataAttributeVal3.Data, "S2_decide attr3 data should match initial value")
133133
assertions.Equal(history["S2_decide_validation_pass"], true, "S2_decide validation should pass - all data attributes match initial values exactly")
134+
135+
objectCount, err := globalBlobStore.CountWorkflowObjectsForTesting(context.Background(), wfId)
136+
assertions.Nil(err)
137+
assertions.Equal(int64(2), objectCount)
134138
}

integ/s3_wf_start_input_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,8 @@ func doTestWorkflowWithS3StartInput(t *testing.T, backendType service.BackendTyp
9494

9595
assertions.Equal(history["S1_start"], int64(1), "S1_start is not equal")
9696
assertions.Equal(history["S1_decide"], int64(1), "S1_decide is not equal")
97+
98+
objectCount, err := globalBlobStore.CountWorkflowObjectsForTesting(context.Background(), wfId)
99+
assertions.Nil(err)
100+
assertions.Equal(int64(1), objectCount)
97101
}

integ/util.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ func startIwfServiceWithClient(backendType service.BackendType) (uclient uclient
110110
//var integCadenceUclientCached api.UnifiedClient
111111
//var integTemporalUclientCached api.UnifiedClient
112112

113+
// globalBlobStore is a global var in this package for testing
114+
var globalBlobStore blobstore.BlobStore
115+
113116
func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.UnifiedClient, closeFunc func()) {
114117
if config.BackendType == service.BackendTypeTemporal {
115118
dataConverter := converter.GetDefaultDataConverter()
@@ -125,10 +128,10 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
125128

126129
testCfg := createTestConfig(config)
127130
s3Client := iwf.CreateS3Client(testCfg, context.Background())
128-
store := blobstore.NewBlobStore(s3Client, testNamespace, testCfg.ExternalStorage, logger, client.MetricsNopHandler)
131+
globalBlobStore = blobstore.NewBlobStore(s3Client, testNamespace, testCfg.ExternalStorage, logger, client.MetricsNopHandler)
129132

130133
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
131-
iwfService := api.NewService(testCfg, uclient, logger, store)
134+
iwfService := api.NewService(testCfg, uclient, logger, globalBlobStore)
132135
iwfServer := &http.Server{
133136
Addr: ":" + testIwfServerPort,
134137
Handler: iwfService,
@@ -140,7 +143,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
140143
}()
141144

142145
// start iwf interpreter worker
143-
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient, store)
146+
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient, globalBlobStore)
144147
if *disableStickyCache {
145148
interpreter.StartWithStickyCacheDisabledForTest()
146149
} else {
@@ -165,10 +168,10 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
165168

166169
testCfg := createTestConfig(config)
167170
s3Client := iwf.CreateS3Client(testCfg, context.Background())
168-
store := blobstore.NewBlobStore(s3Client, iwf.DefaultCadenceDomain, testCfg.ExternalStorage, logger, client.MetricsNopHandler)
171+
globalBlobStore = blobstore.NewBlobStore(s3Client, iwf.DefaultCadenceDomain, testCfg.ExternalStorage, logger, client.MetricsNopHandler)
169172

170173
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
171-
iwfService := api.NewService(testCfg, uclient, logger, store)
174+
iwfService := api.NewService(testCfg, uclient, logger, globalBlobStore)
172175
iwfServer := &http.Server{
173176
Addr: ":" + testIwfServerPort,
174177
Handler: iwfService,
@@ -180,7 +183,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
180183
}()
181184

182185
// start iwf interpreter worker
183-
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient, store)
186+
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient, globalBlobStore)
184187
if *disableStickyCache {
185188
interpreter.StartWithStickyCacheDisabledForTest()
186189
} else {

service/common/blobstore/interfaces.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ type BlobStore interface {
2727
DeleteObjectPath(ctx context.Context, storeId, path string) error
2828
// ListObjectPaths will list the paths of yyyymmdd$workflowId
2929
ListObjectPaths(ctx context.Context, input ListObjectPathsInput) (*ListObjectPathsOutput, error)
30+
// CountWorkflowObjectsForTesting is for testing ONLY.
31+
// count the number of S3 objects for this workflowId
32+
// Limitation:
33+
// 1. It doesn't count across two days(so expect test to fail if you happen to run the test across day boundary :)
34+
// 2. Only count less than 1000 objects(because it only make one API call to S3 which return at most 1000 objects)
35+
CountWorkflowObjectsForTesting(ctx context.Context, workflowId string) (int64, error)
3036
}
3137

3238
type ListObjectPathsInput struct {

service/common/blobstore/store_impl.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"io"
9+
"strings"
10+
"time"
11+
812
"github.com/aws/aws-sdk-go-v2/aws"
913
"github.com/aws/aws-sdk-go-v2/service/s3"
1014
"github.com/google/uuid"
1115
"github.com/indeedeng/iwf/config"
1216
"github.com/indeedeng/iwf/service/common/log"
1317
"go.temporal.io/sdk/client"
14-
"io"
15-
"strings"
16-
"time"
1718
)
1819

1920
type blobStoreImpl struct {
@@ -137,6 +138,23 @@ func getObject(ctx context.Context, client *s3.Client, bucketName, key string) (
137138
return buf.String(), nil
138139
}
139140

141+
func (b *blobStoreImpl) CountWorkflowObjectsForTesting(ctx context.Context, workflowId string) (int64, error) {
142+
// Create the prefix to match objects for this workflowId for today
143+
yyyymmdd := time.Now().Format("20060102")
144+
prefix := fmt.Sprintf("%s%s$%s/", b.pathPrefix, yyyymmdd, workflowId)
145+
146+
// List objects with the prefix (limited to 1000 objects as documented)
147+
result, err := b.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
148+
Bucket: aws.String(b.activeStorage.S3Bucket),
149+
Prefix: aws.String(prefix),
150+
})
151+
if err != nil {
152+
return 0, err
153+
}
154+
155+
return int64(len(result.Contents)), nil
156+
}
157+
140158
func (b *blobStoreImpl) DeleteObjectPath(ctx context.Context, storeId, path string) error {
141159
//TODO implement me
142160
panic("implement me")

0 commit comments

Comments
 (0)