Skip to content

Commit e847817

Browse files
committed
Use LRU cache implementation
This replaces the custom cache implementation with an expiring LRU cache with limited capacity.
1 parent 896df55 commit e847817

File tree

10 files changed

+207
-160
lines changed

10 files changed

+207
-160
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/go-sql-driver/mysql v1.6.0
88
github.com/golangci/golangci-lint v1.45.2
99
github.com/google/uuid v1.3.0
10+
github.com/jellydator/ttlcache/v3 v3.0.0
1011
github.com/jstemmer/go-junit-report/v2 v2.0.0-beta1
1112
github.com/mattn/go-sqlite3 v1.14.12
1213
github.com/stretchr/testify v1.7.1

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,8 @@ github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
424424
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
425425
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
426426
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
427+
github.com/jellydator/ttlcache/v3 v3.0.0 h1:zmFhqrB/4sKiEiJHhtseJsNRE32IMVmJSs4++4gaQO4=
428+
github.com/jellydator/ttlcache/v3 v3.0.0/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4=
427429
github.com/jgautheron/goconst v1.5.1 h1:HxVbL1MhydKs8R8n/HE5NPvzfaYmQJA3o879lE4+WcM=
428430
github.com/jgautheron/goconst v1.5.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
429431
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
@@ -808,6 +810,7 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
808810
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
809811
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
810812
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
813+
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
811814
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
812815
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
813816
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@@ -855,6 +858,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
855858
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
856859
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
857860
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
861+
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
858862
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
859863
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
860864
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=

internal/worker/options.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package worker
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"github.com/cschleiden/go-workflows/internal/workflow"
7+
)
48

