Skip to content

Commit 03a86b6

Browse files
Add S3 list and delete API (#580)
1 parent e72a41f commit 03a86b6

File tree

4 files changed

+426
-15
lines changed

4 files changed

+426
-15
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ stressTestsTemporalNoSearch:
207207
$Q go test -v ./integ -repeat=10 -cadence=false -search=false | tee test.log
208208

209209
unitTests:
210-
$Q go test -v ./service/...
210+
@go test -v $(shell go list ./service/... | grep -v ./service/common/blobstore)
211211

212212
help:
213213
@# print help first, so it's visible

service/common/blobstore/interfaces.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,38 @@ func ValidateWorkflowId(workflowId string) error {
1717
return nil
1818
}
1919

20+
func MustExtractWorkflowId(workflowPath string) string {
21+
workflowId, err := ExtractWorkflowId(workflowPath)
22+
if err != nil {
23+
panic(err)
24+
}
25+
return workflowId
26+
}
27+
28+
// ExtractWorkflowId returns everything after the "$" separator in workflowPath.
//
// The path must contain exactly one "$"; a path with no separator or with more
// than one is rejected with an "invalid workflow path" error and an empty result.
func ExtractWorkflowId(workflowPath string) (string, error) {
	const separator = "$"

	first := strings.Index(workflowPath, separator)
	// Exactly one separator means it exists and its first and last positions coincide.
	if first < 0 || first != strings.LastIndex(workflowPath, separator) {
		return "", fmt.Errorf("invalid workflow path: %s", workflowPath)
	}
	return workflowPath[first+len(separator):], nil
}
35+
2036
type BlobStore interface {
2137
// WriteObject will write to the current active store
2238
// returns the active storeId
39+
// The final path pattern is pathPrefix + yyyymmdd$workflowId/uuid
40+
// But the returned path doesn't include pathPrefix, only yyyymmdd$workflowId/uuid
2341
WriteObject(ctx context.Context, workflowId, data string) (storeId, path string, err error)
24-
// ReadObject will read from the store by storeId
42+
// ReadObject will read from the store by storeId and path
43+
// The path should be the one returned from WriteObject, in format of yyyymmdd$workflowId/uuid
2544
ReadObject(ctx context.Context, storeId, path string) (string, error)
26-
// DeleteObjectPath will delete all the objects of the path
27-
DeleteObjectPath(ctx context.Context, storeId, path string) error
28-
// ListObjectPaths will list the paths of yyyymmdd$workflowId
29-
ListObjectPaths(ctx context.Context, input ListObjectPathsInput) (*ListObjectPathsOutput, error)
45+
// DeleteWorkflowObjects will delete all the objects of the workflowId
46+
// workflowPath is yyyymmdd$workflowId, where yyyymmdd is needed to compose the path
47+
DeleteWorkflowObjects(ctx context.Context, storeId, workflowPath string) error
48+
// ListWorkflowPaths will list the workflowPaths ( yyyymmdd$workflowId ) as CommonPrefixes from S3
49+
// It uses the delimiter "/" before the uuid to get all the CommonPrefixes
50+
// StartAfterYyyymmdd is the yyyymmdd to exclude the date when listing
51+
ListWorkflowPaths(ctx context.Context, input ListObjectPathsInput) (*ListObjectPathsOutput, error)
3052
// CountWorkflowObjectsForTesting is for testing ONLY.
3153
// count the number of S3 objects for this workflowId
3254
// Limitation:
@@ -37,11 +59,10 @@ type BlobStore interface {
3759

3860
type ListObjectPathsInput struct {
3961
StoreId string
40-
StartAfter string
4162
ContinuationToken *string
4263
}
4364

4465
type ListObjectPathsOutput struct {
4566
ContinuationToken *string
46-
Paths []string
67+
WorkflowPaths []string
4768
}

service/common/blobstore/store_impl.go

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/aws/aws-sdk-go-v2/aws"
1313
"github.com/aws/aws-sdk-go-v2/service/s3"
14+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
1415
"github.com/google/uuid"
1516
"github.com/indeedeng/iwf/config"
1617
"github.com/indeedeng/iwf/service/common/log"
@@ -81,7 +82,7 @@ func (b *blobStoreImpl) WriteObject(ctx context.Context, workflowId, data string
8182
storeId = b.activeStorage.StorageId
8283
randomUuid := uuid.New().String()
8384
yyyymmdd := time.Now().Format("20060102")
84-
// yymmdd$workflowId/uuid
85+
// yyyymmdd$workflowId/uuid
8586
// Note: using $ here so that the listing can be much easier to implement for pagination
8687
path = fmt.Sprintf("%s$%s/%s", yyyymmdd, workflowId, randomUuid)
8788

@@ -155,12 +156,122 @@ func (b *blobStoreImpl) CountWorkflowObjectsForTesting(ctx context.Context, work
155156
return int64(len(result.Contents)), nil
156157
}
157158

158-
func (b *blobStoreImpl) DeleteObjectPath(ctx context.Context, storeId, path string) error {
159-
//TODO implement me
160-
panic("implement me")
159+
func (b *blobStoreImpl) DeleteWorkflowObjects(ctx context.Context, storeId, workflowPath string) error {
160+
storeConfig, ok := b.supportedStore[storeId]
161+
if !ok {
162+
return errors.New("store not found for " + storeId)
163+
}
164+
165+
// Construct the prefix for all objects of this workflow
166+
prefix := fmt.Sprintf("%s%s/", b.pathPrefix, workflowPath)
167+
168+
// Paginate through all objects and delete them in batches
169+
var continuationToken *string
170+
for {
171+
listInput := &s3.ListObjectsV2Input{
172+
Bucket: aws.String(storeConfig.S3Bucket),
173+
Prefix: aws.String(prefix),
174+
}
175+
176+
if continuationToken != nil {
177+
listInput.ContinuationToken = continuationToken
178+
}
179+
180+
listResult, err := b.s3Client.ListObjectsV2(ctx, listInput)
181+
if err != nil {
182+
return fmt.Errorf("failed to list objects for deletion: %w", err)
183+
}
184+
185+
// If no objects found, we're done
186+
if len(listResult.Contents) == 0 {
187+
break
188+
}
189+
190+
// Prepare objects for batch deletion
191+
var objectsToDelete []types.ObjectIdentifier
192+
for _, obj := range listResult.Contents {
193+
if obj.Key != nil {
194+
objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
195+
Key: obj.Key,
196+
})
197+
}
198+
}
199+
200+
// Delete objects in batch
201+
if len(objectsToDelete) > 0 {
202+
deleteResult, err := b.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
203+
Bucket: aws.String(storeConfig.S3Bucket),
204+
Delete: &types.Delete{
205+
Objects: objectsToDelete,
206+
Quiet: aws.Bool(true), // Don't return successful deletions
207+
},
208+
})
209+
if err != nil {
210+
return fmt.Errorf("failed to delete objects: %w", err)
211+
}
212+
213+
// Check for any delete errors
214+
if len(deleteResult.Errors) > 0 {
215+
var errorMsgs []string
216+
for _, delErr := range deleteResult.Errors {
217+
if delErr.Key != nil && delErr.Code != nil && delErr.Message != nil {
218+
errorMsgs = append(errorMsgs, fmt.Sprintf("key=%s, code=%s, message=%s",
219+
*delErr.Key, *delErr.Code, *delErr.Message))
220+
}
221+
}
222+
return fmt.Errorf("some objects failed to delete: %s", strings.Join(errorMsgs, "; "))
223+
}
224+
}
225+
226+
// Check if there are more objects to process
227+
if listResult.IsTruncated == nil || !*listResult.IsTruncated {
228+
break
229+
}
230+
continuationToken = listResult.NextContinuationToken
231+
}
232+
233+
return nil
161234
}
162235

163-
func (b *blobStoreImpl) ListObjectPaths(ctx context.Context, input ListObjectPathsInput) (*ListObjectPathsOutput, error) {
164-
//TODO implement me
165-
panic("implement me")
236+
func (b *blobStoreImpl) ListWorkflowPaths(ctx context.Context, input ListObjectPathsInput) (*ListObjectPathsOutput, error) {
237+
storeConfig, ok := b.supportedStore[input.StoreId]
238+
if !ok {
239+
return nil, errors.New("store not found for " + input.StoreId)
240+
}
241+
242+
listInput := &s3.ListObjectsV2Input{
243+
Bucket: aws.String(storeConfig.S3Bucket),
244+
Prefix: aws.String(b.pathPrefix),
245+
Delimiter: aws.String("/"),
246+
}
247+
248+
// Set continuation token if provided
249+
if input.ContinuationToken != nil {
250+
listInput.ContinuationToken = input.ContinuationToken
251+
}
252+
253+
result, err := b.s3Client.ListObjectsV2(ctx, listInput)
254+
if err != nil {
255+
return nil, err
256+
}
257+
258+
// Extract workflow paths from common prefixes
259+
workflowPaths := make([]string, 0, len(result.CommonPrefixes))
260+
for _, commonPrefix := range result.CommonPrefixes {
261+
if commonPrefix.Prefix != nil {
262+
// Remove the pathPrefix to get the workflow path (yyyymmdd$workflowId)
263+
prefixStr := *commonPrefix.Prefix
264+
if strings.HasPrefix(prefixStr, b.pathPrefix) {
265+
workflowPath := strings.TrimPrefix(prefixStr, b.pathPrefix)
266+
// Remove trailing "/" if present
267+
workflowPath = strings.TrimSuffix(workflowPath, "/")
268+
workflowPaths = append(workflowPaths, workflowPath)
269+
}
270+
}
271+
}
272+
273+
return &ListObjectPathsOutput{
274+
ContinuationToken: result.NextContinuationToken,
275+
WorkflowPaths: workflowPaths,
276+
}, nil
166277
}

0 commit comments

Comments
 (0)