Skip to content

Commit bfcf0c2

Browse files
authored
Cleanup future canceled timer messages for mysql backend
1 parent 45b778c commit bfcf0c2

File tree

4 files changed

+74
-9
lines changed

4 files changed

+74
-9
lines changed

backend/mysql/events.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/cschleiden/go-workflows/internal/history"
99
)
1010

11-
func insertNewEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
11+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
1212
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
1313
}
1414

@@ -51,3 +51,14 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
5151

5252
return nil
5353
}
54+
55+
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instanceID string, scheduleEventID int64) error {
56+
_, err := tx.ExecContext(
57+
ctx,
58+
"DELETE FROM `pending_events` WHERE instance_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
59+
instanceID,
60+
scheduleEventID,
61+
)
62+
63+
return err
64+
}

backend/mysql/mysql.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
7373
}
7474

7575
// Initial history is empty, store only new events
76-
if err := insertNewEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
76+
if err := insertPendingEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
7777
return fmt.Errorf("inserting new event: %w", err)
7878
}
7979

@@ -110,7 +110,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
110110
return err
111111
}
112112

113-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
113+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
114114
return fmt.Errorf("inserting cancellation event: %w", err)
115115
}
116116

@@ -252,7 +252,7 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
252252
return backend.ErrInstanceNotFound
253253
}
254254

255-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
255+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
256256
return fmt.Errorf("inserting signal event: %w", err)
257257
}
258258

@@ -475,6 +475,15 @@ func (b *mysqlBackend) CompleteWorkflowTask(
475475
}
476476
}
477477

478+
for _, event := range executedEvents {
479+
switch event.Type {
480+
case history.EventType_TimerCanceled:
481+
if err := removeFutureEvent(ctx, tx, instance.InstanceID, event.ScheduleEventID); err != nil {
482+
return fmt.Errorf("removing future event: %w", err)
483+
}
484+
}
485+
}
486+
478487
// Insert new workflow events
479488
groupedEvents := make(map[*workflow.Instance][]history.Event)
480489
for _, m := range workflowEvents {
@@ -493,7 +502,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
493502
}
494503
}
495504

496-
if err := insertNewEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
505+
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
497506
return fmt.Errorf("inserting messages: %w", err)
498507
}
499508
}
@@ -631,7 +640,7 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
631640
}
632641

633642
// Insert new event generated during this workflow execution
634-
if err := insertNewEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
643+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
635644
return fmt.Errorf("inserting new events for completed activity: %w", err)
636645
}
637646

backend/mysql/mysql_test.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,51 @@ func TestMySqlBackendE2E(t *testing.T) {
100100
var _ test.TestBackend = (*mysqlBackend)(nil)
101101

102102
func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
103-
// TODO: TESTING: Implement
104-
return nil, nil
103+
tx, err := mb.db.BeginTx(ctx, nil)
104+
if err != nil {
105+
return nil, err
106+
}
107+
defer tx.Rollback()
108+
109+
// There is no index on `visible_at`, but this is okay for test only usage.
110+
futureEvents, err := tx.QueryContext(
111+
ctx,
112+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
113+
)
114+
if err != nil {
115+
return nil, fmt.Errorf("getting history: %w", err)
116+
}
117+
118+
f := make([]history.Event, 0)
119+
120+
for futureEvents.Next() {
121+
var instanceID string
122+
var attributes []byte
123+
124+
fe := history.Event{}
125+
126+
if err := futureEvents.Scan(
127+
&fe.ID,
128+
&fe.SequenceID,
129+
&instanceID,
130+
&fe.Type,
131+
&fe.Timestamp,
132+
&fe.ScheduleEventID,
133+
&attributes,
134+
&fe.VisibleAt,
135+
); err != nil {
136+
return nil, fmt.Errorf("scanning event: %w", err)
137+
}
138+
139+
a, err := history.DeserializeAttributes(fe.Type, attributes)
140+
if err != nil {
141+
return nil, fmt.Errorf("deserializing attributes: %w", err)
142+
}
143+
144+
fe.Attributes = a
145+
146+
f = append(f, fe)
147+
}
148+
149+
return f, nil
105150
}

backend/mysql/schema.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
2828
`visible_at` DATETIME NULL,
2929

3030
INDEX `idx_pending_events_instance_id` (`instance_id`),
31-
INDEX `idx_pending_events_instance_id_visible_at` (`instance_id`, `visible_at`)
31+
INDEX `idx_pending_events_instance_id_visible_at_schedule_event_id` (`instance_id`, `visible_at`, `schedule_event_id`)
3232
);
3333

3434

0 commit comments

Comments
 (0)