Skip to content

Commit 3b6d0af

Browse files
authored
Merge pull request #109 from cschleiden/fix-uncached-command-handling
Fix uncached command handling
2 parents 46998ea + a9ad225 commit 3b6d0af

File tree

16 files changed

+164
-63
lines changed

16 files changed

+164
-63
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
- name: Tests
3939
run: |
4040
go install github.com/jstemmer/go-junit-report/v2@latest
41-
go test -race -count 3 -v ./... 2>&1 | go-junit-report -set-exit-code -iocopy -out "${{ github.workspace }}/report.xml"
41+
go test -race -count 2 -v ./... 2>&1 | go-junit-report -set-exit-code -iocopy -out "${{ github.workspace }}/report.xml"
4242
4343
4444
- name: Test Summary

backend/mysql/mysql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
517517
if event.Type == history.EventType_WorkflowExecutionStarted {
518518
a := event.Attributes.(*history.ExecutionStartedAttributes)
519519
// Create new instance
520-
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
520+
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
521521
return err
522522
}
523523

backend/redis/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
210210
if event.Type == history.EventType_WorkflowExecutionStarted {
211211
// Create new instance
212212
a := event.Attributes.(*history.ExecutionStartedAttributes)
213-
if err := createInstanceP(ctx, p, targetInstance, a.Metadata, true); err != nil {
213+
if err := createInstanceP(ctx, p, &targetInstance, a.Metadata, true); err != nil {
214214
return err
215215
}
216216
}
@@ -222,7 +222,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
222222
}
223223

224224
// Try to queue workflow task
225-
if targetInstance != instance {
225+
if targetInstance != *instance {
226226
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
227227
return fmt.Errorf("enqueuing workflow task: %w", err)
228228
}

backend/sqlite/sqlite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
406406
if event.Type == history.EventType_WorkflowExecutionStarted {
407407
a := event.Attributes.(*history.ExecutionStartedAttributes)
408408
// Create new instance
409-
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
409+
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
410410
return err
411411
}
412412

backend/test/e2e.go

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cschleiden/go-workflows/client"
1212
"github.com/cschleiden/go-workflows/internal/core"
1313
"github.com/cschleiden/go-workflows/internal/history"
14+
internalwf "github.com/cschleiden/go-workflows/internal/workflow"
1415
"github.com/cschleiden/go-workflows/worker"
1516
"github.com/cschleiden/go-workflows/workflow"
1617
"github.com/google/uuid"
@@ -166,7 +167,8 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
166167
return i * 2, nil
167168
}
168169

169-
ch := make(chan struct{}, 1)
170+
// Workflow will be executed multiple times, but the test will wait only once. Create buffered channel
171+
ch := make(chan struct{}, 10)
170172

171173
wf := func(ctx workflow.Context) (int, error) {
172174
swfs := make([]workflow.Future[int], 0)
@@ -177,6 +179,7 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
177179
// Unblock test. Should not do this in production code, but here we know that this will be executed in the same process.
178180
ch <- struct{}{}
179181

182+
// Wait for subworkflows to complete
180183
r := 0
181184

182185
for _, f := range swfs {
@@ -362,30 +365,60 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
362365
},
363366
}
364367

365-
for _, tt := range tests {
366-
t.Run(tt.name, func(t *testing.T) {
367-
b := setup()
368-
ctx := context.Background()
369-
ctx, cancel := context.WithCancel(ctx)
368+
run := func(suffix string, workerOptions *worker.Options) {
369+
for _, tt := range tests {
370+
t.Run(tt.name+suffix, func(t *testing.T) {
371+
b := setup()
372+
ctx := context.Background()
373+
ctx, cancel := context.WithCancel(ctx)
370374

371-
c := client.New(b)
372-
w := worker.New(b, &worker.DefaultWorkerOptions)
375+
c := client.New(b)
376+
w := worker.New(b, workerOptions)
373377

374-
t.Cleanup(func() {
375-
cancel()
376-
if err := w.WaitForCompletion(); err != nil {
377-
log.Println("Worker did not stop in time")
378-
t.FailNow()
379-
}
378+
t.Cleanup(func() {
379+
cancel()
380+
if err := w.WaitForCompletion(); err != nil {
381+
log.Println("Worker did not stop in time")
382+
t.FailNow()
383+
}
380384

381-
if teardown != nil {
382-
teardown(b)
383-
}
384-
})
385+
if teardown != nil {
386+
teardown(b)
387+
}
388+
})
385389

386-
tt.f(t, ctx, c, w, b)
387-
})
390+
tt.f(t, ctx, c, w, b)
391+
})
392+
}
388393
}
394+
395+
options := worker.DefaultWorkerOptions
396+
397+
// Run with cache
398+
run("", &options)
399+
400+
// Disable cache for this execution
401+
options.WorkflowExecutorCache = &noopWorkflowExecutorCache{}
402+
run("_without_cache", &options)
403+
}
404+
405+
type noopWorkflowExecutorCache struct {
406+
}
407+
408+
var _ internalwf.ExecutorCache = (*noopWorkflowExecutorCache)(nil)
409+
410+
// Get implements workflow.ExecutorCache
411+
func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (internalwf.WorkflowExecutor, bool, error) {
412+
return nil, false, nil
413+
}
414+
415+
// StartEviction implements workflow.ExecutorCache
416+
func (*noopWorkflowExecutorCache) StartEviction(ctx context.Context) {
417+
}
418+
419+
// Store implements workflow.ExecutorCache
420+
func (*noopWorkflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, workflow internalwf.WorkflowExecutor) error {
421+
return nil
389422
}
390423

