Skip to content

Commit 42c303d

Browse files
authored
Merge pull request #132 from cschleiden/discard-events-for-finished-instances
Discard events for completed workflow instances
2 parents 60adf88 + d63fec7 commit 42c303d

File tree

19 files changed

+146
-132
lines changed

19 files changed

+146
-132
lines changed

backend/backend.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,6 @@ import (
1515
var ErrInstanceNotFound = errors.New("workflow instance not found")
1616
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
1717

18-
type WorkflowState int
19-
20-
const (
21-
WorkflowStateActive WorkflowState = iota
22-
WorkflowStateFinished
23-
)
24-
2518
const TracerName = "go-workflow"
2619

2720
//go:generate mockery --name=Backend --inpackage
@@ -33,7 +26,7 @@ type Backend interface {
3326
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
3427

3528
// GetWorkflowInstanceState returns the state of the given workflow instance
36-
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (WorkflowState, error)
29+
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)
3730

3831
// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
3932
// is given, only events after that event are returned. Otherwise the full history is returned.
@@ -54,7 +47,7 @@ type Backend interface {
5447
// which will be added to the workflow instance history. workflowEvents are new events for the
5548
// completed or other workflow instances.
5649
CompleteWorkflowTask(
57-
ctx context.Context, task *task.Workflow, instance *workflow.Instance, state WorkflowState,
50+
ctx context.Context, task *task.Workflow, instance *workflow.Instance, state core.WorkflowInstanceState,
5851
executedEvents, activityEvents, timerEvents []history.Event, workflowEvents []history.WorkflowEvent) error
5952

6053
// GetActivityTask returns a pending activity task or nil if there are no pending activities

backend/mock_Backend.go

Lines changed: 7 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/diagnostics.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"database/sql"
66
"time"
77

8-
"github.com/cschleiden/go-workflows/backend"
98
"github.com/cschleiden/go-workflows/diag"
109
"github.com/cschleiden/go-workflows/internal/core"
1110
)
@@ -58,9 +57,9 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
5857
return nil, err
5958
}
6059

61-
var state backend.WorkflowState
60+
var state core.WorkflowInstanceState
6261
if completedAt != nil {
63-
state = backend.WorkflowStateFinished
62+
state = core.WorkflowInstanceStateFinished
6463
}
6564

