Skip to content

Commit e518e4f

Browse files
authored
Clean up future events for cancelled timers for sqlite
1 parent ea73420 commit e518e4f

File tree

4 files changed

+69
-3
lines changed

4 files changed

+69
-3
lines changed

backend/sqlite/events.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,14 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
127127
}
128128
return nil
129129
}
130+
131+
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instanceID string, scheduleEventID int64) error {
132+
_, err := tx.ExecContext(
133+
ctx,
134+
"DELETE FROM `pending_events` WHERE instance_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
135+
instanceID,
136+
scheduleEventID,
137+
)
138+
139+
return err
140+
}

backend/sqlite/schema.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
2525
PRIMARY KEY(`id`, `instance_id`)
2626
);
2727

28-
CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_visible_at` ON `pending_events` (`instance_id`, `visible_at`);
28+
CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_visible_at_schedule_event_id` ON `pending_events` (`instance_id`, `visible_at`, `schedule_event_id`);
2929

3030
CREATE TABLE IF NOT EXISTS `history` (
3131
`id` TEXT,

backend/sqlite/sqlite.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,15 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
365365
}
366366
}
367367

368+
for _, event := range executedEvents {
369+
switch event.Type {
370+
case history.EventType_TimerCanceled:
371+
if err := removeFutureEvent(ctx, tx, instance.InstanceID, event.ScheduleEventID); err != nil {
372+
return fmt.Errorf("removing future event: %w", err)
373+
}
374+
}
375+
}
376+
368377
// Insert new workflow events
369378
groupedEvents := make(map[*workflow.Instance][]history.Event)
370379
for _, m := range workflowEvents {

backend/sqlite/sqlite_test.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sqlite
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67

78
"github.com/cschleiden/go-workflows/backend"
@@ -26,6 +27,51 @@ func Test_EndToEndSqliteBackend(t *testing.T) {
2627
var _ test.TestBackend = (*sqliteBackend)(nil)
2728

2829
func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
29-
// TODO: TESTING: Implement
30-
return nil, nil
30+
tx, err := sb.db.BeginTx(ctx, nil)
31+
if err != nil {
32+
return nil, err
33+
}
34+
defer tx.Rollback()
35+
36+
// There is no index on `visible_at`, but this is okay for test only usage.
37+
futureEvents, err := tx.QueryContext(
38+
ctx,
39+
"SELECT id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
40+
)
41+
if err != nil {
42+
return nil, fmt.Errorf("getting history: %w", err)
43+
}
44+
45+
f := make([]history.Event, 0)
46+
47+
for futureEvents.Next() {
48+
var instanceID string
49+
var attributes []byte
50+
51+
fe := history.Event{}
52+
53+
if err := futureEvents.Scan(
54+
&fe.ID,
55+
&fe.SequenceID,
56+
&instanceID,
57+
&fe.Type,
58+
&fe.Timestamp,
59+
&fe.ScheduleEventID,
60+
&attributes,
61+
&fe.VisibleAt,
62+
); err != nil {
63+
return nil, fmt.Errorf("scanning event: %w", err)
64+
}
65+
66+
a, err := history.DeserializeAttributes(fe.Type, attributes)
67+
if err != nil {
68+
return nil, fmt.Errorf("deserializing attributes: %w", err)
69+
}
70+
71+
fe.Attributes = a
72+
73+
f = append(f, fe)
74+
}
75+
76+
return f, nil
3177
}

0 commit comments

Comments
 (0)