Skip to content

Commit 41a2fc5

Browse files
authored
Merge pull request #41 from cschleiden/remove-continuation-tasks
Remove continuation task
2 parents b2e8114 + 70ce103 commit 41a2fc5

File tree

22 files changed

+268
-291
lines changed

22 files changed

+268
-291
lines changed

backend/backend.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ type Backend interface {
3131
// GetWorkflowInstanceState returns the state of the given workflow instance
3232
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (WorkflowState, error)
3333

34-
// GetWorkflowInstanceHistory returns the full workflow history for the given instance
35-
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance) ([]history.Event, error)
34+
// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
35+
// is given, only events after that event are returned. Otherwise the full history is returned.
36+
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error)
3637

3738
// SignalWorkflow signals a running workflow instance
3839
SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error

backend/mock_Backend.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 23 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -127,18 +127,28 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
127127
return tx.Commit()
128128
}
129129

130-
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance) ([]history.Event, error) {
130+
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error) {
131131
tx, err := b.db.BeginTx(ctx, nil)
132132
if err != nil {
133133
return nil, err
134134
}
135135
defer tx.Rollback()
136136

137-
historyEvents, err := tx.QueryContext(
138-
ctx,
139-
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
140-
instance.InstanceID,
141-
)
137+
var historyEvents *sql.Rows
138+
if lastSequenceID != nil {
139+
historyEvents, err = tx.QueryContext(
140+
ctx,
141+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? AND sequence_id > ? ORDER BY sequence_id",
142+
instance.InstanceID,
143+
*lastSequenceID,
144+
)
145+
} else {
146+
historyEvents, err = tx.QueryContext(
147+
ctx,
148+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY sequence_id",
149+
instance.InstanceID,
150+
)
151+
}
142152
if err != nil {
143153
return nil, errors.Wrap(err, "could not get history")
144154
}
@@ -324,12 +334,6 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
324334
return nil, nil
325335
}
326336

327-
// Check if this task is using a dedicated queue and should be returned as a continuation
328-
var kind task.Kind
329-
if stickyUntil != nil && stickyUntil.After(now) {
330-
kind = task.Continuation
331-
}
332-
333337
var wfi *workflow.Instance
334338
if parentInstanceID != nil {
335339
wfi = core.NewSubWorkflowInstance(instanceID, executionID, *parentInstanceID, *parentEventID)
@@ -341,8 +345,6 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
341345
ID: wfi.InstanceID,
342346
WorkflowInstance: wfi,
343347
NewEvents: []history.Event{},
344-
History: []history.Event{},
345-
Kind: kind,
346348
}
347349

348350
// Get new events
@@ -390,75 +392,14 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
390392
return nil, nil
391393
}
392394

