Skip to content

Commit 02fbb85

Browse files
committed
Export workflow state
1 parent f1b5ffb commit 02fbb85

File tree

11 files changed

+44
-44
lines changed

11 files changed

+44
-44
lines changed

internal/workflow/activity.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ func executeActivity(ctx sync.Context, options ActivityOptions, activity Activit
3535
return f
3636
}
3737

38-
wfState := getWfState(ctx)
39-
scheduleEventID := wfState.getNextScheduleEventID()
38+
wfState := WorkflowState(ctx)
39+
scheduleEventID := wfState.GetNextScheduleEventID()
4040

4141
name := fn.Name(activity)
4242
cmd := command.NewScheduleActivityTaskCommand(scheduleEventID, name, inputs)
43-
wfState.addCommand(&cmd)
43+
wfState.AddCommand(&cmd)
4444

4545
wfState.pendingFutures[scheduleEventID] = f
4646

@@ -55,7 +55,7 @@ func executeActivity(ctx sync.Context, options ActivityOptions, activity Activit
5555
return
5656
}
5757

58-
wfState.removeCommand(cmd)
58+
wfState.RemoveCommand(cmd)
5959
delete(wfState.pendingFutures, scheduleEventID)
6060
f.Set(nil, sync.Canceled)
6161
})

internal/workflow/executor.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type executor struct {
3838

3939
func NewExecutor(registry *Registry, instance core.WorkflowInstance, clock clock.Clock) (WorkflowExecutor, error) {
4040
state := newWorkflowState(instance, clock)
41-
wfCtx, cancel := sync.WithCancel(withWfState(sync.Background(), state))
41+
wfCtx, cancel := sync.WithCancel(WithWorkflowState(sync.Background(), state))
4242

4343
return &executor{
4444
registry: registry,
@@ -60,7 +60,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) ([]history
6060
}
6161

6262
// Clear commands from previous executions
63-
e.workflowState.clearCommands()
63+
e.workflowState.ClearCommands()
6464
} else {
6565
// Replay history
6666
e.workflowState.setReplaying(true)
@@ -204,7 +204,7 @@ func (e *executor) handleWorkflowTaskStarted(event history.Event, a *history.Wor
204204
}
205205

206206
func (e *executor) handleActivityScheduled(event history.Event, a *history.ActivityScheduledAttributes) error {
207-
c := e.workflowState.removeCommandByEventID(event.ScheduleEventID)
207+
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
208208
if c != nil {
209209
// Ensure the same activity is scheduled again
210210
ca := c.Attr.(*command.ScheduleActivityTaskCommandAttr)
@@ -222,7 +222,7 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
222222
return nil
223223
}
224224

225-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
225+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
226226
f.Set(a.Result, nil)
227227

228228
return e.workflow.Continue(e.workflowCtx)
@@ -234,15 +234,15 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
234234
return errors.New("no pending future found for activity failed event")
235235
}
236236

237-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
237+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
238238

239239
f.Set(nil, errors.New(a.Reason))
240240

241241
return e.workflow.Continue(e.workflowCtx)
242242
}
243243

244244
func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerScheduledAttributes) error {
245-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
245+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
246246

247247
return nil
248248
}
@@ -254,15 +254,15 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
254254
return nil
255255
}
256256

257-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
257+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
258258

259259
f.Set(nil, nil)
260260

261261
return e.workflow.Continue(e.workflowCtx)
262262
}
263263

