Skip to content

Commit 7ad7240

Browse files
authored
Merge pull request #278 from cschleiden/explicitly-remove-workflow-from-cache
Explicitly evict finished workflow instances from cache
2 parents d8ce2bd + f06fe8e commit 7ad7240

File tree

4 files changed

+21
-0
lines changed

4 files changed

+21
-0
lines changed

backend/test/e2e.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,11 @@ func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.Workfl
724724
return nil, false, nil
725725
}
726726

727+
// Evict implements workflow.ExecutorCache
728+
func (*noopWorkflowExecutorCache) Evict(ctx context.Context, instance *core.WorkflowInstance) error {
729+
return nil
730+
}
731+
727732
// StartEviction implements workflow.ExecutorCache
728733
func (*noopWorkflowExecutorCache) StartEviction(ctx context.Context) {
729734
}

internal/worker/workflow.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.Ex
5959
metrickeys.ContinuedAsNew: fmt.Sprint(state == core.WorkflowInstanceStateContinuedAsNew),
6060
}, 1)
6161
}
62+
63+
// Workflow is finished, explicitly evict from cache (if one is used)
64+
if wtw.cache != nil {
65+
if err := wtw.cache.Evict(ctx, t.WorkflowInstance); err != nil {
66+
wtw.logger.ErrorContext(ctx, "could not evict workflow executor from cache", "error", err)
67+
}
68+
}
6269
}
6370

6471
wtw.backend.Metrics().Counter(metrickeys.ActivityTaskScheduled, metrics.Tags{}, int64(len(result.ActivityEvents)))

internal/workflow/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
type ExecutorCache interface {
1010
Store(ctx context.Context, instance *core.WorkflowInstance, workflow WorkflowExecutor) error
11+
Evict(ctx context.Context, instance *core.WorkflowInstance) error
1112
Get(ctx context.Context, instance *core.WorkflowInstance) (WorkflowExecutor, bool, error)
1213
StartEviction(ctx context.Context)
1314
}

internal/workflow/cache/cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ func (lc *LruCache) Store(ctx context.Context, instance *core.WorkflowInstance,
6060
return nil
6161
}
6262

63+
func (lc *LruCache) Evict(ctx context.Context, instance *core.WorkflowInstance) error {
64+
lc.c.Delete(getKey(instance))
65+
66+
lc.mc.Gauge(metrickeys.WorkflowInstanceCacheSize, metrics.Tags{}, int64(lc.c.Len()))
67+
68+
return nil
69+
}
70+
6371
func (lc *LruCache) StartEviction(ctx context.Context) {
6472
go lc.c.Start()
6573

0 commit comments

Comments
 (0)