Skip to content

Commit ed9f8c2

Browse files
ktropsKatie Atrops
andauthored
IWF-1153: checking all S3 objects aginst Temporal (#598)
Co-authored-by: Katie Atrops <katiea@indeed.com>
1 parent b357107 commit ed9f8c2

File tree

4 files changed

+26
-26
lines changed

4 files changed

+26
-26
lines changed

service/interpreter/activityImpl.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -560,54 +560,52 @@ func doLoadFromExternalStorage(ctx context.Context, input *iwfidl.EncodedObject)
560560

561561
func CleanupBlobStore(
562562
ctx context.Context, backendType service.BackendType, storeId string,
563-
) error {
563+
) (int, error) {
564564
store := env.GetBlobStore()
565565
provider := interfaces.GetActivityProviderByType(backendType)
566566
logger := provider.GetLogger(ctx)
567567
logger.Info("CleanupBlobStore started")
568-
cfg := env.GetSharedConfig()
569-
minAgeForCleanupCheckInDays := cfg.ExternalStorage.MinAgeForCleanupCheckInDays
570-
stopChecingkUnixSeconds := time.Now().Unix() - int64(minAgeForCleanupCheckInDays)*24*3600
571-
var continueToken *string
572568

573569
client := env.GetUnifiedClient()
570+
571+
var continueToken *string
572+
var totalDeleted int
573+
574574
for {
575575
listOutput, err := store.ListWorkflowPaths(ctx, blobstore.ListObjectPathsInput{
576576
StoreId: storeId,
577577
ContinuationToken: continueToken,
578578
})
579579
if err != nil {
580-
return err
580+
return totalDeleted, err
581581
}
582582
continueToken = listOutput.ContinuationToken
583583
for _, workflowPath := range listOutput.WorkflowPaths {
584-
currentTimestamp, valid := blobstore.ExtractYyyymmddToUnixSeconds(workflowPath)
584+
_, valid := blobstore.ExtractYyyymmddToUnixSeconds(workflowPath)
585585
if !valid {
586586
logger.Info("CleanupBlobStore skipped workflow path", "path", workflowPath)
587587
continue
588588
}
589-
if currentTimestamp < stopChecingkUnixSeconds {
590-
logger.Info("CleanupBlobStore stopped checking at", "currentTimestamp", currentTimestamp, "stopChecingkUnixSeconds", stopChecingkUnixSeconds)
591-
break
592-
}
593589

590+
// Check if workflow still exists in Temporal
591+
// We must always check because workflows can run indefinitely,
592+
// and the retention period only applies after workflow closure
594593
workflowId := blobstore.MustExtractWorkflowId(workflowPath)
595594
_, err := client.DescribeWorkflowExecution(ctx, workflowId, "", nil)
596595
if client.IsNotFoundError(err) {
597-
// this means workflow has been deleted from the history
596+
// Workflow has been removed from Temporal, safe to delete S3 objects
598597
err = store.DeleteWorkflowObjects(ctx, storeId, workflowPath)
599598
if err != nil {
600599
logger.Error("CleanupBlobStore failed to delete workflow objects", "workflowPath", workflowPath, "error", err)
601-
return err
602-
} else {
603-
logger.Info("CleanupBlobStore deleted workflow objects", "workflowPath", workflowPath)
604-
}
605-
} else {
606-
if err != nil {
607-
logger.Error("CleanupBlobStore failed to delete workflow objects", "workflowPath", workflowPath, "error", err)
608-
return err
600+
return totalDeleted, err
609601
}
602+
totalDeleted++
603+
logger.Info("CleanupBlobStore deleted workflow objects", "workflowPath", workflowPath)
604+
} else if err != nil {
605+
logger.Error("CleanupBlobStore failed to describe workflow", "workflowPath", workflowPath, "error", err)
606+
return totalDeleted, err
610607
}
608+
// If no error, workflow still exists (open or within retention), don't delete
611609

612610
// this is a long running activity
613611
// using record heartbeat so that it won't timeout at startToClose timeout
@@ -617,5 +615,6 @@ func CleanupBlobStore(
617615
break
618616
}
619617
}
620-
return nil
618+
logger.Info("CleanupBlobStore completed", "totalDeleted", totalDeleted)
619+
return totalDeleted, nil
621620
}

service/interpreter/cadence/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForState
1515
return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider())
1616
}
1717

18-
func BlobStoreCleanup(ctx workflow.Context, storeId string) error {
18+
func BlobStoreCleanup(ctx workflow.Context, storeId string) (int, error) {
1919
return interpreter.BlobStoreCleanup(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), storeId)
2020
}

service/interpreter/temporal/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForState
1818
return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider())
1919
}
2020

21-
func BlobStoreCleanup(ctx workflow.Context, storeId string) error {
21+
func BlobStoreCleanup(ctx workflow.Context, storeId string) (int, error) {
2222
return interpreter.BlobStoreCleanup(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), storeId)
2323
}

service/interpreter/workflowImpl.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,14 +1113,15 @@ func WaitForStateCompletionWorkflowImpl(
11131113

11141114
func BlobStoreCleanup(
11151115
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, storeId string,
1116-
) error {
1116+
) (int, error) {
11171117
activityOptions := interfaces.ActivityOptions{
11181118
StartToCloseTimeout: 24 * time.Hour,
11191119
RetryPolicy: &iwfidl.RetryPolicy{
11201120
MaximumAttempts: iwfidl.PtrInt32(10),
11211121
},
11221122
}
11231123
ctx = provider.WithActivityOptions(ctx, activityOptions)
1124-
// the workflow simply execute the long running activity :)
1125-
return provider.ExecuteActivity(nil, false, ctx, CleanupBlobStore, provider.GetBackendType(), storeId)
1124+
var totalDeleted int
1125+
err := provider.ExecuteActivity(&totalDeleted, false, ctx, CleanupBlobStore, provider.GetBackendType(), storeId)
1126+
return totalDeleted, err
11261127
}

0 commit comments

Comments
 (0)