Skip to content

Commit 6ba0c9c

Browse files
committed
Reduce wrapper functions, reorganize packages
1 parent 02fbb85 commit 6ba0c9c

21 files changed

+175
-193
lines changed

internal/tester/tester.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ type WorkflowTester interface {
3232

3333
Registry() *workflow.Registry
3434

35-
OnActivity(activity workflow.Activity, args ...interface{}) *mock.Call
35+
OnActivity(activity interface{}, args ...interface{}) *mock.Call
3636

37-
OnSubWorkflow(workflow workflow.Workflow, args ...interface{}) *mock.Call
37+
OnSubWorkflow(workflow interface{}, args ...interface{}) *mock.Call
3838

3939
SignalWorkflow(signalName string, value interface{})
4040

@@ -78,7 +78,7 @@ type workflowTester struct {
7878
options *options
7979

8080
// Workflow under test
81-
wf workflow.Workflow
81+
wf interface{}
8282
wfi core.WorkflowInstance
8383

8484
// Workflows
@@ -107,7 +107,7 @@ type workflowTester struct {
107107
runningActivities int32
108108
}
109109

110-
func NewWorkflowTester(wf workflow.Workflow) WorkflowTester {
110+
func NewWorkflowTester(wf interface{}) WorkflowTester {
111111
// Start with the current wall-clock tiem
112112
clock := clock.NewMock()
113113
clock.Set(time.Now())
@@ -164,7 +164,7 @@ func (wt *workflowTester) ListenSubWorkflow(listener func(core.WorkflowInstance,
164164
wt.subWorkflowListener = listener
165165
}
166166

167-
func (wt *workflowTester) OnActivity(activity workflow.Activity, args ...interface{}) *mock.Call {
167+
func (wt *workflowTester) OnActivity(activity interface{}, args ...interface{}) *mock.Call {
168168
// Register activity so that we can correctly identify its arguments later
169169
wt.registry.RegisterActivity(activity)
170170

@@ -173,7 +173,7 @@ func (wt *workflowTester) OnActivity(activity workflow.Activity, args ...interfa
173173
return wt.ma.On(name, args...)
174174
}
175175

176-
func (wt *workflowTester) OnSubWorkflow(workflow workflow.Workflow, args ...interface{}) *mock.Call {
176+
func (wt *workflowTester) OnSubWorkflow(workflow interface{}, args ...interface{}) *mock.Call {
177177
// Register workflow so that we can correctly identify its arguments later
178178
wt.registry.RegisterWorkflow(workflow)
179179

@@ -573,7 +573,7 @@ func (wt *workflowTester) scheduleSubWorkflow(event history.WorkflowEvent) {
573573
}
574574
}
575575

576-
func (wt *workflowTester) getInitialEvent(wf workflow.Workflow, args []interface{}) history.Event {
576+
func (wt *workflowTester) getInitialEvent(wf interface{}, args []interface{}) history.Event {
577577
name := fn.Name(wf)
578578

579579
inputs, err := margs.ArgsToInputs(converter.DefaultConverter, args...)

internal/workflow/executor.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/payload"
1616
"github.com/cschleiden/go-workflows/internal/sync"
1717
"github.com/cschleiden/go-workflows/internal/task"
18+
"github.com/cschleiden/go-workflows/internal/workflowstate"
1819
"github.com/google/uuid"
1920
errs "github.com/pkg/errors"
2021
)
@@ -28,7 +29,7 @@ type WorkflowExecutor interface {
2829
type executor struct {
2930
registry *Registry
3031
workflow *workflow
31-
workflowState *workflowState
32+
workflowState *workflowstate.WfState
3233
workflowCtx sync.Context
3334
workflowCtxCancel sync.CancelFunc
3435
clock clock.Clock
@@ -37,12 +38,12 @@ type executor struct {
3738
}
3839

3940
func NewExecutor(registry *Registry, instance core.WorkflowInstance, clock clock.Clock) (WorkflowExecutor, error) {
40-
state := newWorkflowState(instance, clock)
41-
wfCtx, cancel := sync.WithCancel(WithWorkflowState(sync.Background(), state))
41+
s := workflowstate.NewWorkflowState(instance, clock)
42+
wfCtx, cancel := sync.WithCancel(workflowstate.WithWorkflowState(sync.Background(), s))
4243

4344
return &executor{
4445
registry: registry,
45-
workflowState: state,
46+
workflowState: s,
4647
workflowCtx: wfCtx,
4748
workflowCtxCancel: cancel,
4849
clock: clock,
@@ -63,7 +64,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) ([]history
6364
e.workflowState.ClearCommands()
6465
} else {
6566
// Replay history
66-
e.workflowState.setReplaying(true)
67+
e.workflowState.SetReplaying(true)
6768
for _, event := range t.History {
6869
if err := e.executeEvent(event); err != nil {
6970
return nil, nil, errs.Wrap(err, "error while replaying event")
@@ -95,7 +96,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) ([]history
9596
}
9697

9798
func (e *executor) executeNewEvents(newEvents []history.Event) error {
98-
e.workflowState.setReplaying(false)
99+
e.workflowState.SetReplaying(false)
99100

100101
for _, event := range newEvents {
101102
if err := e.executeEvent(event); err != nil {
@@ -198,7 +199,7 @@ func (e *executor) handleWorkflowCanceled() error {
198199
}
199200

200201
func (e *executor) handleWorkflowTaskStarted(event history.Event, a *history.WorkflowTaskStartedAttributes) error {
201-
e.workflowState.setTime(event.Timestamp)
202+
e.workflowState.SetTime(event.Timestamp)
202203

203204
return nil
204205
}
@@ -217,7 +218,7 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
217218
}
218219

219220
func (e *executor) handleActivityCompleted(event history.Event, a *history.ActivityCompletedAttributes) error {
220-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
221+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
221222
if !ok {
222223
return nil
223224
}
@@ -229,7 +230,7 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
229230
}
230231

231232
func (e *executor) handleActivityFailed(event history.Event, a *history.ActivityFailedAttributes) error {
232-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
233+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
233234
if !ok {
234235
return errors.New("no pending future found for activity failed event")
235236
}
@@ -248,7 +249,7 @@ func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerSch
248249
}
249250

250251
func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAttributes) error {
251-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
252+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
252253
if !ok {
253254
// Timer already canceled ignore
254255
return nil
@@ -274,7 +275,7 @@ func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.Su
274275
}
275276

276277
func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWorkflowFailedAttributes) error {
277-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
278+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
278279
if !ok {
279280
return errors.New("no pending future found for sub workflow failed event")
280281
}
@@ -287,7 +288,7 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
287288
}
288289

289290
func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.SubWorkflowCompletedAttributes) error {
290-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
291+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
291292
if !ok {
292293
return errors.New("no pending future found for sub workflow completed event")
293294
}
@@ -310,7 +311,7 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
310311
}
311312

312313
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
313-
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
314+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
314315
if !ok {
315316
return errors.New("no pending future found for side effect result event")
316317
}
@@ -321,8 +322,7 @@ func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEf
321322
}
322323

323324
func (e *executor) workflowCompleted(result payload.Payload, err error) error {
324-
eventId := e.workflowState.scheduleEventID
325-
e.workflowState.scheduleEventID++
325+
eventId := e.workflowState.GetNextScheduleEventID()
326326

327327
cmd := command.NewCompleteWorkflowCommand(eventId, result, err)
328328
e.workflowState.AddCommand(&cmd)
@@ -332,7 +332,7 @@ func (e *executor) workflowCompleted(result payload.Payload, err error) error {
332332

333333
func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]history.Event, []history.WorkflowEvent, error) {
334334
instance := t.WorkflowInstance
335-
commands := e.workflowState.commands
335+
commands := e.workflowState.Commands()
336336

337337
newEvents := make([]history.Event, 0)
338338
workflowEvents := make([]history.WorkflowEvent, 0)

internal/workflow/executor_test.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,19 @@ import (
1616
"github.com/cschleiden/go-workflows/internal/payload"
1717
"github.com/cschleiden/go-workflows/internal/sync"
1818
"github.com/cschleiden/go-workflows/internal/task"
19+
"github.com/cschleiden/go-workflows/internal/workflowstate"
20+
wf "github.com/cschleiden/go-workflows/workflow"
1921
"github.com/stretchr/testify/require"
2022
)
2123

2224
func newExecutor(r *Registry, i core.WorkflowInstance) *executor {
23-
state := newWorkflowState(i, clock.New())
24-
wfCtx, cancel := sync.WithCancel(WithWorkflowState(sync.Background(), state))
25+
s := workflowstate.NewWorkflowState(i, clock.New())
26+
wfCtx, cancel := sync.WithCancel(workflowstate.WithWorkflowState(sync.Background(), s))
2527

2628
return &executor{
2729
registry: r,
2830
workflow: NewWorkflow(reflect.ValueOf(workflow1)),
29-
workflowState: state,
31+
workflowState: s,
3032
workflowCtx: wfCtx,
3133
workflowCtxCancel: cancel,
3234
logger: log.Default(),
@@ -74,15 +76,15 @@ func Test_ExecuteWorkflow(t *testing.T) {
7476

7577
require.Equal(t, 1, workflowHits)
7678
require.True(t, e.workflow.Completed())
77-
require.Len(t, e.workflowState.commands, 1)
79+
require.Len(t, e.workflowState.Commands(), 1)
7880
}
7981

8082
var workflowActivityHit int
8183

8284
func workflowWithActivity(ctx sync.Context) error {
8385
workflowActivityHit++
8486

85-
f1 := ExecuteActivity(ctx, DefaultActivityOptions, activity1, 42)
87+
f1 := wf.ExecuteActivity(ctx, wf.DefaultActivityOptions, activity1, 42)
8688

8789
var r int
8890
err := f1.Get(ctx, &r)
@@ -144,7 +146,7 @@ func Test_ReplayWorkflowWithActivityResult(t *testing.T) {
144146

145147
require.Equal(t, 2, workflowActivityHit)
146148
require.True(t, e.workflow.Completed())
147-
require.Len(t, e.workflowState.commands, 1)
149+
require.Len(t, e.workflowState.Commands(), 1)
148150
}
149151

150152
func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
@@ -174,7 +176,7 @@ func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
174176
e.ExecuteTask(context.Background(), task)
175177

176178
require.Equal(t, 1, workflowActivityHit)
177-
require.Len(t, e.workflowState.commands, 1)
179+
require.Len(t, e.workflowState.Commands(), 1)
178180

179181
inputs, _ := converter.DefaultConverter.To(42)
180182
require.Equal(t, command.Command{
@@ -185,7 +187,7 @@ func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
185187
Name: "activity1",
186188
Inputs: []payload.Payload{inputs},
187189
},
188-
}, *e.workflowState.commands[0])
190+
}, *e.workflowState.Commands()[0])
189191
}
190192

191193
var workflowTimerHits int
@@ -194,7 +196,7 @@ func workflowWithTimer(ctx sync.Context) error {
194196
workflowTimerHits++
195197

196198
var r bool
197-
if err := ScheduleTimer(ctx, time.Millisecond*5).Get(ctx, &r); err != nil {
199+
if err := wf.ScheduleTimer(ctx, time.Millisecond*5).Get(ctx, &r); err != nil {
198200
panic("error getting timer future")
199201
}
200202

@@ -229,19 +231,19 @@ func Test_ExecuteWorkflowWithTimer(t *testing.T) {
229231
e.ExecuteTask(context.Background(), task)
230232

231233
require.Equal(t, 1, workflowTimerHits)
232-
require.Len(t, e.workflowState.commands, 1)
234+
require.Len(t, e.workflowState.Commands(), 1)
233235

234-
require.Equal(t, 1, e.workflowState.commands[0].ID)
235-
require.Equal(t, command.CommandType_ScheduleTimer, e.workflowState.commands[0].Type)
236+
require.Equal(t, 1, e.workflowState.Commands()[0].ID)
237+
require.Equal(t, command.CommandType_ScheduleTimer, e.workflowState.Commands()[0].Type)
236238
}
237239

238240
var workflowWithSelectorHits int
239241

240242
func workflowWithSelector(ctx sync.Context) error {
241243
workflowWithSelectorHits++
242244

243-
f1 := ExecuteActivity(ctx, DefaultActivityOptions, activity1, 42)
244-
t := ScheduleTimer(ctx, time.Millisecond*2)
245+
f1 := wf.ExecuteActivity(ctx, wf.DefaultActivityOptions, activity1, 42)
246+
t := wf.ScheduleTimer(ctx, time.Millisecond*2)
245247

246248
sync.Select(
247249
ctx,
@@ -284,10 +286,10 @@ func Test_ExecuteWorkflowWithSelector(t *testing.T) {
284286
e.ExecuteTask(context.Background(), task)
285287

286288
require.Equal(t, 1, workflowWithSelectorHits)
287-
require.Len(t, e.workflowState.commands, 2)
289+
require.Len(t, e.workflowState.Commands(), 2)
288290

289-
require.Equal(t, command.CommandType_ScheduleTimer, e.workflowState.commands[0].Type)
290-
require.Equal(t, command.CommandType_ScheduleActivityTask, e.workflowState.commands[1].Type)
291+
require.Equal(t, command.CommandType_ScheduleTimer, e.workflowState.Commands()[0].Type)
292+
require.Equal(t, command.CommandType_ScheduleActivityTask, e.workflowState.Commands()[1].Type)
291293
}
292294

293295
func Test_ExecuteNewEvents(t *testing.T) {
@@ -332,7 +334,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
332334
require.NoError(t, err)
333335
require.Equal(t, 1, workflowActivityHit)
334336
require.False(t, e.workflow.Completed())
335-
require.Len(t, e.workflowState.commands, 0)
337+
require.Len(t, e.workflowState.Commands(), 0)
336338

337339
h := []history.Event{}
338340
h = append(h, oldTask.NewEvents...)
@@ -360,14 +362,14 @@ func Test_ExecuteNewEvents(t *testing.T) {
360362
require.NoError(t, err)
361363
require.Equal(t, 2, workflowActivityHit)
362364
require.True(t, e.workflow.Completed())
363-
require.Len(t, e.workflowState.commands, 1)
365+
require.Len(t, e.workflowState.Commands(), 1)
364366
}
365367

366368
var workflowSignalHits int
367369

368370
func workflowWithSignal1(ctx sync.Context) error {
369371

370-
c := NewSignalChannel(ctx, "signal1")
372+
c := wf.NewSignalChannel(ctx, "signal1")
371373
c.Receive(ctx, nil)
372374

373375
workflowSignalHits++
@@ -409,5 +411,5 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
409411

410412
require.Equal(t, 1, workflowSignalHits)
411413
require.True(t, e.workflow.Completed())
412-
require.Len(t, e.workflowState.commands, 1)
414+
require.Len(t, e.workflowState.Commands(), 1)
413415
}

internal/workflow/instance.go

Lines changed: 0 additions & 11 deletions
This file was deleted.

internal/workflow/registry.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ type Registry struct {
1212
sync.Mutex
1313

1414
workflowMap map[string]Workflow
15-
activityMap map[string]Activity
15+
activityMap map[string]interface{}
1616
}
1717

1818
func NewRegistry() *Registry {
1919
return &Registry{
2020
Mutex: sync.Mutex{},
2121
workflowMap: make(map[string]Workflow),
22-
activityMap: make(map[string]Activity),
22+
activityMap: make(map[string]interface{}),
2323
}
2424
}
2525

@@ -33,7 +33,7 @@ func (r *Registry) RegisterWorkflow(workflow Workflow) error {
3333
return nil
3434
}
3535

36-
func (r *Registry) RegisterActivity(activity Activity) error {
36+
func (r *Registry) RegisterActivity(activity interface{}) error {
3737
r.Lock()
3838
defer r.Unlock()
3939

@@ -82,7 +82,7 @@ func (r *Registry) GetWorkflow(name string) (Workflow, error) {
8282
return nil, errors.New("workflow not found")
8383
}
8484

85-
func (r *Registry) GetActivity(name string) (Activity, error) {
85+
func (r *Registry) GetActivity(name string) (interface{}, error) {
8686
r.Lock()
8787
defer r.Unlock()
8888

internal/workflow/replaying.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)