Skip to content

Commit 7295531

Browse files
authored
Merge pull request #138 from cschleiden/signal-after-completion-fix
Do not reset workflow instance state when receiving signal after comp…
2 parents a5f7b54 + e2fb8b9 commit 7295531

File tree

6 files changed

+33
-13
lines changed

6 files changed

+33
-13
lines changed

backend/mysql/mysql.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,11 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
356356
}
357357

358358
t := &task.Workflow{
359-
ID: wfi.InstanceID,
360-
WorkflowInstance: wfi,
361-
Metadata: metadata,
362-
NewEvents: []history.Event{},
359+
ID: wfi.InstanceID,
360+
WorkflowInstance: wfi,
361+
WorkflowInstanceState: core.WorkflowInstanceStateActive,
362+
Metadata: metadata,
363+
NewEvents: []history.Event{},
363364
}
364365

365366
// Get new events

backend/sqlite/sqlite.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,10 +287,11 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
287287
}
288288

289289
t := &task.Workflow{
290-
ID: wfi.InstanceID,
291-
WorkflowInstance: wfi,
292-
Metadata: metadata,
293-
NewEvents: []history.Event{},
290+
ID: wfi.InstanceID,
291+
WorkflowInstance: wfi,
292+
WorkflowInstanceState: core.WorkflowInstanceStateActive,
293+
Metadata: metadata,
294+
NewEvents: []history.Event{},
294295
}
295296

296297
// Get new events

backend/test/e2e.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,13 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
170170

171171
err = c.SignalWorkflow(ctx, instance.InstanceID, "signal", nil)
172172
require.NoError(t, err)
173+
_, err = client.GetWorkflowResult[int](ctx, c, instance, time.Millisecond*5)
174+
require.NoError(t, err)
175+
176+
err = c.SignalWorkflow(ctx, instance.InstanceID, "signal", nil)
177+
require.NoError(t, err)
178+
_, err = client.GetWorkflowResult[int](ctx, c, instance, time.Millisecond*5)
179+
require.NoError(t, err)
173180
},
174181
},
175182
{

internal/metrickeys/metrickeys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,7 @@ const (
2828
// Reason for evicting an entry from the workflow instance cache
2929
EvictionReason = "reason"
3030

31+
SubWorkflow = "subworkflow"
32+
3133
ActivityName = "activity"
3234
)

internal/worker/workflow.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (ww *WorkflowWorker) runPoll(ctx context.Context) {
8989
}
9090

9191
if task != nil {
92+
ww.wg.Add(1)
9293
ww.workflowTaskQueue <- task
9394
}
9495
}
@@ -109,7 +110,6 @@ func (ww *WorkflowWorker) runDispatcher() {
109110

110111
t := t
111112

112-
ww.wg.Add(1)
113113
go func() {
114114
defer ww.wg.Done()
115115

@@ -131,18 +131,25 @@ func (ww *WorkflowWorker) handle(ctx context.Context, t *task.Workflow) {
131131
ww.backend.Metrics().Distribution(metrickeys.WorkflowTaskDelay, metrics.Tags{}, float64(timeInQueue/time.Millisecond))
132132

133133
timer := metrics.Timer(ww.backend.Metrics(), metrickeys.WorkflowTaskProcessed, metrics.Tags{})
134-
defer timer.Stop()
135134

136135
result, err := ww.handleTask(ctx, t)
137136
if err != nil {
138137
ww.logger.Panic("could not handle workflow task", "error", err)
139138
}
140139

140+
// Only record the time spent in the workflow code
141+
timer.Stop()
142+
141143
state := core.WorkflowInstanceStateActive
142144
if result.Completed {
143145
state = core.WorkflowInstanceStateFinished
144146

145-
ww.backend.Metrics().Counter(metrickeys.WorkflowInstanceFinished, metrics.Tags{}, 1)
147+
if t.WorkflowInstanceState != state {
148+
// If the workflow is now finished, record
149+
ww.backend.Metrics().Counter(metrickeys.WorkflowInstanceFinished, metrics.Tags{
150+
metrickeys.SubWorkflow: fmt.Sprint(t.WorkflowInstance.SubWorkflow()),
151+
}, 1)
152+
}
146153
}
147154

148155
ww.backend.Metrics().Counter(metrickeys.ActivityTaskScheduled, metrics.Tags{}, int64(len(result.ActivityEvents)))

internal/workflow/executor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,17 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
100100
logger.Debug("Executing workflow task", "task_last_sequence_id", t.LastSequenceID)
101101

102102
if t.WorkflowInstanceState == core.WorkflowInstanceStateFinished {
103-
// This should never happen. For now, log information and then panic.
103+
// This could happen if signals are delivered after the workflow is finished
104104
logger.Error("Received workflow task for finished workflow instance, discarding events")
105105

106106
// Log events that caused this task to be scheduled
107107
for _, event := range t.NewEvents {
108108
logger.Debug("Discarded event:", "id", event.ID, "event_type", event.Type.String(), "schedule_event_id", event.ScheduleEventID)
109109
}
110110

111-
return &ExecutionResult{}, nil
111+
return &ExecutionResult{
112+
Completed: true,
113+
}, nil
112114
}
113115

114116
skipNewEvents := false

0 commit comments

Comments
 (0)