393-
// Get historyEvents
394-
if kind != task.Continuation {
395-
historyEvents, err := tx.QueryContext(
396-
ctx,
397-
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
398-
instanceID,
399-
)
400-
if err != nil {
401-
return nil, errors.Wrap(err, "could not get history")
402-
}
403-
404-
for historyEvents.Next() {
405-
var instanceID string
406-
var attributes []byte
407-
408-
historyEvent := history.Event{}
409-
410-
if err := historyEvents.Scan(
411-
&historyEvent.ID,
412-
&historyEvent.SequenceID,
413-
&instanceID,
414-
&historyEvent.Type,
415-
&historyEvent.Timestamp,
416-
&historyEvent.ScheduleEventID,
417-
&attributes,
418-
&historyEvent.VisibleAt,
419-
); err != nil {
420-
return nil, errors.Wrap(err, "could not scan event")
421-
}
422-
423-
a, err := history.DeserializeAttributes(historyEvent.Type, attributes)
424-
if err != nil {
425-
return nil, errors.Wrap(err, "could not deserialize attributes")
426-
}
427-
428-
historyEvent.Attributes = a
429-
430-
t.History = append(t.History, historyEvent)
431-
}
432-
} else {
433-
// Get only most recent history event
434-
row := tx.QueryRowContext(ctx, "SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
435-
436-
var instanceID string
437-
var attributes []byte
438-
439-
lastHistoryEvent := history.Event{}
440-
441-
if err := row.Scan(
442-
&lastHistoryEvent.ID,
443-
&lastHistoryEvent.SequenceID,
444-
&instanceID,
445-
&lastHistoryEvent.Type,
446-
&lastHistoryEvent.Timestamp,
447-
&lastHistoryEvent.ScheduleEventID,
448-
&attributes,
449-
&lastHistoryEvent.VisibleAt,
450-
); err != nil {
451-
return nil, errors.Wrap(err, "could not scan event")
452-
}
453-
454-
a, err := history.DeserializeAttributes(lastHistoryEvent.Type, attributes)
455-
if err != nil {
456-
return nil, errors.Wrap(err, "could not deserialize attributes")
395+
// Get most recent sequence id
396+
row = tx.QueryRowContext(ctx, "SELECT sequence_id FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
397+
if err := row.Scan(
398+
&t.LastSequenceID,
399+
); err != nil {
400+
if err != sql.ErrNoRows {
401+
return nil, errors.Wrap(err, "could not get most recent sequence id")
457402
}
458-
459-
lastHistoryEvent.Attributes = a
460-
461-
t.History = []history.Event{lastHistoryEvent}
462403
}
463404

464405
if err := tx.Commit(); err != nil {

backend/redis/instance.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
4949
return nil
5050
}
5151

52-
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance) ([]history.Event, error) {
52+
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]history.Event, error) {
5353
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.InstanceID), "-", "+").Result()
5454
if err != nil {
5555
return nil, err
@@ -103,10 +103,11 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
103103
}
104104

105105
type instanceState struct {
106-
Instance *core.WorkflowInstance `json:"instance,omitempty"`
107-
State backend.WorkflowState `json:"state,omitempty"`
108-
CreatedAt time.Time `json:"created_at,omitempty"`
109-
CompletedAt *time.Time `json:"completed_at,omitempty"`
106+
Instance *core.WorkflowInstance `json:"instance,omitempty"`
107+
State backend.WorkflowState `json:"state,omitempty"`
108+
CreatedAt time.Time `json:"created_at,omitempty"`
109+
CompletedAt *time.Time `json:"completed_at,omitempty"`
110+
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
110111
}
111112

112113
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, ignoreDuplicate bool) error {

backend/redis/redis.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/cschleiden/go-workflows/backend"
@@ -43,9 +44,9 @@ func NewRedisBackend(address, username, password string, db int, opts ...RedisBa
4344
})
4445

4546
// // // TODO: Only for dev
46-
// if err := client.FlushDB(context.Background()).Err(); err != nil {
47-
// panic(err)
48-
// }
47+
if err := client.FlushDB(context.Background()).Err(); err != nil {
48+
panic(err)
49+
}
4950

5051
workflowQueue, err := taskqueue.New[workflowTaskData](client, "workflows")
5152
if err != nil {

backend/redis/workflow.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,10 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
7979
return nil, errors.Wrap(err, "could not read workflow instance")
8080
}
8181

82-
// History
83-
msgs, err := rb.rdb.XRange(ctx, historyKey(instanceTask.ID), "-", "+").Result()
84-
if err != nil {
85-
return nil, errors.Wrap(err, "could not read event stream")
86-
}
87-
88-
historyEvents := make([]history.Event, 0)
89-
90-
for _, msg := range msgs {
91-
var event history.Event
92-
93-
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
94-
return nil, errors.Wrap(err, "could not unmarshal event")
95-
}
96-
97-
historyEvents = append(historyEvents, event)
98-
}
99-
10082
// New Events
10183
newEvents := make([]history.Event, 0)
10284

103-
msgs, err = rb.rdb.XRange(ctx, pendingEventsKey(instanceTask.ID), "-", instanceTask.Data.LastPendingEventMessageID).Result()
85+
msgs, err := rb.rdb.XRange(ctx, pendingEventsKey(instanceTask.ID), "-", instanceTask.Data.LastPendingEventMessageID).Result()
10486
if err != nil {
10587
return nil, errors.Wrap(err, "could not read event stream")
10688
}
@@ -118,7 +100,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
118100
return &task.Workflow{
119101
ID: instanceTask.TaskID,
120102
WorkflowInstance: instanceState.Instance,
121-
History: historyEvents,
103+
LastSequenceID: instanceState.LastSequenceID,
122104
NewEvents: newEvents,
123105
}, nil
124106
}
@@ -207,6 +189,7 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
207189
}
208190

209191
instanceState.State = state
192+
instanceState.LastSequenceID = executedEvents[len(executedEvents)-1].SequenceID
210193

211194
if err := updateInstance(ctx, rb.rdb, instance.InstanceID, instanceState); err != nil {
212195
return errors.Wrap(err, "could not update workflow instance")

backend/sqlite/events.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@ func getPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string) ([]his
3131
return pendingEvents, nil
3232
}
3333

