Skip to content

Commit 84fa6f3

Browse files
authored
Merge pull request #95 from cschleiden/timer-cleanup
Cleanup future pending events when cancelling timers
2 parents 2995cea + a9eb7d9 commit 84fa6f3

21 files changed

+325
-49
lines changed

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
2-
"go.testFlags": ["-race", "-count", "1"], // , "-short"],
2+
// Some integration tests take quite long to run, increase the overall limit for now
3+
"go.testFlags": ["-timeout", "120s", "-race", "-count", "1"], // , "-short"],
34
"workbench.colorCustomizations": {
45
"sash.hoverBorder": "#3399ff",
56
"titleBar.activeBackground": "#007fff",

backend/mysql/events.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/cschleiden/go-workflows/internal/history"
99
)
1010

11-
func insertNewEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
11+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
1212
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
1313
}
1414

@@ -51,3 +51,14 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
5151

5252
return nil
5353
}
54+
55+
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instanceID string, scheduleEventID int64) error {
56+
_, err := tx.ExecContext(
57+
ctx,
58+
"DELETE FROM `pending_events` WHERE instance_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
59+
instanceID,
60+
scheduleEventID,
61+
)
62+
63+
return err
64+
}

backend/mysql/mysql.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
//go:embed schema.sql
2323
var schema string
2424

25-
func NewMysqlBackend(host string, port int, user, password, database string, opts ...backend.BackendOption) backend.Backend {
25+
func NewMysqlBackend(host string, port int, user, password, database string, opts ...backend.BackendOption) *mysqlBackend {
2626
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&interpolateParams=true", user, password, host, port, database)
2727

2828
schemaDsn := dsn + "&multiStatements=true"
@@ -73,7 +73,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
7373
}
7474

7575
// Initial history is empty, store only new events
76-
if err := insertNewEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
76+
if err := insertPendingEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
7777
return fmt.Errorf("inserting new event: %w", err)
7878
}
7979

@@ -110,7 +110,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
110110
return err
111111
}
112112

113-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
113+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
114114
return fmt.Errorf("inserting cancellation event: %w", err)
115115
}
116116

@@ -252,7 +252,7 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
252252
return backend.ErrInstanceNotFound
253253
}
254254

255-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
255+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
256256
return fmt.Errorf("inserting signal event: %w", err)
257257
}
258258

@@ -475,6 +475,15 @@ func (b *mysqlBackend) CompleteWorkflowTask(
475475
}
476476
}
477477

478+
for _, event := range executedEvents {
479+
switch event.Type {
480+
case history.EventType_TimerCanceled:
481+
if err := removeFutureEvent(ctx, tx, instance.InstanceID, event.ScheduleEventID); err != nil {
482+
return fmt.Errorf("removing future event: %w", err)
483+
}
484+
}
485+
}
486+
478487
// Insert new workflow events
479488
groupedEvents := make(map[*workflow.Instance][]history.Event)
480489
for _, m := range workflowEvents {
@@ -493,7 +502,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
493502
}
494503
}
495504

496-
if err := insertNewEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
505+
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
497506
return fmt.Errorf("inserting messages: %w", err)
498507
}
499508
}
@@ -631,7 +640,7 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
631640
}
632641

633642
// Insert new event generated during this workflow execution
634-
if err := insertNewEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
643+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
635644
return fmt.Errorf("inserting new events for completed activity: %w", err)
636645
}
637646

backend/mysql/mysql_test.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package mysql
22

33
import (
4+
"context"
45
"database/sql"
56
"fmt"
67
"strings"
78
"testing"
89

910
"github.com/cschleiden/go-workflows/backend"
1011
"github.com/cschleiden/go-workflows/backend/test"
12+
"github.com/cschleiden/go-workflows/internal/history"
1113
"github.com/google/uuid"
1214
)
1315