6665
instances = append(instances, &diag.WorkflowInstanceRef{
@@ -96,9 +95,9 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
9695
return nil, err
9796
}
9897

99-
var state backend.WorkflowState
98+
var state core.WorkflowInstanceState
10099
if completedAt != nil {
101-
state = backend.WorkflowStateFinished
100+
state = core.WorkflowInstanceStateFinished
102101
}
103102

104103
return &diag.WorkflowInstanceRef{

backend/mysql/mysql.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
183183
return h, nil
184184
}
185185

186-
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (backend.WorkflowState, error) {
186+
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) {
187187
row := b.db.QueryRowContext(
188188
ctx,
189189
"SELECT completed_at FROM instances WHERE instance_id = ? AND execution_id = ?",
@@ -194,15 +194,15 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
194194
var completedAt sql.NullTime
195195
if err := row.Scan(&completedAt); err != nil {
196196
if err == sql.ErrNoRows {
197-
return backend.WorkflowStateActive, backend.ErrInstanceNotFound
197+
return core.WorkflowInstanceStateActive, backend.ErrInstanceNotFound
198198
}
199199
}
200200

201201
if completedAt.Valid {
202-
return backend.WorkflowStateFinished, nil
202+
return core.WorkflowInstanceStateFinished, nil
203203
}
204204

205-
return backend.WorkflowStateActive, nil
205+
return core.WorkflowInstanceStateActive, nil
206206
}
207207

208208
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
@@ -427,7 +427,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
427427
ctx context.Context,
428428
task *task.Workflow,
429429
instance *workflow.Instance,
430-
state backend.WorkflowState,
430+
state core.WorkflowInstanceState,
431431
executedEvents, activityEvents, timerEvents []history.Event,
432432
workflowEvents []history.WorkflowEvent,
433433
) error {
@@ -441,7 +441,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
441441

442442
// Unlock instance, but keep it sticky to the current worker
443443
var completedAt *time.Time
444-
if state == backend.WorkflowStateFinished {
444+
if state == core.WorkflowInstanceStateFinished {
445445
t := time.Now()
446446
completedAt = &t
447447
}

backend/redis/instance.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
8282
return events, nil
8383
}
8484

85-
func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (backend.WorkflowState, error) {
85+
func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error) {
8686
instanceState, err := readInstance(ctx, rb.rdb, instance.InstanceID)
8787
if err != nil {
88-
return backend.WorkflowStateActive, err
88+
return core.WorkflowInstanceStateActive, err
8989
}
9090

9191
return instanceState.State, nil
@@ -110,8 +110,8 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
110110
}
111111

112112
type instanceState struct {
113-
Instance *core.WorkflowInstance `json:"instance,omitempty"`
114-
State backend.WorkflowState `json:"state,omitempty"`
113+
Instance *core.WorkflowInstance `json:"instance,omitempty"`
114+
State core.WorkflowInstanceState `json:"state,omitempty"`
115115

116116
Metadata *core.WorkflowMetadata `json:"metadata,omitempty"`
117117

@@ -128,7 +128,7 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
128128

129129
b, err := json.Marshal(&instanceState{
130130
Instance: instance,
131-
State: backend.WorkflowStateActive,
131+
State: core.WorkflowInstanceStateActive,
132132
Metadata: metadata,
133133
CreatedAt: createdAt,
134134
})

backend/redis/workflow.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"strconv"
88
"time"
99

10-
"github.com/cschleiden/go-workflows/backend"
11-
1210
"github.com/cschleiden/go-workflows/internal/core"
1311
"github.com/cschleiden/go-workflows/internal/history"
1412
"github.com/cschleiden/go-workflows/internal/task"
@@ -102,30 +100,14 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
102100
newEvents = append(newEvents, event)
103101
}
104102

105-
if instanceState.State == backend.WorkflowStateFinished {
106-
l := rb.Logger().With(
107-
"task_id", instanceTask.TaskID,
108-
"id", instanceTask.ID,
109-
"instance_id", instanceState.Instance.InstanceID)
110-
111-
// This should never happen. For now, log information and then panic.
112-
l.Error("got workflow task for finished workflow instance")
113-
114-
// Log events that lead to this task
115-
for _, event := range newEvents {
116-
l.Error("pending_event", "id", event.ID, "event_type", event.Type.String(), "schedule_event_id", event.ScheduleEventID)
117-
}
118-
119-
panic("Dequeued already finished workflow instance task")
120-
}
121-
122103
return &task.Workflow{
123-
ID: instanceTask.TaskID,
124-
WorkflowInstance: instanceState.Instance,
125-
Metadata: instanceState.Metadata,
126-
LastSequenceID: instanceState.LastSequenceID,
127-
NewEvents: newEvents,
128-
CustomData: msgs[len(msgs)-1].ID, // Id of last pending message in stream at this point
104+
ID: instanceTask.TaskID,
105+
WorkflowInstance: instanceState.Instance,
106+
WorkflowInstanceState: instanceState.State,
107+
Metadata: instanceState.Metadata,
108+
LastSequenceID: instanceState.LastSequenceID,
109+
NewEvents: newEvents,
110+
CustomData: msgs[len(msgs)-1].ID, // Id of last pending message in stream at this point
129111
}, nil
130112
}
131113

@@ -167,7 +149,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
167149
ctx context.Context,
168150
task *task.Workflow,
169151
instance *core.WorkflowInstance,
170-
state backend.WorkflowState,
152+
state core.WorkflowInstanceState,
171153
executedEvents, activityEvents, timerEvents []history.Event,
172154
workflowEvents []history.WorkflowEvent,
173155
) error {
@@ -231,7 +213,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
231213

232214
instanceState.State = state
233215

234-
if state == backend.WorkflowStateFinished {
216+
if state == core.WorkflowInstanceStateFinished {
235217
t := time.Now()
236218
instanceState.CompletedAt = &t
237219
}
@@ -290,7 +272,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
290272
return fmt.Errorf("completing workflow task: %w", err)
291273
}
292274

293-
if state == backend.WorkflowStateFinished {
275+
if state == core.WorkflowInstanceStateFinished {
294276
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
295277
_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
296278
trace.WithAttributes(

backend/sqlite/diagnostics.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"database/sql"
66
"time"
77

8-
"github.com/cschleiden/go-workflows/backend"
98
"github.com/cschleiden/go-workflows/diag"
109
"github.com/cschleiden/go-workflows/internal/core"
1110
)
@@ -58,9 +57,9 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
5857
return nil, err
5958
}
6059

61-
var state backend.WorkflowState
60+
var state core.WorkflowInstanceState
6261
if completedAt != nil {
63-
state = backend.WorkflowStateFinished
62+
state = core.WorkflowInstanceStateFinished
6463
}
6564

6665
instances = append(instances, &diag.WorkflowInstanceRef{
@@ -96,9 +95,9 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID str
9695
return nil, err
9796
}
9897

99-
var state backend.WorkflowState
98+
var state core.WorkflowInstanceState
10099
if completedAt != nil {
101-
state = backend.WorkflowStateFinished
100+
state = core.WorkflowInstanceStateFinished
102101
}
103102

104103
return &diag.WorkflowInstanceRef{

backend/sqlite/sqlite.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (sb *sqliteBackend) GetWorkflowInstanceHistory(ctx context.Context, instanc
176176
return h, nil
177177
}
178178

179-
func (s *sqliteBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (backend.WorkflowState, error) {
179+
func (s *sqliteBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) {
180180
row := s.db.QueryRowContext(
181181
ctx,
182182
"SELECT completed_at FROM instances WHERE id = ? AND execution_id = ?",
@@ -187,15 +187,15 @@ func (s *sqliteBackend) GetWorkflowInstanceState(ctx context.Context, instance *
187187
var completedAt sql.NullTime
188188
if err := row.Scan(&completedAt); err != nil {
189189
if err == sql.ErrNoRows {
190-
return backend.WorkflowStateActive, backend.ErrInstanceNotFound
190+
return core.WorkflowInstanceStateActive, backend.ErrInstanceNotFound
191191
}
192192
}
193193

194194
if completedAt.Valid {
195-
return backend.WorkflowStateFinished, nil
195+
return core.WorkflowInstanceStateFinished, nil
196196
}
197197

198-
return backend.WorkflowStateActive, nil
198+
return core.WorkflowInstanceStateActive, nil
199199
}
200200

201201
func (sb *sqliteBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
@@ -322,7 +322,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
322322
ctx context.Context,
323323
task *task.Workflow,
324324
instance *workflow.Instance,
325-
state backend.WorkflowState,
325+
state core.WorkflowInstanceState,
326326
executedEvents, activityEvents, timerEvents []history.Event,
327327
workflowEvents []history.WorkflowEvent,
328328
) error {
@@ -333,7 +333,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
333333
defer tx.Rollback()
334334

335335
var completedAt *time.Time
336-
if state == backend.WorkflowStateFinished {
336+
if state == core.WorkflowInstanceStateFinished {
337337
t := time.Now()
338338
completedAt = &t
339339
}

backend/test/backendtest.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
145145
require.NotNil(t, tk)
146146

147147
// Complete workflow task
148-
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
148+
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
149149
require.NoError(t, err)
150150

151151
// Task is already completed, this should error
152-
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
152+
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
153153
require.Error(t, err)
154154
},
155155
},
@@ -185,7 +185,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
185185

186186
workflowEvents := []history.WorkflowEvent{}
187187

188-
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateActive, events, activityEvents, []history.Event{}, workflowEvents)
188+
err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateActive, events, activityEvents, []history.Event{}, workflowEvents)
189189
require.NoError(t, err)
190190

191191
time.Sleep(time.Second)
@@ -228,15 +228,15 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
228228
events[i].SequenceID = sequenceID
229229
}
230230

231-
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateFinished, events, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
231+
err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateFinished, events, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
232232
require.NoError(t, err)
233233

234234
time.Sleep(time.Second)
235235

236236
db := b.(diag.Backend)
237237
s, err := db.GetWorkflowInstance(ctx, wfi.InstanceID)
238238
require.NoError(t, err)
239-
require.Equal(t, backend.WorkflowStateFinished, s.State)
239+
require.Equal(t, core.WorkflowInstanceStateFinished, s.State)
240240
require.NotNil(t, s.CompletedAt)
241241
},
242242
},
@@ -290,7 +290,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
290290
// Simulate context and sub-workflow cancellation
291291
task, err := b.GetWorkflowTask(ctx)
292292
require.NoError(t, err)
293-
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{
293+
err = b.CompleteWorkflowTask(ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{
294294
{
295295
WorkflowInstance: subInstance1,
296296
HistoryEvent: history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionCanceled, &history.SubWorkflowCancellationRequestedAttributes{
@@ -344,6 +344,6 @@ func startWorkflow(t *testing.T, ctx context.Context, b backend.Backend, c clien
344344
require.NoError(t, err)
345345

346346
err = b.CompleteWorkflowTask(
347-
ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
347+
ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
348348
require.NoError(t, err)
349349
}

0 commit comments

Comments
 (0)