34-
func getHistory(ctx context.Context, tx *sql.Tx, instanceID string) ([]history.Event, error) {
35-
historyEvents, err := tx.QueryContext(ctx, "SELECT * FROM `history` WHERE instance_id = ?", instanceID)
34+
func getHistory(ctx context.Context, tx *sql.Tx, instanceID string, lastSequenceID *int64) ([]history.Event, error) {
35+
var historyEvents *sql.Rows
36+
var err error
37+
if lastSequenceID != nil {
38+
historyEvents, err = tx.QueryContext(ctx, "SELECT * FROM `history` WHERE instance_id = ? AND sequence_id > ?", instanceID, *lastSequenceID)
39+
} else {
40+
historyEvents, err = tx.QueryContext(ctx, "SELECT * FROM `history` WHERE instance_id = ?", instanceID)
41+
}
3642
if err != nil {
3743
return nil, errors.Wrap(err, "could not get history")
3844
}

backend/sqlite/sqlite.go

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,14 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
163163
return tx.Commit()
164164
}
165165

166-
func (sb *sqliteBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance) ([]history.Event, error) {
166+
func (sb *sqliteBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error) {
167167
tx, err := sb.db.BeginTx(ctx, nil)
168168
if err != nil {
169169
return nil, err
170170
}
171171
defer tx.Rollback()
172172

173-
h, err := getHistory(ctx, tx, instance.InstanceID)
173+
h, err := getHistory(ctx, tx, instance.InstanceID, lastSequenceID)
174174
if err != nil {
175175
return nil, errors.Wrap(err, "could not get workflow history")
176176
}
@@ -267,12 +267,6 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
267267
return nil, errors.Wrap(err, "could not lock workflow task")
268268
}
269269

270-
// Check if this task is using a dedicated queue and should be returned as a continuation
271-
var kind task.Kind
272-
if stickyUntil != nil && stickyUntil.After(now) {
273-
kind = task.Continuation
274-
}
275-
276270
var wfi *workflow.Instance
277271
if parentInstanceID != nil {
278272
wfi = core.NewSubWorkflowInstance(instanceID, executionID, *parentInstanceID, *parentEventID)
@@ -284,8 +278,6 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
284278
ID: wfi.InstanceID,
285279
WorkflowInstance: wfi,
286280
NewEvents: []history.Event{},
287-
History: []history.Event{},
288-
Kind: kind,
289281
}
290282

291283
// Get new events
@@ -301,25 +293,13 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
301293

302294
t.NewEvents = pendingEvents
303295

304-
// Get workflow history
305-
if kind != task.Continuation {
306-
// Retrieve full history
307-
h, err := getHistory(ctx, tx, instanceID)
308-
if err != nil {
309-
return nil, errors.Wrap(err, "could not get workflow history")
296+
// Get only most recent sequence ID
297+
// TODO: Denormalize to instances table
298+
row = tx.QueryRowContext(ctx, "SELECT sequence_id FROM `history` WHERE instance_id = ? ORDER BY rowid DESC LIMIT 1", instanceID)
299+
if err := row.Scan(&t.LastSequenceID); err != nil {
300+
if err != sql.ErrNoRows {
301+
return nil, errors.Wrap(err, "could not get most recent sequence id")
310302
}
311-
312-
t.History = h
313-
} else {
314-
// Get only most recent history event
315-
row := tx.QueryRowContext(ctx, "SELECT * FROM `history` WHERE instance_id = ? ORDER BY rowid DESC LIMIT 1", instanceID)
316-
317-
lastHistoryEvent, err := scanEvent(row)
318-
if err != nil {
319-
return nil, errors.Wrap(err, "could not get workflow history")
320-
}
321-
322-
t.History = []history.Event{lastHistoryEvent}
323303
}
324304

325305
if err := tx.Commit(); err != nil {

backend/test/backendtest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
168168

169169
time.Sleep(time.Second)
170170

171-
h, err := b.GetWorkflowInstanceHistory(ctx, wfi)
171+
h, err := b.GetWorkflowInstanceHistory(ctx, wfi, nil)
172172
require.NoError(t, err)
173173
require.Equal(t, len(events), len(h))
174174
for i, event := range events {

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.
148148
ic := c.(*client)
149149
b := ic.backend
150150

151-
h, err := b.GetWorkflowInstanceHistory(ctx, instance)
151+
h, err := b.GetWorkflowInstanceHistory(ctx, instance, nil)
152152
if err != nil {
153153
return z, errors.Wrap(err, "could not get workflow history")
154154
}

0 commit comments

Comments
 (0)