391424
func register(t *testing.T, ctx context.Context, w worker.Worker, workflows []interface{}, activities []interface{}) {

internal/command/cancel_subworkflow.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (c *CancelSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
4040
&history.SubWorkflowCancellationRequestedAttributes{
4141
SubWorkflowInstance: c.SubWorkflowInstance,
4242
},
43+
history.ScheduleEventID(c.id),
4344
),
4445
},
4546

internal/command/cancel_timer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ import (
55
"github.com/cschleiden/go-workflows/internal/history"
66
)
77

8-
type cancelTimerCommand struct {
8+
type CancelTimerCommand struct {
99
command
1010

1111
TimerScheduleEventID int64
1212
}
1313

14-
var _ Command = (*cancelTimerCommand)(nil)
14+
var _ Command = (*CancelTimerCommand)(nil)
1515

16-
func NewCancelTimerCommand(id int64, timerScheduleEventID int64) *cancelTimerCommand {
17-
return &cancelTimerCommand{
16+
func NewCancelTimerCommand(id int64, timerScheduleEventID int64) *CancelTimerCommand {
17+
return &CancelTimerCommand{
1818
command: command{
1919
state: CommandState_Pending,
2020
id: id,
@@ -23,11 +23,11 @@ func NewCancelTimerCommand(id int64, timerScheduleEventID int64) *cancelTimerCom
2323
}
2424
}
2525

26-
func (*cancelTimerCommand) Type() string {
26+
func (*CancelTimerCommand) Type() string {
2727
return "CancelTimer"
2828
}
2929

30-
func (c *cancelTimerCommand) Commit(clock clock.Clock) *CommandResult {
30+
func (c *CancelTimerCommand) Commit(clock clock.Clock) *CommandResult {
3131
c.commit()
3232

3333
return &CommandResult{

internal/command/command.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ import (
77

88
type CommandState int
99

10-
// ┌───────┐
11-
// Pending
12-
// └───────┘
13-
// ▼
14-
// ┌─────────┐
15-
// │Committed│
16-
// └─────────┘
17-
// ▼
18-
// ┌────┐
19-
// │Done│
20-
// └────┘
10+
// ┌───────┐
11+
// ┌──────┤Pending│ - Command is pending, has just been added
12+
// └───────┘
13+
//
14+
// ┌─────────┐
15+
// │Committed│ - Command has been committed. Its results (e.g., events) have been checkpointed
16+
// └─────────┘
17+
//
18+
// ┌────┐
19+
// └──────►│Done│ - Command has been marked as done.
20+
// └────┘
2121
const (
2222
CommandState_Pending CommandState = iota
2323
CommandState_Committed
@@ -27,15 +27,17 @@ const (
2727
type Command interface {
2828
ID() int64
2929

30+
Type() string
31+
32+
State() CommandState
33+
34+
Committed() bool
35+
3036
Commit(clock clock.Clock) *CommandResult
3137

3238
// Done marks the command as done. This transitions the state to done and indicates that the result
3339
// of this command has been applied.
3440
Done()
35-
36-
State() CommandState
37-
38-
Type() string
3941
}
4042

4143
type CommandResult struct {
@@ -64,10 +66,15 @@ func (c *command) ID() int64 {
6466
return c.id
6567
}
6668

69+
func (c *command) Committed() bool {
70+
return c.state >= CommandState_Committed
71+
}
72+
6773
func (c *command) State() CommandState {
6874
return c.state
6975
}
7076

77+
// Done marks the command as done
7178
func (c *command) Done() {
7279
c.state = CommandState_Done
7380
}

internal/command/schedule_subworkflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (c *ScheduleSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
4949
c.commit()
5050

5151
return &CommandResult{
52-
// Record scheduled sub-workflow
52+
// Record scheduled sub-workflow for source workflow instance
5353
Events: []history.Event{
5454
history.NewPendingEvent(
5555
clock.Now(),

internal/command/sideeffect.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import (
66
"github.com/cschleiden/go-workflows/internal/payload"
77
)
88

9-
type sideEffectCommand struct {
9+
type SideEffectCommand struct {
1010
command
1111

1212
result payload.Payload
1313
}
1414

15-
var _ Command = (*sideEffectCommand)(nil)
15+
var _ Command = (*SideEffectCommand)(nil)
1616

17-
func NewSideEffectCommand(id int64, result payload.Payload) *sideEffectCommand {
18-
return &sideEffectCommand{
17+
func NewSideEffectCommand(id int64, result payload.Payload) *SideEffectCommand {
18+
return &SideEffectCommand{
1919
command: command{
2020
state: CommandState_Pending,
2121
id: id,
@@ -24,11 +24,11 @@ func NewSideEffectCommand(id int64, result payload.Payload) *sideEffectCommand {
2424
}
2525
}
2626

27-
func (c *sideEffectCommand) Type() string {
27+
func (c *SideEffectCommand) Type() string {
2828
return "SideEffect"
2929
}
3030

31-
func (c *sideEffectCommand) Commit(clock clock.Clock) *CommandResult {
31+
func (c *SideEffectCommand) Commit(clock clock.Clock) *CommandResult {
3232
c.commit()
3333

3434
return &CommandResult{

0 commit comments

Comments
 (0)