59
type Options struct {
610
// WorkflowsPollers is the number of pollers to start. Defaults to 2.
@@ -28,6 +32,16 @@ type Options struct {
2832

2933
// WorkflowHeartbeatInterval is the interval between heartbeat attempts on workflow tasks, when enabled.
3034
WorkflowHeartbeatInterval time.Duration
35+
36+
// WorkflowExecutorCache is the max size of the workflow executor cache. Defaults to 128
37+
WorkflowExecutorCacheSize int
38+
39+
// WorkflowExecutorCache is the max TTL of the workflow executor cache. Defaults to 10 seconds
40+
WorkflowExecutorCacheTTL time.Duration
41+
42+
// WorkflowExecutorCache is the cache to use for workflow executors. If nil, a default cache implementation
43+
// will be used.
44+
WorkflowExecutorCache workflow.ExecutorCache
3145
}
3246

3347
var DefaultOptions = Options{
@@ -37,4 +51,8 @@ var DefaultOptions = Options{
3751
MaxParallelActivityTasks: 0,
3852
ActivityHeartbeatInterval: 25 * time.Second,
3953
WorkflowHeartbeatInterval: 25 * time.Second,
54+
55+
WorkflowExecutorCacheSize: 128,
56+
WorkflowExecutorCacheTTL: time.Second * 10,
57+
WorkflowExecutorCache: nil,
4058
}

internal/worker/workflow.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/internal/task"
1212
"github.com/cschleiden/go-workflows/internal/workflow"
13+
"github.com/cschleiden/go-workflows/internal/workflow/cache"
1314
"github.com/cschleiden/go-workflows/log"
1415
)
1516

@@ -26,7 +27,7 @@ type workflowWorker struct {
2627

2728
registry *workflow.Registry
2829

29-
cache workflow.WorkflowExecutorCache
30+
cache workflow.ExecutorCache
3031

3132
workflowTaskQueue chan *task.Workflow
3233

@@ -36,6 +37,13 @@ type workflowWorker struct {
3637
}
3738

3839
func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, options *Options) WorkflowWorker {
40+
var c workflow.ExecutorCache
41+
if options.WorkflowExecutorCache != nil {
42+
c = options.WorkflowExecutorCache
43+
} else {
44+
c = cache.NewWorkflowExecutorLRUCache(options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
45+
}
46+
3947
return &workflowWorker{
4048
backend: backend,
4149

@@ -44,7 +52,7 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
4452
registry: registry,
4553
workflowTaskQueue: make(chan *task.Workflow),
4654

47-
cache: workflow.NewWorkflowExecutorCache(workflow.DefaultWorkflowExecutorCacheOptions),
55+
cache: c,
4856

4957
logger: backend.Logger(),
5058

@@ -53,8 +61,6 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
5361
}
5462

5563
func (ww *workflowWorker) Start(ctx context.Context) error {
56-
go ww.cache.StartEviction(ctx)
57-
5864
for i := 0; i <= ww.options.WorkflowPollers; i++ {
5965
go ww.runPoll(ctx)
6066
}

internal/workflow/cache.go

Lines changed: 1 addition & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -2,105 +2,12 @@ package workflow
22

33
import (
44
"context"
5-
"fmt"
6-
"sync"
7-
"time"
85

96
"github.com/cschleiden/go-workflows/internal/core"
107
)
118

12-
type WorkflowExecutorCache interface {
9+
type ExecutorCache interface {
1310
Store(ctx context.Context, instance *core.WorkflowInstance, workflow WorkflowExecutor) error
1411
Get(ctx context.Context, instance *core.WorkflowInstance) (WorkflowExecutor, bool, error)
1512
StartEviction(ctx context.Context)
1613
}
17-
18-
type workflowExecutorCache struct {
19-
options WorkflowExecutorCacheOptions
20-
t *time.Ticker
21-
mu *sync.Mutex
22-
cache map[string]*workflowExecutorCacheEntry
23-
}
24-
25-
type workflowExecutorCacheEntry struct {
26-
executor WorkflowExecutor
27-
lastAccess time.Time
28-
}
29-
30-
type WorkflowExecutorCacheOptions struct {
31-
// CacheDuration is the duration after which a workflow executor is removed from the cache.
32-
CacheDuration time.Duration
33-
}
34-
35-
var DefaultWorkflowExecutorCacheOptions = WorkflowExecutorCacheOptions{
36-
CacheDuration: 30 * time.Second,
37-
}
38-
39-
func NewWorkflowExecutorCache(options WorkflowExecutorCacheOptions) WorkflowExecutorCache {
40-
c := workflowExecutorCache{
41-
options: options,
42-
t: time.NewTicker(options.CacheDuration),
43-
mu: &sync.Mutex{},
44-
cache: make(map[string]*workflowExecutorCacheEntry),
45-
}
46-
47-
return &c
48-
}
49-
50-
func (c *workflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor WorkflowExecutor) error {
51-
c.mu.Lock()
52-
defer c.mu.Unlock()
53-
54-
if entry, ok := c.cache[getKey(instance)]; ok && entry.executor != executor {
55-
// Close existing executor to prevent leaks
56-
entry.executor.Close()
57-
}
58-
59-
c.cache[getKey(instance)] = &workflowExecutorCacheEntry{
60-
executor: executor,
61-
lastAccess: time.Now(),
62-
}
63-
64-
return nil
65-
}
66-
67-
func (c *workflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (WorkflowExecutor, bool, error) {
68-
c.mu.Lock()
69-
defer c.mu.Unlock()
70-
71-
if entry, ok := c.cache[getKey(instance)]; ok {
72-
entry.lastAccess = time.Now()
73-
return entry.executor, true, nil
74-
}
75-
76-
return nil, false, nil
77-
}
78-
79-
func (c *workflowExecutorCache) StartEviction(ctx context.Context) {
80-
for {
81-
select {
82-
case <-c.t.C:
83-
c.mu.Lock()
84-
85-
cutoff := time.Now().Add(-c.options.CacheDuration)
86-
87-
// Check cache entries for eviction
88-
for instance, entry := range c.cache {
89-
if entry.lastAccess.Before(cutoff) {
90-
entry.executor.Close()
91-
92-
delete(c.cache, instance)
93-
}
94-
}
95-
96-
c.mu.Unlock()
97-
98-
case <-ctx.Done():
99-
return
100-
}
101-
}
102-
}
103-
104-
func getKey(instance *core.WorkflowInstance) string {
105-
return fmt.Sprintf("%s-%s", instance.InstanceID, instance.ExecutionID)
106-
}

internal/workflow/cache/cache.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/cschleiden/go-workflows/internal/core"
8+
"github.com/cschleiden/go-workflows/internal/workflow"
9+
"github.com/jellydator/ttlcache/v3"
10+
)
11+
12+
type LruCache struct {
13+
c *ttlcache.Cache[string, workflow.WorkflowExecutor]
14+
}
15+
16+
func NewWorkflowExecutorLRUCache(size int, expiration time.Duration) workflow.ExecutorCache {
17+
c := ttlcache.New(
18+
ttlcache.WithCapacity[string, workflow.WorkflowExecutor](uint64(size)),
19+
ttlcache.WithTTL[string, workflow.WorkflowExecutor](expiration),
20+
)
21+
22+
c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, workflow.WorkflowExecutor]) {
23+
i.Value().Close()
24+
})
25+
26+
return &LruCache{
27+
c: c,
28+
}
29+
}
30+
31+
func (lc *LruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (workflow.WorkflowExecutor, bool, error) {
32+
e := lc.c.Get(getKey(instance))
33+
if e != nil {
34+
return e.Value(), true, nil
35+
}
36+
37+
return nil, false, nil
38+
}
39+
40+
func (lc *LruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor workflow.WorkflowExecutor) error {
41+
lc.c.Set(getKey(instance), executor, ttlcache.DefaultTTL)
42+
43+
return nil
44+
}
45+
46+
func (lc *LruCache) StartEviction(ctx context.Context) {
47+
go lc.c.Start()
48+
49+
<-ctx.Done()
50+
51+
lc.c.Stop()
52+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"runtime"
6+
"testing"
7+
"time"
8+
9+
"github.com/benbjohnson/clock"
10+
"github.com/cschleiden/go-workflows/backend"
11+
"github.com/cschleiden/go-workflows/internal/core"
12+
"github.com/cschleiden/go-workflows/internal/history"
13+
"github.com/cschleiden/go-workflows/internal/logger"
14+
wf "github.com/cschleiden/go-workflows/internal/workflow"
15+
"github.com/cschleiden/go-workflows/workflow"
16+
"github.com/stretchr/testify/require"
17+
"go.opentelemetry.io/otel/trace"
18+
)
19+
20+
func Test_Cache_StoreAndGet(t *testing.T) {
21+
c := NewWorkflowExecutorLRUCache(1, time.Second*10)
22+
23+
r := wf.NewRegistry()
24+
r.RegisterWorkflow(workflowWithActivity)
25+
26+
i := core.NewWorkflowInstance("instanceID", "executionID")
27+
e, err := wf.NewExecutor(
28+
logger.NewDefaultLogger(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, &testHistoryProvider{}, i, clock.New())
29+
require.NoError(t, err)
30+
31+
i2 := core.NewWorkflowInstance("instanceID2", "executionID2")
32+
e2, err := wf.NewExecutor(
33+
logger.NewDefaultLogger(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, &testHistoryProvider{}, i, clock.New())
34+
require.NoError(t, err)
35+
36+
err = c.Store(context.Background(), i, e)
37+
require.NoError(t, err)
38+
39+
re, ok, err := c.Get(context.Background(), i)
40+
require.NoError(t, err)
41+
require.True(t, ok)
42+
require.Equal(t, e, re)
43+
44+
// Store another executor, this should evict the first one
45+
err = c.Store(context.Background(), i2, e2)
46+
require.NoError(t, err)
47+
48+
re, ok, err = c.Get(context.Background(), i)
49+
require.NoError(t, err)
50+
require.False(t, ok)
51+
}
52+
53+
func Test_Cache_Evict(t *testing.T) {
54+
c := NewWorkflowExecutorLRUCache(128,
55+
1, // Should evict immediately
56+
)
57+
58+
i := core.NewWorkflowInstance("instanceID", "executionID")
59+
r := wf.NewRegistry()
60+
r.RegisterWorkflow(workflowWithActivity)
61+
e, err := wf.NewExecutor(
62+
logger.NewDefaultLogger(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, &testHistoryProvider{}, i, clock.New())
63+
require.NoError(t, err)
64+
65+
err = c.Store(context.Background(), i, e)
66+
require.NoError(t, err)
67+
68+
go c.StartEviction(context.Background())
69+
time.Sleep(1 * time.Millisecond)
70+
runtime.Gosched()
71+
72+
e2, ok, err := c.Get(context.Background(), i)
73+
require.NoError(t, err)
74+
require.False(t, ok)
75+
require.Nil(t, e2)
76+
}
77+
78+
func workflowWithActivity(ctx workflow.Context) (int, error) {
79+
r, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
80+
RetryOptions: workflow.RetryOptions{
81+
MaxAttempts: 2,
82+
},
83+
}, activity1).Get(ctx)
84+
if err != nil {
85+
return 0, err
86+
}
87+
88+
return r, nil
89+
}
90+
91+
func activity1(ctx context.Context) (int, error) {
92+
return 23, nil
93+
}
94+
95+
type testHistoryProvider struct {
96+
history []history.Event
97+
}
98+
99+
func (t *testHistoryProvider) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]history.Event, error) {
100+
return t.history, nil
101+
}

internal/workflow/cache/key.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/cschleiden/go-workflows/internal/core"
7+
)
8+
9+
func getKey(instance *core.WorkflowInstance) string {
10+
return fmt.Sprintf("%s-%s", instance.InstanceID, instance.ExecutionID)
11+
}

0 commit comments

Comments
 (0)