Skip to content

Commit f601b20

Browse files
authored
Remove commands after they've been executed
1 parent 2601f71 commit f601b20

File tree

3 files changed

+78
-11
lines changed

3 files changed

+78
-11
lines changed

internal/workflow/executor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ func NewExecutor(logger log.Logger, registry *Registry, historyProvider Workflow
6464
}
6565

6666
func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*ExecutionResult, error) {
67+
e.workflowState.ClearCommands()
68+
6769
if t.LastSequenceID > e.lastSequenceID {
6870
e.logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "sequence_id", e.lastSequenceID)
6971

internal/workflow/executor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,3 +436,61 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
436436
require.True(t, e.workflow.Completed())
437437
require.Len(t, e.workflowState.Commands(), 1)
438438
}
439+
440+
func Test_ClearCommandsBetweenRuns(t *testing.T) {
441+
r := NewRegistry()
442+
443+
workflowActivityHit = 0
444+
445+
r.RegisterWorkflow(workflowWithActivity)
446+
r.RegisterActivity(activity1)
447+
448+
task1 := &task.Workflow{
449+
ID: "oldtaskid",
450+
WorkflowInstance: core.NewWorkflowInstance("instanceID", "executionID"),
451+
NewEvents: []history.Event{
452+
history.NewPendingEvent(
453+
time.Now(),
454+
history.EventType_WorkflowExecutionStarted,
455+
&history.ExecutionStartedAttributes{
456+
Name: "workflowWithActivity",
457+
Inputs: []payload.Payload{},
458+
},
459+
),
460+
},
461+
}
462+
463+
historyProvider := &testHistoryProvider{[]history.Event{}}
464+
e := newExecutor(r, task1.WorkflowInstance, historyProvider)
465+
466+
r1, err := e.ExecuteTask(context.Background(), task1)
467+
require.NoError(t, err)
468+
require.Equal(t, 1, workflowActivityHit)
469+
require.False(t, e.workflow.Completed())
470+
require.Len(t, e.workflowState.Commands(), 1)
471+
472+
historyProvider.history = r1.Executed
473+
474+
task2 := &task.Workflow{
475+
ID: "oldtaskid",
476+
WorkflowInstance: core.NewWorkflowInstance("instanceID", "executionID"),
477+
NewEvents: []history.Event{
478+
history.NewPendingEvent(
479+
time.Now(),
480+
history.EventType_SignalReceived,
481+
&history.SignalReceivedAttributes{
482+
Name: "signalr`",
483+
Arg: []byte("arg"),
484+
},
485+
),
486+
},
487+
LastSequenceID: 4,
488+
}
489+
490+
r2, err := e.ExecuteTask(context.Background(), task2)
491+
require.NoError(t, err)
492+
require.Equal(t, 1, workflowActivityHit)
493+
require.False(t, e.workflow.Completed())
494+
require.Len(t, e.workflowState.Commands(), 0)
495+
require.Len(t, r2.Executed, 3)
496+
}

samples/timer/timer.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package main
33
import (
44
"context"
55
"log"
6-
"os"
7-
"os/signal"
86
"time"
97

108
"github.com/cschleiden/go-workflows/backend"
@@ -27,18 +25,16 @@ func main() {
2725
}
2826

2927
// Run worker
30-
go RunWorker(ctx, b)
28+
w := RunWorker(ctx, b)
3129

3230
// Start workflow via client
3331
c := client.New(b)
3432

3533
startWorkflow(ctx, c)
3634

37-
c2 := make(chan os.Signal, 1)
38-
signal.Notify(c2, os.Interrupt)
39-
<-c2
40-
4135
cancel()
36+
37+
w.Stop()
4238
}
4339

4440
func startWorkflow(ctx context.Context, c client.Client) {
@@ -49,10 +45,20 @@ func startWorkflow(ctx context.Context, c client.Client) {
4945
panic("could not start workflow")
5046
}
5147

52-
log.Println("Started workflow", wf.InstanceID)
48+
for i := 0; i < 10; i++ {
49+
time.Sleep(time.Millisecond * 200)
50+
c.SignalWorkflow(ctx, wf.InstanceID, "Hello world", "value")
51+
}
52+
53+
result, err := client.GetWorkflowResult[string](ctx, c, wf, time.Second*10)
54+
if err != nil {
55+
log.Fatal(err)
56+
}
57+
58+
log.Println("Workflow finished. Result:", result)
5359
}
5460

55-
func RunWorker(ctx context.Context, mb backend.Backend) {
61+
func RunWorker(ctx context.Context, mb backend.Backend) worker.Worker {
5662
w := worker.New(mb, nil)
5763

5864
w.RegisterWorkflow(Workflow1)
@@ -62,6 +68,8 @@ func RunWorker(ctx context.Context, mb backend.Backend) {
6268
if err := w.Start(ctx); err != nil {
6369
panic("could not start worker")
6470
}
71+
72+
return w
6573
}
6674

6775
func Workflow1(ctx workflow.Context, msg string) (string, error) {
@@ -72,12 +80,11 @@ func Workflow1(ctx workflow.Context, msg string) (string, error) {
7280
a1 := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12)
7381

7482
tctx, _ := workflow.WithCancel(ctx)
75-
t := workflow.ScheduleTimer(tctx, 2*time.Second)
7683
// cancel()
7784

7885
workflow.Select(
7986
ctx,
80-
workflow.Await(t, func(ctx workflow.Context, f workflow.Future[struct{}]) {
87+
workflow.Await(workflow.ScheduleTimer(tctx, 2*time.Second), func(ctx workflow.Context, f workflow.Future[struct{}]) {
8188
if _, err := f.Get(ctx); err != nil {
8289
logger.Debug("Timer canceled")
8390
} else {

0 commit comments

Comments
 (0)