Skip to content

Commit b70a356

Browse files
authored
Merge pull request #336 from cschleiden/cache-interface
Enable other cache implementations
2 parents 1a0942e + 9f3a0ab commit b70a356

File tree

15 files changed

+57
-50
lines changed

15 files changed

+57
-50
lines changed

backend/test/e2e.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313
"github.com/cschleiden/go-workflows/client"
1414
"github.com/cschleiden/go-workflows/core"
1515
"github.com/cschleiden/go-workflows/internal/sync"
16-
internalwf "github.com/cschleiden/go-workflows/internal/workflow"
1716
"github.com/cschleiden/go-workflows/worker"
1817
"github.com/cschleiden/go-workflows/workflow"
18+
"github.com/cschleiden/go-workflows/workflow/executor"
1919
"github.com/google/uuid"
2020
"github.com/stretchr/testify/require"
2121
)
@@ -834,10 +834,10 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
834834
type noopWorkflowExecutorCache struct {
835835
}
836836

837-
var _ internalwf.ExecutorCache = (*noopWorkflowExecutorCache)(nil)
837+
var _ executor.Cache = (*noopWorkflowExecutorCache)(nil)
838838

839839
// Get implements workflow.ExecutorCache
840-
func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (internalwf.WorkflowExecutor, bool, error) {
840+
func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (executor.WorkflowExecutor, bool, error) {
841841
return nil, false, nil
842842
}
843843

@@ -851,7 +851,7 @@ func (*noopWorkflowExecutorCache) StartEviction(ctx context.Context) {
851851
}
852852

853853
// Store implements workflow.ExecutorCache
854-
func (*noopWorkflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, workflow internalwf.WorkflowExecutor) error {
854+
func (*noopWorkflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, workflow executor.WorkflowExecutor) error {
855855
return nil
856856
}
857857

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0
1818
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
1919
go.opentelemetry.io/otel/trace v1.16.0
20+
go.uber.org/goleak v1.3.0
2021
golang.org/x/tools v0.12.0
2122
modernc.org/sqlite v1.27.0
2223
)
@@ -57,7 +58,6 @@ require (
5758
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
5859
go.tmz.dev/musttag v0.7.2 // indirect
5960
go.uber.org/atomic v1.7.0 // indirect
60-
go.uber.org/goleak v1.3.0 // indirect
6161
go.uber.org/multierr v1.6.0 // indirect
6262
go.uber.org/zap v1.24.0 // indirect
6363
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,6 @@ go.tmz.dev/musttag v0.7.2 h1:1J6S9ipDbalBSODNT5jCep8dhZyMr4ttnjQagmGYR5s=
672672
go.tmz.dev/musttag v0.7.2/go.mod h1:m6q5NiiSKMnQYokefa2xGoyoXnrswCbJ0AWYzf4Zs28=
673673
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
674674
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
675-
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
676-
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
677675
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
678676
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
679677
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=

internal/worker/workflow.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/log"
1616
"github.com/cschleiden/go-workflows/internal/metrickeys"
1717
im "github.com/cschleiden/go-workflows/internal/metrics"
18-
"github.com/cschleiden/go-workflows/internal/workflow"
19-
"github.com/cschleiden/go-workflows/internal/workflow/cache"
2018
"github.com/cschleiden/go-workflows/registry"
19+
"github.com/cschleiden/go-workflows/workflow/executor"
20+
"github.com/cschleiden/go-workflows/workflow/executor/cache"
2121
)
2222

2323
type WorkflowWorkerOptions struct {
2424
WorkerOptions
2525

26-
WorkflowExecutorCache workflow.ExecutorCache
26+
WorkflowExecutorCache executor.Cache
2727
WorkflowExecutorCacheSize int
2828
WorkflowExecutorCacheTTL time.Duration
2929
}
@@ -32,7 +32,7 @@ func NewWorkflowWorker(
3232
b backend.Backend,
3333
registry *registry.Registry,
3434
options WorkflowWorkerOptions,
35-
) *Worker[backend.WorkflowTask, workflow.ExecutionResult] {
35+
) *Worker[backend.WorkflowTask, executor.ExecutionResult] {
3636
if options.WorkflowExecutorCache == nil {
3737
options.WorkflowExecutorCache = cache.NewWorkflowExecutorLRUCache(b.Metrics(), options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
3838
}
@@ -44,13 +44,13 @@ func NewWorkflowWorker(
4444
logger: b.Logger(),
4545
}
4646

47-
return NewWorker[backend.WorkflowTask, workflow.ExecutionResult](b, tw, &options.WorkerOptions)
47+
return NewWorker[backend.WorkflowTask, executor.ExecutionResult](b, tw, &options.WorkerOptions)
4848
}
4949

5050
type WorkflowTaskWorker struct {
5151
backend backend.Backend
5252
registry *registry.Registry
53-
cache workflow.ExecutorCache
53+
cache executor.Cache
5454
logger *slog.Logger
5555
}
5656

@@ -63,7 +63,7 @@ func (wtw *WorkflowTaskWorker) Start(ctx context.Context) error {
6363
}
6464

6565
// Complete implements TaskWorker.
66-
func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.ExecutionResult, t *backend.WorkflowTask) error {
66+
func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *executor.ExecutionResult, t *backend.WorkflowTask) error {
6767
logger := wtw.logger.With(
6868
slog.String(log.TaskIDKey, t.ID),
6969
slog.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
@@ -99,7 +99,7 @@ func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.Ex
9999
return nil
100100
}
101101

102-
func (wtw *WorkflowTaskWorker) Execute(ctx context.Context, t *backend.WorkflowTask) (*workflow.ExecutionResult, error) {
102+
func (wtw *WorkflowTaskWorker) Execute(ctx context.Context, t *backend.WorkflowTask) (*executor.ExecutionResult, error) {
103103
// Record how long this task was in the queue
104104
firstEvent := t.NewEvents[0]
105105
var scheduledAt time.Time
@@ -154,15 +154,15 @@ func (wtw *WorkflowTaskWorker) Get(ctx context.Context) (*backend.WorkflowTask,
154154
return t, nil
155155
}
156156

157-
func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.WorkflowTask) (workflow.WorkflowExecutor, error) {
157+
func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.WorkflowTask) (executor.WorkflowExecutor, error) {
158158
// Try to get a cached executor
159-
executor, ok, err := wtw.cache.Get(ctx, t.WorkflowInstance)
159+
e, ok, err := wtw.cache.Get(ctx, t.WorkflowInstance)
160160
if err != nil {
161161
wtw.logger.ErrorContext(ctx, "could not get cached workflow task executor", "error", err)
162162
}
163163

164164
if !ok {
165-
executor, err = workflow.NewExecutor(
165+
e, err = executor.NewExecutor(
166166
wtw.logger.With(
167167
slog.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
168168
slog.String(log.ExecutionIDKey, t.WorkflowInstance.ExecutionID),
@@ -182,9 +182,9 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
182182
}
183183

184184
// Cache executor instance for future continuation tasks, or refresh last access time
185-
if err := wtw.cache.Store(ctx, t.WorkflowInstance, executor); err != nil {
185+
if err := wtw.cache.Store(ctx, t.WorkflowInstance, e); err != nil {
186186
wtw.logger.ErrorContext(ctx, "error while caching workflow task executor:", "error", err)
187187
}
188188

189-
return executor, nil
189+
return e, nil
190190
}

tester/tester.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import (
2424
"github.com/cschleiden/go-workflows/internal/fn"
2525
"github.com/cschleiden/go-workflows/internal/log"
2626
"github.com/cschleiden/go-workflows/internal/signals"
27-
wf "github.com/cschleiden/go-workflows/internal/workflow"
2827
"github.com/cschleiden/go-workflows/internal/workflowerrors"
2928
"github.com/cschleiden/go-workflows/registry"
3029
"github.com/cschleiden/go-workflows/workflow"
30+
"github.com/cschleiden/go-workflows/workflow/executor"
3131
"github.com/google/uuid"
3232
"github.com/stretchr/testify/mock"
3333
"go.opentelemetry.io/otel/trace"
@@ -346,7 +346,7 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
346346
tw.pendingEvents = tw.pendingEvents[:0]
347347

348348
// Execute task
349-
e, err := wf.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, &testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock)
349+
e, err := executor.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, &testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock)
350350
if err != nil {
351351
panic(fmt.Errorf("could not create workflow executor: %v", err))
352352
}

worker/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package worker
33
import (
44
"time"
55

6-
"github.com/cschleiden/go-workflows/internal/workflow"
6+
"github.com/cschleiden/go-workflows/workflow/executor"
77
)
88

99
type Options struct {
@@ -47,7 +47,7 @@ type Options struct {
4747

4848
// WorkflowExecutorCache is the cache to use for workflow executors. If nil, a default cache implementation
4949
// will be used.
50-
WorkflowExecutorCache workflow.ExecutorCache
50+
WorkflowExecutorCache executor.Cache
5151
}
5252

5353
var DefaultOptions = Options{

worker/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
"github.com/cschleiden/go-workflows/client"
1212
"github.com/cschleiden/go-workflows/internal/signals"
1313
internal "github.com/cschleiden/go-workflows/internal/worker"
14-
workflowinternal "github.com/cschleiden/go-workflows/internal/workflow"
1514
"github.com/cschleiden/go-workflows/registry"
1615
"github.com/cschleiden/go-workflows/workflow"
16+
"github.com/cschleiden/go-workflows/workflow/executor"
1717
)
1818

1919
type Worker struct {
@@ -24,7 +24,7 @@ type Worker struct {
2424

2525
registry *registry.Registry
2626

27-
workflowWorker *internal.Worker[backend.WorkflowTask, workflowinternal.ExecutionResult]
27+
workflowWorker *internal.Worker[backend.WorkflowTask, executor.ExecutionResult]
2828
activityWorker *internal.Worker[backend.ActivityTask, history.Event]
2929
}
3030

internal/workflow/cache.go renamed to workflow/executor/cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
package workflow
1+
package executor
22

33
import (
44
"context"
55

66
"github.com/cschleiden/go-workflows/core"
77
)
88

9-
type ExecutorCache interface {
9+
type Cache interface {
1010
Store(ctx context.Context, instance *core.WorkflowInstance, workflow WorkflowExecutor) error
1111
Evict(ctx context.Context, instance *core.WorkflowInstance) error
1212
Get(ctx context.Context, instance *core.WorkflowInstance) (WorkflowExecutor, bool, error)

internal/workflow/cache/cache.go renamed to workflow/executor/cache/cache.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,22 @@ import (
77
"github.com/cschleiden/go-workflows/backend/metrics"
88
"github.com/cschleiden/go-workflows/core"
99
"github.com/cschleiden/go-workflows/internal/metrickeys"
10-
"github.com/cschleiden/go-workflows/internal/workflow"
10+
"github.com/cschleiden/go-workflows/workflow/executor"
1111
"github.com/jellydator/ttlcache/v3"
1212
)
1313

1414
type lruCache struct {
1515
mc metrics.Client
16-
c *ttlcache.Cache[string, workflow.WorkflowExecutor]
16+
c *ttlcache.Cache[string, executor.WorkflowExecutor]
1717
}
1818

1919
func NewWorkflowExecutorLRUCache(mc metrics.Client, size int, expiration time.Duration) *lruCache {
2020
c := ttlcache.New(
21-
ttlcache.WithCapacity[string, workflow.WorkflowExecutor](uint64(size)),
22-
ttlcache.WithTTL[string, workflow.WorkflowExecutor](expiration),
21+
ttlcache.WithCapacity[string, executor.WorkflowExecutor](uint64(size)),
22+
ttlcache.WithTTL[string, executor.WorkflowExecutor](expiration),
2323
)
2424

25-
c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, workflow.WorkflowExecutor]) {
25+
c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, executor.WorkflowExecutor]) {
2626
// Close the executor to allow it to clean up resources.
2727
i.Value().Close()
2828

@@ -43,7 +43,7 @@ func NewWorkflowExecutorLRUCache(mc metrics.Client, size int, expiration time.Du
4343
}
4444
}
4545

46-
func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (workflow.WorkflowExecutor, bool, error) {
46+
func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (executor.WorkflowExecutor, bool, error) {
4747
e := lc.c.Get(getKey(instance))
4848
if e != nil {
4949
return e.Value(), true, nil
@@ -52,7 +52,7 @@ func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (w
5252
return nil, false, nil
5353
}
5454

55-
func (lc *lruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor workflow.WorkflowExecutor) error {
55+
func (lc *lruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor executor.WorkflowExecutor) error {
5656
lc.c.Set(getKey(instance), executor, ttlcache.DefaultTTL)
5757

5858
lc.mc.Gauge(metrickeys.WorkflowInstanceCacheSize, metrics.Tags{}, int64(lc.c.Len()))

internal/workflow/cache/cache_test.go renamed to workflow/executor/cache/cache_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"github.com/cschleiden/go-workflows/backend/metadata"
1515
"github.com/cschleiden/go-workflows/core"
1616
"github.com/cschleiden/go-workflows/internal/metrics"
17-
wf "github.com/cschleiden/go-workflows/internal/workflow"
1817
"github.com/cschleiden/go-workflows/registry"
1918
"github.com/cschleiden/go-workflows/workflow"
19+
"github.com/cschleiden/go-workflows/workflow/executor"
2020
"github.com/stretchr/testify/require"
2121
"go.opentelemetry.io/otel/trace"
2222
)
@@ -28,14 +28,14 @@ func Test_Cache_StoreAndGet(t *testing.T) {
2828
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
2929

3030
i := core.NewWorkflowInstance("instanceID", "executionID")
31-
e, err := wf.NewExecutor(
31+
e, err := executor.NewExecutor(
3232
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
3333
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
3434
)
3535
require.NoError(t, err)
3636

3737
i2 := core.NewWorkflowInstance("instanceID2", "executionID2")
38-
e2, err := wf.NewExecutor(
38+
e2, err := executor.NewExecutor(
3939
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
4040
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
4141
)
@@ -68,7 +68,7 @@ func Test_Cache_AutoEviction(t *testing.T) {
6868
i := core.NewWorkflowInstance("instanceID", "executionID")
6969
r := registry.New()
7070
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
71-
e, err := wf.NewExecutor(
71+
e, err := executor.NewExecutor(
7272
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
7373
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
7474
&metadata.WorkflowMetadata{}, clock.New(),
@@ -98,7 +98,7 @@ func Test_Cache_Evict(t *testing.T) {
9898
i := core.NewWorkflowInstance("instanceID", "executionID")
9999
r := registry.New()
100100
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
101-
e, err := wf.NewExecutor(
101+
e, err := executor.NewExecutor(
102102
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
103103
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
104104
&metadata.WorkflowMetadata{}, clock.New(),

0 commit comments

Comments
 (0)