Skip to content

Commit 0b2d07f

Browse files
authored
Merge pull request #299 from cschleiden/fix-internal-registry
Make the registry package public
2 parents 6c332fa + 593e289 commit 0b2d07f

File tree

16 files changed

+152
-143
lines changed

16 files changed

+152
-143
lines changed

internal/activity/executor.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"github.com/cschleiden/go-workflows/backend/payload"
1414
"github.com/cschleiden/go-workflows/internal/args"
1515
"github.com/cschleiden/go-workflows/internal/log"
16-
"github.com/cschleiden/go-workflows/internal/workflow"
1716
"github.com/cschleiden/go-workflows/internal/workflowerrors"
17+
"github.com/cschleiden/go-workflows/registry"
1818
wf "github.com/cschleiden/go-workflows/workflow"
1919
"go.opentelemetry.io/otel/attribute"
2020
"go.opentelemetry.io/otel/trace"
@@ -25,10 +25,16 @@ type Executor struct {
2525
tracer trace.Tracer
2626
converter converter.Converter
2727
propagators []wf.ContextPropagator
28-
r *workflow.Registry
28+
r *registry.Registry
2929
}
3030

31-
func NewExecutor(logger *slog.Logger, tracer trace.Tracer, converter converter.Converter, propagators []wf.ContextPropagator, r *workflow.Registry) *Executor {
31+
func NewExecutor(
32+
logger *slog.Logger,
33+
tracer trace.Tracer,
34+
converter converter.Converter,
35+
propagators []wf.ContextPropagator,
36+
r *registry.Registry,
37+
) *Executor {
3238
return &Executor{
3339
logger: logger,
3440
tracer: tracer,

internal/activity/executor_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/cschleiden/go-workflows/core"
1515
"github.com/cschleiden/go-workflows/internal/args"
1616
"github.com/cschleiden/go-workflows/internal/fn"
17-
"github.com/cschleiden/go-workflows/internal/workflow"
1817
"github.com/cschleiden/go-workflows/internal/workflowerrors"
18+
"github.com/cschleiden/go-workflows/registry"
1919
"github.com/google/uuid"
2020
"github.com/stretchr/testify/require"
2121
"go.opentelemetry.io/otel/trace"
@@ -24,12 +24,12 @@ import (
2424
func TestExecutor_ExecuteActivity(t *testing.T) {
2525
tests := []struct {
2626
name string
27-
setup func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes
27+
setup func(t *testing.T, r *registry.Registry) *history.ActivityScheduledAttributes
2828
result func(t *testing.T, result payload.Payload, err error)
2929
}{
3030
{
3131
name: "unknown activity",
32-
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
32+
setup: func(t *testing.T, r *registry.Registry) *history.ActivityScheduledAttributes {
3333
return &history.ActivityScheduledAttributes{
3434
Name: "unknown",
3535
}
@@ -42,7 +42,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
4242
},
4343
{
4444
name: "mismatched argument count",
45-
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
45+
setup: func(t *testing.T, r *registry.Registry) *history.ActivityScheduledAttributes {
4646
a := func(context.Context, int, int) error { return nil }
4747
require.NoError(t, r.RegisterActivity(a))
4848

@@ -58,7 +58,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
5858
},
5959
{
6060
name: "wrap error",
61-
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
61+
setup: func(t *testing.T, r *registry.Registry) *history.ActivityScheduledAttributes {
6262
a := func(context.Context, int) error {
6363
return errors.New("some error")
6464
}
@@ -81,7 +81,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
8181
},
8282
{
8383
name: "handle panic",
84-
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
84+
setup: func(t *testing.T, r *registry.Registry) *history.ActivityScheduledAttributes {
8585
a := func(context.Context, int) error {
8686
panic("activity panic")
8787
}
@@ -107,7 +107,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
107107
}
108108
for _, tt := range tests {
109109
t.Run(tt.name, func(t *testing.T) {
110-
r := workflow.NewRegistry()
110+
r := registry.New()
111111
attr := tt.setup(t, r)
112112

113113
e := &Executor{

internal/worker/activity.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@ import (
1313
"github.com/cschleiden/go-workflows/internal/activity"
1414
"github.com/cschleiden/go-workflows/internal/metrickeys"
1515
im "github.com/cschleiden/go-workflows/internal/metrics"
16-
"github.com/cschleiden/go-workflows/internal/workflow"
1716
"github.com/cschleiden/go-workflows/internal/workflowerrors"
17+
"github.com/cschleiden/go-workflows/registry"
1818
)
1919

20-
func NewActivityWorker(b backend.Backend, registry *workflow.Registry, clock clock.Clock, options WorkerOptions) *Worker[backend.ActivityTask, history.Event] {
20+
func NewActivityWorker(
21+
b backend.Backend,
22+
registry *registry.Registry,
23+
clock clock.Clock,
24+
options WorkerOptions,
25+
) *Worker[backend.ActivityTask, history.Event] {
2126
ae := activity.NewExecutor(b.Logger(), b.Tracer(), b.Converter(), b.ContextPropagators(), registry)
2227

2328
tw := &ActivityTaskWorker{

internal/worker/workflow.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
im "github.com/cschleiden/go-workflows/internal/metrics"
1717
"github.com/cschleiden/go-workflows/internal/workflow"
1818
"github.com/cschleiden/go-workflows/internal/workflow/cache"
19+
"github.com/cschleiden/go-workflows/registry"
1920
)
2021

2122
type WorkflowWorkerOptions struct {
@@ -26,7 +27,11 @@ type WorkflowWorkerOptions struct {
2627
WorkflowExecutorCacheTTL time.Duration
2728
}
2829

29-
func NewWorkflowWorker(b backend.Backend, registry *workflow.Registry, options WorkflowWorkerOptions) *Worker[backend.WorkflowTask, workflow.ExecutionResult] {
30+
func NewWorkflowWorker(
31+
b backend.Backend,
32+
registry *registry.Registry,
33+
options WorkflowWorkerOptions,
34+
) *Worker[backend.WorkflowTask, workflow.ExecutionResult] {
3035
if options.WorkflowExecutorCache == nil {
3136
options.WorkflowExecutorCache = cache.NewWorkflowExecutorLRUCache(b.Metrics(), options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
3237
}
@@ -43,7 +48,7 @@ func NewWorkflowWorker(b backend.Backend, registry *workflow.Registry, options W
4348

4449
type WorkflowTaskWorker struct {
4550
backend backend.Backend
46-
registry *workflow.Registry
51+
registry *registry.Registry
4752
cache workflow.ExecutorCache
4853
logger *slog.Logger
4954
}

internal/workflow/cache/cache_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cschleiden/go-workflows/core"
1616
"github.com/cschleiden/go-workflows/internal/metrics"
1717
wf "github.com/cschleiden/go-workflows/internal/workflow"
18+
"github.com/cschleiden/go-workflows/registry"
1819
"github.com/cschleiden/go-workflows/workflow"
1920
"github.com/stretchr/testify/require"
2021
"go.opentelemetry.io/otel/trace"
@@ -23,7 +24,7 @@ import (
2324
func Test_Cache_StoreAndGet(t *testing.T) {
2425
c := NewWorkflowExecutorLRUCache(metrics.NewNoopMetricsClient(), 1, time.Second*10)
2526

26-
r := wf.NewRegistry()
27+
r := registry.New()
2728
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
2829

2930
i := core.NewWorkflowInstance("instanceID", "executionID")
@@ -65,7 +66,7 @@ func Test_Cache_AutoEviction(t *testing.T) {
6566
)
6667

6768
i := core.NewWorkflowInstance("instanceID", "executionID")
68-
r := wf.NewRegistry()
69+
r := registry.New()
6970
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
7071
e, err := wf.NewExecutor(
7172
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
@@ -95,7 +96,7 @@ func Test_Cache_Evict(t *testing.T) {
9596
)
9697

9798
i := core.NewWorkflowInstance("instanceID", "executionID")
98-
r := wf.NewRegistry()
99+
r := registry.New()
99100
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
100101
e, err := wf.NewExecutor(
101102
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,

internal/workflow/executor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cschleiden/go-workflows/internal/workflowerrors"
2323
"github.com/cschleiden/go-workflows/internal/workflowstate"
2424
"github.com/cschleiden/go-workflows/internal/workflowtracer"
25+
"github.com/cschleiden/go-workflows/registry"
2526
wf "github.com/cschleiden/go-workflows/workflow"
2627
"go.opentelemetry.io/otel/trace"
2728
)
@@ -45,7 +46,7 @@ type WorkflowExecutor interface {
4546
}
4647

4748
type executor struct {
48-
registry *Registry
49+
registry *registry.Registry
4950
historyProvider WorkflowHistoryProvider
5051
workflow *workflow
5152
workflowName string
@@ -64,7 +65,7 @@ type executor struct {
6465
func NewExecutor(
6566
logger *slog.Logger,
6667
tracer trace.Tracer,
67-
registry *Registry,
68+
registry *registry.Registry,
6869
cv converter.Converter,
6970
propagators []wf.ContextPropagator,
7071
historyProvider WorkflowHistoryProvider,

internal/workflow/executor_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cschleiden/go-workflows/internal/command"
1919
"github.com/cschleiden/go-workflows/internal/fn"
2020
"github.com/cschleiden/go-workflows/internal/sync"
21+
"github.com/cschleiden/go-workflows/registry"
2122
wf "github.com/cschleiden/go-workflows/workflow"
2223
"github.com/google/uuid"
2324
"github.com/stretchr/testify/require"
@@ -32,7 +33,7 @@ func (t *testHistoryProvider) GetWorkflowInstanceHistory(ctx context.Context, in
3233
return t.history, nil
3334
}
3435

35-
func newExecutor(r *Registry, i *core.WorkflowInstance, historyProvider WorkflowHistoryProvider) (*executor, error) {
36+
func newExecutor(r *registry.Registry, i *core.WorkflowInstance, historyProvider WorkflowHistoryProvider) (*executor, error) {
3637
logger := slog.Default()
3738
tracer := trace.NewNoopTracerProvider().Tracer("test")
3839

@@ -49,11 +50,11 @@ func activity1(ctx context.Context, r int) (int, error) {
4950
func Test_Executor(t *testing.T) {
5051
tests := []struct {
5152
name string
52-
f func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider)
53+
f func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider)
5354
}{
5455
{
5556
name: "Simple_workflow_to_completion",
56-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
57+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
5758
workflowHits := 0
5859
wf := func(ctx sync.Context) error {
5960
workflowHits++
@@ -75,7 +76,7 @@ func Test_Executor(t *testing.T) {
7576
},
7677
{
7778
name: "Workflow with activity command",
78-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
79+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
7980
workflowActivityHit := 0
8081
workflowWithActivity := func(ctx sync.Context) error {
8182
workflowActivityHit++
@@ -121,7 +122,7 @@ func Test_Executor(t *testing.T) {
121122
},
122123
{
123124
name: "Workflow with activity replay",
124-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
125+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
125126
workflowActivityHit := 0
126127
workflowWithActivity := func(ctx sync.Context) error {
127128
workflowActivityHit++
@@ -186,7 +187,7 @@ func Test_Executor(t *testing.T) {
186187
},
187188
{
188189
name: "Workflow with new events",
189-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
190+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
190191
workflowActivityHit := 0
191192
workflowWithActivity := func(ctx sync.Context) error {
192193
workflowActivityHit++
@@ -264,7 +265,7 @@ func Test_Executor(t *testing.T) {
264265
},
265266
{
266267
name: "Workflow with selector",
267-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
268+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
268269
var workflowWithSelectorHits int
269270

270271
workflowWithSelector := func(ctx sync.Context) error {
@@ -320,7 +321,7 @@ func Test_Executor(t *testing.T) {
320321
},
321322
{
322323
name: "Workflow with timer",
323-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
324+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
324325
workflowTimerHits := 0
325326

326327
workflowWithTimer := func(ctx sync.Context) error {
@@ -366,7 +367,7 @@ func Test_Executor(t *testing.T) {
366367
},
367368
{
368369
name: "Cancel timer multiple times",
369-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
370+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
370371
workflowWithTimer := func(ctx sync.Context) error {
371372
tctx, cancel := wf.WithCancel(ctx)
372373

@@ -402,7 +403,7 @@ func Test_Executor(t *testing.T) {
402403
},
403404
{
404405
name: "Workflow with signal",
405-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
406+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
406407
workflowSignalHits := 0
407408

408409
workflowWithSignal := func(ctx sync.Context) error {
@@ -453,7 +454,7 @@ func Test_Executor(t *testing.T) {
453454
},
454455
{
455456
name: "Completes workflow on unhandled error",
456-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
457+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
457458
workflowPanic := func(ctx sync.Context) error {
458459
panic("wf error")
459460
}
@@ -487,7 +488,7 @@ func Test_Executor(t *testing.T) {
487488
},
488489
{
489490
name: "Schedule subworkflow",
490-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
491+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
491492
subworkflow := func(ctx wf.Context) error {
492493
return nil
493494
}
@@ -515,7 +516,7 @@ func Test_Executor(t *testing.T) {
515516
},
516517
{
517518
name: "Schedule and cancel subworkflow",
518-
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
519+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
519520
subworkflow := func(ctx wf.Context) error {
520521
return nil
521522
}
@@ -583,7 +584,7 @@ func Test_Executor(t *testing.T) {
583584

584585
for _, tt := range tests {
585586
t.Run(tt.name, func(t *testing.T) {
586-
r := NewRegistry()
587+
r := registry.New()
587588

588589
i := core.NewWorkflowInstance(uuid.NewString(), "")
589590
hp := &testHistoryProvider{}

internal/workflow/registry_option.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

registry/errors.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package registry
2+
3+
type ErrInvalidWorkflow struct {
4+
msg string
5+
}
6+
7+
func (e *ErrInvalidWorkflow) Error() string {
8+
return e.msg
9+
}
10+
11+
type ErrWorkflowAlreadyRegistered struct {
12+
msg string
13+
}
14+
15+
func (e *ErrWorkflowAlreadyRegistered) Error() string {
16+
return e.msg
17+
}
18+
19+
type ErrInvalidActivity struct {
20+
msg string
21+
}
22+
23+
func (e *ErrInvalidActivity) Error() string {
24+
return e.msg
25+
}
26+
27+
type ErrActivityAlreadyRegistered struct {
28+
msg string
29+
}
30+
31+
func (e *ErrActivityAlreadyRegistered) Error() string {
32+
return e.msg
33+
}

0 commit comments

Comments
 (0)