264264
func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.SubWorkflowScheduledAttributes) error {
265-
c := e.workflowState.removeCommandByEventID(event.ScheduleEventID)
265+
c := e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
266266
if c != nil {
267267
ca := c.Attr.(*command.ScheduleSubWorkflowCommandAttr)
268268
if a.Name != ca.Name {
@@ -279,7 +279,7 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
279279
return errors.New("no pending future found for sub workflow failed event")
280280
}
281281

282-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
282+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
283283

284284
f.Set(nil, errors.New(a.Error))
285285

@@ -292,7 +292,7 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
292292
return errors.New("no pending future found for sub workflow completed event")
293293
}
294294

295-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
295+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
296296

297297
f.Set(a.Result, nil)
298298

@@ -301,10 +301,10 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
301301

302302
func (e *executor) handleSignalReceived(event history.Event, a *history.SignalReceivedAttributes) error {
303303
// Send signal to workflow channel
304-
sc := e.workflowState.getSignalChannel(a.Name)
304+
sc := e.workflowState.GetSignalChannel(a.Name)
305305
sc.SendNonblocking(e.workflowCtx, a.Arg)
306306

307-
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
307+
e.workflowState.RemoveCommandByEventID(event.ScheduleEventID)
308308

309309
return e.workflow.Continue(e.workflowCtx)
310310
}
@@ -325,7 +325,7 @@ func (e *executor) workflowCompleted(result payload.Payload, err error) error {
325325
e.workflowState.scheduleEventID++
326326

327327
cmd := command.NewCompleteWorkflowCommand(eventId, result, err)
328-
e.workflowState.addCommand(&cmd)
328+
e.workflowState.AddCommand(&cmd)
329329

330330
return nil
331331
}

internal/workflow/executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
func newExecutor(r *Registry, i core.WorkflowInstance) *executor {
2323
state := newWorkflowState(i, clock.New())
24-
wfCtx, cancel := sync.WithCancel(withWfState(sync.Background(), state))
24+
wfCtx, cancel := sync.WithCancel(WithWorkflowState(sync.Background(), state))
2525

2626
return &executor{
2727
registry: r,

internal/workflow/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ import (
66
)
77

88
func WorkflowInstance2(ctx sync.Context) core.WorkflowInstance {
9-
wfState := getWfState(ctx)
9+
wfState := WorkflowState(ctx)
1010
return wfState.instance
1111
}

internal/workflow/now.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ import (
77
)
88

99
func Now(ctx sync.Context) time.Time {
10-
wfState := getWfState(ctx)
10+
wfState := WorkflowState(ctx)
1111
return wfState.time
1212
}

internal/workflow/replaying.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package workflow
33
import "github.com/cschleiden/go-workflows/internal/sync"
44

55
func Replaying(ctx sync.Context) bool {
6-
wfState := getWfState(ctx)
6+
wfState := WorkflowState(ctx)
77
return wfState.replaying
88
}
99

1010
func SetReplaying(ctx sync.Context, replaying bool) {
11-
wfState := getWfState(ctx)
11+
wfState := WorkflowState(ctx)
1212
wfState.replaying = replaying
1313
}

internal/workflow/sideeffect.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
)
88

99
func SideEffect(ctx sync.Context, f func(ctx sync.Context) interface{}) sync.Future {
10-
wfState := getWfState(ctx)
10+
wfState := WorkflowState(ctx)
1111

12-
scheduleEventID := wfState.getNextScheduleEventID()
12+
scheduleEventID := wfState.GetNextScheduleEventID()
1313

1414
future := sync.NewFuture()
1515

@@ -31,7 +31,7 @@ func SideEffect(ctx sync.Context, f func(ctx sync.Context) interface{}) sync.Fut
3131
}
3232

3333
cmd := command.NewSideEffectCommand(scheduleEventID, payload)
34-
wfState.addCommand(&cmd)
34+
wfState.AddCommand(&cmd)
3535

3636
future.Set(r, nil)
3737

internal/workflow/signal.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
func NewSignalChannel(ctx sync.Context, name string) sync.Channel {
8-
wfState := getWfState(ctx)
8+
wfState := WorkflowState(ctx)
99

10-
return wfState.getSignalChannel(name)
10+
return wfState.GetSignalChannel(name)
1111
}

internal/workflow/state.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,25 @@ func newWorkflowState(instance core.WorkflowInstance, clock clock.Clock) *workfl
3636
}
3737
}
3838