@@ -24,7 +26,7 @@ func Test_MysqlBackend(t *testing.T) {
2426

2527
var dbName string
2628

27-
test.BackendTest(t, func() backend.Backend {
29+
test.BackendTest(t, func() test.TestBackend {
2830
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
2931
if err != nil {
3032
panic(err)
@@ -40,7 +42,7 @@ func Test_MysqlBackend(t *testing.T) {
4042
}
4143

4244
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, backend.WithStickyTimeout(0))
43-
}, func(b backend.Backend) {
45+
}, func(b test.TestBackend) {
4446
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
4547
if err != nil {
4648
panic(err)
@@ -63,7 +65,7 @@ func TestMySqlBackendE2E(t *testing.T) {
6365

6466
var dbName string
6567

66-
test.EndToEndBackendTest(t, func() backend.Backend {
68+
test.EndToEndBackendTest(t, func() test.TestBackend {
6769
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
6870
if err != nil {
6971
panic(err)
@@ -79,7 +81,7 @@ func TestMySqlBackendE2E(t *testing.T) {
7981
}
8082

8183
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, backend.WithStickyTimeout(0))
82-
}, func(b backend.Backend) {
84+
}, func(b test.TestBackend) {
8385
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
8486
if err != nil {
8587
panic(err)
@@ -94,3 +96,55 @@ func TestMySqlBackendE2E(t *testing.T) {
9496
}
9597
})
9698
}
99+
100+
var _ test.TestBackend = (*mysqlBackend)(nil)
101+
102+
func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
103+
tx, err := mb.db.BeginTx(ctx, nil)
104+
if err != nil {
105+
return nil, err
106+
}
107+
defer tx.Rollback()
108+
109+
// There is no index on `visible_at`, but this is okay for test only usage.
110+
futureEvents, err := tx.QueryContext(
111+
ctx,
112+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
113+
)
114+
if err != nil {
115+
return nil, fmt.Errorf("getting history: %w", err)
116+
}
117+
118+
f := make([]history.Event, 0)
119+
120+
for futureEvents.Next() {
121+
var instanceID string
122+
var attributes []byte
123+
124+
fe := history.Event{}
125+
126+
if err := futureEvents.Scan(
127+
&fe.ID,
128+
&fe.SequenceID,
129+
&instanceID,
130+
&fe.Type,
131+
&fe.Timestamp,
132+
&fe.ScheduleEventID,
133+
&attributes,
134+
&fe.VisibleAt,
135+
); err != nil {
136+
return nil, fmt.Errorf("scanning event: %w", err)
137+
}
138+
139+
a, err := history.DeserializeAttributes(fe.Type, attributes)
140+
if err != nil {
141+
return nil, fmt.Errorf("deserializing attributes: %w", err)
142+
}
143+
144+
fe.Attributes = a
145+
146+
f = append(f, fe)
147+
}
148+
149+
return f, nil
150+
}

backend/mysql/schema.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
2828
`visible_at` DATETIME NULL,
2929

3030
INDEX `idx_pending_events_instance_id` (`instance_id`),
31-
INDEX `idx_pending_events_instance_id_visible_at` (`instance_id`, `visible_at`)
31+
INDEX `idx_pending_events_instance_id_visible_at_schedule_event_id` (`instance_id`, `visible_at`, `schedule_event_id`)
3232
);
3333

3434

backend/redis/events.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work
7070

7171
addFutureEventCmd.Run(
7272
ctx, p,
73-
[]string{futureEventsKey(), futureEventKey(instance.InstanceID, event.ID)},
73+
[]string{futureEventsKey(), futureEventKey(instance.InstanceID, event.ScheduleEventID)},
7474
event.VisibleAt.Unix(),
7575
instance.InstanceID,
7676
string(eventData),
@@ -86,7 +86,8 @@ var removeFutureEventCmd = redis.NewScript(`
8686
return redis.call("DEL", KEYS[2])
8787
`)
8888

89+
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
8990
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
90-
key := futureEventKey(instance.InstanceID, event.ID)
91+
key := futureEventKey(instance.InstanceID, event.ScheduleEventID)
9192
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key})
9293
}

backend/redis/keys.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ func futureEventsKey() string {
2424
return "future-events"
2525
}
2626

27-
func futureEventKey(instanceID, eventID string) string {
28-
return fmt.Sprintf("future-event:%v:%v", instanceID, eventID)
27+
func futureEventKey(instanceID string, scheduleEventID int64) string {
28+
return fmt.Sprintf("future-event:%v:%v", instanceID, scheduleEventID)
2929
}

backend/redis/queue.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
6363
}
6464

6565
for name, cmd := range cmds {
66-
// DEBUG: REMOVE
67-
fmt.Println(name, cmd.Val())
66+
// fmt.Println(name, cmd.Val())
6867

6968
if cmd.Err() != nil {
7069
return nil, fmt.Errorf("loading redis script: %v %w", name, cmd.Err())

backend/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
7777
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
7878
}
7979
for name, cmd := range cmds {
80-
fmt.Println(name, cmd.Val())
80+
// fmt.Println(name, cmd.Val())
8181

8282
if cmd.Err() != nil {
8383
return nil, fmt.Errorf("loading redis script: %v %w", name, cmd.Err())

backend/redis/redis_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package redis
22

33
import (
44
"context"
5+
"encoding/json"
6+
"fmt"
57
"strings"
68
"testing"
79
"time"
810

911
"github.com/cschleiden/go-workflows/backend"
1012
"github.com/cschleiden/go-workflows/backend/test"
13+
"github.com/cschleiden/go-workflows/internal/history"
1114
"github.com/cschleiden/go-workflows/log"
1215
"github.com/go-redis/redis/v8"
1316
)
@@ -26,7 +29,7 @@ func Benchmark_RedisBackend(b *testing.B) {
2629
client := getClient()
2730
setup := getCreateBackend(client, true)
2831

29-
test.SimpleWorkflowBenchmark(b, setup, func(b backend.Backend) {
32+
test.SimpleWorkflowBenchmark(b, setup, func(b test.TestBackend) {
3033
if err := b.(*redisBackend).Close(); err != nil {
3134
panic(err)
3235
}
@@ -66,8 +69,8 @@ func getClient() redis.UniversalClient {
6669
return client
6770
}
6871

69-
func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() backend.Backend {
70-
return func() backend.Backend {
72+
func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.TestBackend {
73+
return func() test.TestBackend {
7174
// Flush database
7275
if err := client.FlushDB(context.Background()).Err(); err != nil {
7376
panic(err)
@@ -125,3 +128,35 @@ func (nl *nullLogger) With(fields ...interface{}) log.Logger {
125128
}
126129

127130
var _ log.Logger = (*nullLogger)(nil)
131+
132+
var _ test.TestBackend = (*redisBackend)(nil)
133+
134+
// GetFutureEvents
135+
func (rb *redisBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
136+
r, err := rb.rdb.ZRangeByScore(ctx, futureEventsKey(), &redis.ZRangeBy{
137+
Min: "-inf",
138+
Max: "+inf",
139+
}).Result()
140+
141+
if err != nil {
142+
return nil, fmt.Errorf("getting future events: %w", err)
143+
}
144+
145+
events := make([]history.Event, 0)
146+
147+
for _, eventID := range r {
148+
eventStr, err := rb.rdb.HGet(ctx, eventID, "event").Result()
149+
if err != nil {
150+
return nil, fmt.Errorf("getting event %v: %w", eventID, err)
151+
}
152+
153+
var event history.Event
154+
if err := json.Unmarshal([]byte(eventStr), &event); err != nil {
155+
return nil, fmt.Errorf("unmarshaling event %v: %w", eventID, err)
156+
}
157+
158+
events = append(events, event)
159+
}
160+
161+
return events, nil
162+
}

0 commit comments

Comments
 (0)