Skip to content

Commit 0f3a857

Browse files
committed
Discard events for finished instances
1 parent cb80c07 commit 0f3a857

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

backend/redis/workflow.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -100,23 +100,6 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
100100
newEvents = append(newEvents, event)
101101
}
102102

103-
if instanceState.State == core.WorkflowInstanceStateFinished {
104-
l := rb.Logger().With(
105-
"task_id", instanceTask.TaskID,
106-
"id", instanceTask.ID,
107-
"instance_id", instanceState.Instance.InstanceID)
108-
109-
// This should never happen. For now, log information and then panic.
110-
l.Error("got workflow task for finished workflow instance")
111-
112-
// Log events that lead to this task
113-
for _, event := range newEvents {
114-
l.Error("pending_event", "id", event.ID, "event_type", event.Type.String(), "schedule_event_id", event.ScheduleEventID)
115-
}
116-
117-
panic("Dequeued already finished workflow instance task")
118-
}
119-
120103
return &task.Workflow{
121104
ID: instanceTask.TaskID,
122105
WorkflowInstance: instanceState.Instance,

backend/test/e2e.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,23 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
155155
require.Equal(t, 7, r)
156156
},
157157
},
158+
{
159+
name: "Signal_after_completion",
160+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
161+
wf := func(ctx workflow.Context) error {
162+
return nil
163+
}
164+
register(t, ctx, w, []interface{}{wf}, nil)
165+
166+
// Run workflow to completion
167+
instance := runWorkflow(t, ctx, c, wf)
168+
_, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
169+
require.NoError(t, err)
170+
171+
err = c.SignalWorkflow(ctx, instance.InstanceID, "signal", nil)
172+
require.NoError(t, err)
173+
},
174+
},
158175
{
159176
name: "SubWorkflow_Simple",
160177
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {

internal/workflow/executor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,18 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
9999

100100
logger.Debug("Executing workflow task", "task_last_sequence_id", t.LastSequenceID)
101101

102+
if t.WorkflowInstanceState == core.WorkflowInstanceStateFinished {
103+
// This should never happen. For now, log information and then panic.
104+
logger.Debug("Received workflow task for finished workflow instance, discarding events")
105+
106+
// Log events that caused this task to be scheduled
107+
for _, event := range t.NewEvents {
108+
logger.Debug("Discarded event:", "id", event.ID, "event_type", event.Type.String(), "schedule_event_id", event.ScheduleEventID)
109+
}
110+
111+
return &ExecutionResult{}, nil
112+
}
113+
102114
skipNewEvents := false
103115

104116
if t.LastSequenceID > e.lastSequenceID {

0 commit comments

Comments
 (0)