39-
func getWfState(ctx sync.Context) *workflowState {
39+
func WorkflowState(ctx sync.Context) *workflowState {
4040
return ctx.Value(workflowCtxKey).(*workflowState)
4141
}
4242

43-
func withWfState(ctx sync.Context, wfState *workflowState) sync.Context {
43+
func WithWorkflowState(ctx sync.Context, wfState *workflowState) sync.Context {
4444
return sync.WithValue(ctx, workflowCtxKey, wfState)
4545
}
4646

47-
func (wf *workflowState) getNextScheduleEventID() int {
47+
func (wf *workflowState) GetNextScheduleEventID() int {
4848
scheduleEventID := wf.scheduleEventID
4949
wf.scheduleEventID++
5050
return scheduleEventID
5151
}
5252

53-
func (wf *workflowState) addCommand(cmd *command.Command) {
53+
func (wf *workflowState) AddCommand(cmd *command.Command) {
5454
wf.commands = append(wf.commands, cmd)
5555
}
5656

57-
func (wf *workflowState) removeCommandByEventID(eventID int) *command.Command {
57+
func (wf *workflowState) RemoveCommandByEventID(eventID int) *command.Command {
5858
for i, c := range wf.commands {
5959
if c.ID == eventID {
6060
wf.commands = append(wf.commands[:i], wf.commands[i+1:]...)
@@ -65,7 +65,7 @@ func (wf *workflowState) removeCommandByEventID(eventID int) *command.Command {
6565
return nil
6666
}
6767

68-
func (wf *workflowState) removeCommand(cmd command.Command) {
68+
func (wf *workflowState) RemoveCommand(cmd command.Command) {
6969
for i, c := range wf.commands {
7070
if *c == cmd {
7171
// TODO: Move to state machines?
@@ -77,23 +77,23 @@ func (wf *workflowState) removeCommand(cmd command.Command) {
7777
}
7878
}
7979

80-
func (wf *workflowState) clearCommands() {
80+
func (wf *workflowState) ClearCommands() {
8181
wf.commands = []*command.Command{}
8282
}
8383

84-
func (wf *workflowState) createSignalChannel(name string) sync.Channel {
84+
func (wf *workflowState) CreateSignalChannel(name string) sync.Channel {
8585
cs := sync.NewBufferedChannel(10_000)
8686
wf.signalChannels[name] = cs
8787
return cs
8888
}
8989

90-
func (wf *workflowState) getSignalChannel(name string) sync.Channel {
90+
func (wf *workflowState) GetSignalChannel(name string) sync.Channel {
9191
cs, ok := wf.signalChannels[name]
9292
if ok {
9393
return cs
9494
}
9595

96-
return wf.createSignalChannel(name)
96+
return wf.CreateSignalChannel(name)
9797
}
9898

9999
func (wf *workflowState) setReplaying(replaying bool) {

internal/workflow/subworkflow.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ func createSubWorkflowInstance(ctx sync.Context, options SubWorkflowOptions, wor
3434
return f
3535
}
3636

37-
wfState := getWfState(ctx)
37+
wfState := WorkflowState(ctx)
3838

39-
scheduleEventID := wfState.getNextScheduleEventID()
39+
scheduleEventID := wfState.GetNextScheduleEventID()
4040

4141
name := fn.Name(workflow)
4242
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, options.InstanceID, name, inputs)
43-
wfState.addCommand(&cmd)
43+
wfState.AddCommand(&cmd)
4444

4545
wfState.pendingFutures[scheduleEventID] = f
4646

@@ -55,7 +55,7 @@ func createSubWorkflowInstance(ctx sync.Context, options SubWorkflowOptions, wor
5555
return
5656
}
5757

58-
wfState.removeCommand(cmd)
58+
wfState.RemoveCommand(cmd)
5959
delete(wfState.pendingFutures, scheduleEventID)
6060
f.Set(nil, sync.Canceled)
6161
})

0 commit comments

Comments
 (0)