Skip to content

Commit d1ef343

Browse files
Plumbing err to workflow completed event (#21200)
* Add error propagation to WorkflowExecutionFinished events - Add execErr parameter to EmitExecutionFinishedEvent and populate the v2 proto Error field - Pass error at all 3 existing v2 engine call sites - Fix 2 orphaned early returns that emitted started but not finished - Pass nil at v1 engine call site for compilation - Add test case verifying error is included in v2 event * bumping protos + fixing build failure * moving event emission to defer pattern * make gomodtidy * lint --------- Co-authored-by: De Clercq Wentzel <10665586+wentzeld@users.noreply.github.com>
1 parent 16a95df commit d1ef343

File tree

18 files changed

+63
-35
lines changed

18 files changed

+63
-35
lines changed

core/scripts/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ require (
504504
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
505505
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
506506
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 // indirect
507-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791 // indirect
507+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f // indirect
508508
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198 // indirect
509509
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260209164410-3aec83b0246f // indirect
510510
github.com/smartcontractkit/chainlink-sui v0.0.0-20260205175622-33e65031f9a9 // indirect

core/scripts/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,8 +1670,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C
16701670
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
16711671
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 h1:X8Pekpv+cy0eW1laZTwATuYLTLZ6gRTxz1ZWOMtU74o=
16721672
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
1673-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791 h1:S+yHuhcny3AKOhCekMQa65uUeR/p9rGrUIb8eifkSTY=
1674-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
1673+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f h1:3+vQMwuWL6+OqNutFqo/+gkczJwcr+MBPqeSxcjfI1Y=
1674+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
16751675
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198 h1:wWLBlbexHxP87lEdGR022Ve+2OW/MxvfWxg2U+uoMis=
16761676
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198/go.mod h1:/cw67XEnsP9wjQQH4BhL347Qy9HDg51+ETrMpdRTPIo=
16771677
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260209164410-3aec83b0246f h1:XlAfD2E1/xrHRgMp8TwB8sdbbLZc+TzhGAeDDKqtLk0=

core/services/workflows/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
697697

698698
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
699699
l.Infof("execution duration: %d (seconds)", executionDuration)
700-
err = events.EmitExecutionFinishedEvent(ctx, cma.Labels(), status, executionID, l)
700+
err = events.EmitExecutionFinishedEvent(ctx, cma.Labels(), status, executionID, nil, l)
701701
if err != nil {
702702
e.logger.Errorf("failed to emit execution finished event: %+v", err)
703703
}

core/services/workflows/events/emit.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func EmitExecutionStartedEvent(
137137
return multiErr
138138
}
139139

140-
func EmitExecutionFinishedEvent(ctx context.Context, labels map[string]string, status string, executionID string, lggr logger.Logger) error {
140+
func EmitExecutionFinishedEvent(ctx context.Context, labels map[string]string, status string, executionID string, execErr error, lggr logger.Logger) error {
141141
metadata := buildWorkflowMetadata(labels, executionID)
142142

143143
event := &events.WorkflowExecutionFinished{
@@ -161,12 +161,18 @@ func EmitExecutionFinishedEvent(ctx context.Context, labels map[string]string, s
161161
executionStatus = eventsv2.ExecutionStatus_EXECUTION_STATUS_UNSPECIFIED
162162
}
163163

164+
var errMsg string
165+
if execErr != nil {
166+
errMsg = execErr.Error()
167+
}
168+
164169
v2Event := &eventsv2.WorkflowExecutionFinished{
165170
CreInfo: creInfo,
166171
Workflow: workflowKey,
167172
WorkflowExecutionID: executionID,
168173
Timestamp: time.Now().Format(time.RFC3339),
169174
Status: executionStatus,
175+
Error: errMsg,
170176
}
171177

172178
// Emit both v1 and v2 events

core/services/workflows/events/emit_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package events_test
22

33
import (
4+
"errors"
45
"regexp"
56
"testing"
67

@@ -45,7 +46,7 @@ func TestEmit(t *testing.T) {
4546
})
4647

4748
t.Run(events.WorkflowExecutionFinished, func(t *testing.T) {
48-
require.NoError(t, events.EmitExecutionFinishedEvent(t.Context(), labels, "status", executionID, nil))
49+
require.NoError(t, events.EmitExecutionFinishedEvent(t.Context(), labels, "status", executionID, nil, nil))
4950
require.Len(t, labels, 1)
5051

5152
msgs := beholderObserver.Messages(t, "beholder_entity", "workflows.v1."+events.WorkflowExecutionFinished)
@@ -57,6 +58,20 @@ func TestEmit(t *testing.T) {
5758
assert.True(t, timeMatcher.MatchString(expected.Timestamp), expected.Timestamp)
5859
})
5960

61+
t.Run(events.WorkflowExecutionFinished+"_with_error", func(t *testing.T) {
62+
testErr := errors.New("something went wrong")
63+
require.NoError(t, events.EmitExecutionFinishedEvent(t.Context(), labels, "errored", executionID, testErr, nil))
64+
65+
v2Msgs := beholderObserver.Messages(t, "beholder_entity", "workflows.v2."+events.WorkflowExecutionFinished)
66+
require.NotEmpty(t, v2Msgs)
67+
68+
// Check the latest v2 message contains the error
69+
var v2Event eventsv2.WorkflowExecutionFinished
70+
require.NoError(t, proto.Unmarshal(v2Msgs[len(v2Msgs)-1].Body, &v2Event))
71+
assert.Equal(t, "something went wrong", v2Event.Error)
72+
assert.Equal(t, eventsv2.ExecutionStatus_EXECUTION_STATUS_FAILED, v2Event.Status)
73+
})
74+
6075
t.Run(events.CapabilityExecutionStarted, func(t *testing.T) {
6176
require.NoError(t, events.EmitCapabilityStartedEvent(t.Context(), labels, executionID, capabilityID, stepRef, "test-method"))
6277
require.Len(t, labels, 1)

core/services/workflows/v2/engine.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,16 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue
668668
_ = events.EmitExecutionStartedEvent(ctx, loggerLabels, triggerEvent.ID, executionID)
669669
e.metrics.With("workflowID", e.cfg.WorkflowID, "workflowName", e.cfg.WorkflowName.String()).IncrementWorkflowExecutionStartedCounter(ctx)
670670

671+
// Track execution error for deferred event emission
672+
var execErr error
673+
defer func() {
674+
_ = events.EmitExecutionFinishedEvent(ctx, loggerLabels, executionStatus, executionID, execErr, lggr)
675+
e.cfg.Hooks.OnExecutionFinished(executionID, executionStatus)
676+
if execErr != nil {
677+
e.cfg.Hooks.OnExecutionError(execErr.Error())
678+
}
679+
}()
680+
671681
var timeProvider TimeProvider = &types.LocalTimeProvider{}
672682
if !e.cfg.UseLocalTimeProvider {
673683
timeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, e.cfg.WorkflowID, lggr)
@@ -676,18 +686,23 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue
676686
moduleExecuteMaxResponseSizeBytes, err := e.cfg.LocalLimiters.ExecutionResponse.Limit(ctx)
677687
if err != nil {
678688
lggr.Errorw("Failed to get execution response size limit", "err", err)
689+
executionStatus = store.StatusErrored
690+
execErr = err
679691
return
680692
}
681693
if moduleExecuteMaxResponseSizeBytes < 0 {
682-
lggr.Errorf("invalid moduleExecuteMaxResponseSizeBytes; must not be negative: %d", moduleExecuteMaxResponseSizeBytes)
694+
execErr = fmt.Errorf("invalid moduleExecuteMaxResponseSizeBytes; must not be negative: %d", moduleExecuteMaxResponseSizeBytes)
695+
lggr.Errorw(execErr.Error())
696+
executionStatus = store.StatusErrored
683697
return
684698
}
685699
execHelper := &ExecutionHelper{
686700
Engine: e, WorkflowExecutionID: executionID, UserLogChan: userLogChan,
687701
TimeProvider: timeProvider, SecretsFetcher: e.secretsFetcher(executionID),
688702
}
689703
execHelper.initLimiters(e.cfg.LocalLimiters)
690-
result, execErr := e.cfg.Module.Execute(execCtx, &sdkpb.ExecuteRequest{
704+
var result *sdkpb.ExecutionResult
705+
result, execErr = e.cfg.Module.Execute(execCtx, &sdkpb.ExecuteRequest{
691706
Request: &sdkpb.ExecuteRequest_Trigger{
692707
Trigger: &sdkpb.Trigger{
693708
Id: tid,
@@ -730,11 +745,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue
730745
} else {
731746
e.metrics.UpdateWorkflowErrorDurationHistogram(ctx, int64(executionDuration.Seconds()))
732747
}
733-
734748
executionLogger.Errorw("Workflow execution failed with module execution error", "status", executionStatus, "durationMs", executionDuration.Milliseconds(), "err", execErr)
735-
_ = events.EmitExecutionFinishedEvent(ctx, loggerLabels, executionStatus, executionID, lggr)
736-
e.cfg.Hooks.OnExecutionFinished(executionID, executionStatus)
737-
e.cfg.Hooks.OnExecutionError(execErr.Error())
738749
return
739750
}
740751

@@ -744,22 +755,18 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue
744755

745756
if len(result.GetError()) > 0 {
746757
executionStatus = store.StatusErrored
758+
execErr = errors.New(result.GetError())
747759
e.metrics.UpdateWorkflowErrorDurationHistogram(ctx, int64(executionDuration.Seconds()))
748760
e.metrics.With("workflowID", e.cfg.WorkflowID, "workflowName", e.cfg.WorkflowName.String()).IncrementWorkflowExecutionFailedCounter(ctx)
749761
executionLogger.Errorw("Workflow execution failed", "status", executionStatus, "durationMs", executionDuration.Milliseconds(), "error", result.GetError())
750-
_ = events.EmitExecutionFinishedEvent(ctx, loggerLabels, executionStatus, executionID, lggr)
751-
e.cfg.Hooks.OnExecutionFinished(executionID, executionStatus)
752-
e.cfg.Hooks.OnExecutionError(result.GetError())
753762
return
754763
}
755764

756765
executionStatus = store.StatusCompleted
757766
executionLogger.Infow("Workflow execution finished successfully", "durationMs", executionDuration.Milliseconds())
758-
_ = events.EmitExecutionFinishedEvent(ctx, loggerLabels, executionStatus, executionID, lggr)
759767
e.metrics.UpdateWorkflowCompletedDurationHistogram(ctx, int64(executionDuration.Seconds()))
760768
e.metrics.With("workflowID", e.cfg.WorkflowID, "workflowName", e.cfg.WorkflowName.String()).IncrementWorkflowExecutionSucceededCounter(ctx)
761769
e.cfg.Hooks.OnResultReceived(result)
762-
e.cfg.Hooks.OnExecutionFinished(executionID, executionStatus)
763770
}
764771

765772
func (e *Engine) secretsFetcher(phaseID string) SecretsFetcher {

deployment/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ require (
440440
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
441441
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
442442
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 // indirect
443-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791 // indirect
443+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f // indirect
444444
github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2 // indirect
445445
github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.3 // indirect
446446
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20251014143056-a0c6328c91e9 // indirect

deployment/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,8 +1427,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C
14271427
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
14281428
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 h1:X8Pekpv+cy0eW1laZTwATuYLTLZ6gRTxz1ZWOMtU74o=
14291429
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
1430-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791 h1:S+yHuhcny3AKOhCekMQa65uUeR/p9rGrUIb8eifkSTY=
1431-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
1430+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f h1:3+vQMwuWL6+OqNutFqo/+gkczJwcr+MBPqeSxcjfI1Y=
1431+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
14321432
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198 h1:wWLBlbexHxP87lEdGR022Ve+2OW/MxvfWxg2U+uoMis=
14331433
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198/go.mod h1:/cw67XEnsP9wjQQH4BhL347Qy9HDg51+ETrMpdRTPIo=
14341434
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260209164410-3aec83b0246f h1:XlAfD2E1/xrHRgMp8TwB8sdbbLZc+TzhGAeDDKqtLk0=

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ require (
101101
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0
102102
github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260128151123-605e9540b706
103103
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0
104-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791
104+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f
105105
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198
106106
github.com/smartcontractkit/chainlink-sui v0.0.0-20260124000807-bff5e296dfb7
107107
github.com/smartcontractkit/chainlink-ton v0.0.0-20260211155338-cd4708d2b938

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,8 +1235,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C
12351235
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
12361236
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 h1:X8Pekpv+cy0eW1laZTwATuYLTLZ6gRTxz1ZWOMtU74o=
12371237
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
1238-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791 h1:S+yHuhcny3AKOhCekMQa65uUeR/p9rGrUIb8eifkSTY=
1239-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260205231316-3b9c600dd791/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
1238+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f h1:3+vQMwuWL6+OqNutFqo/+gkczJwcr+MBPqeSxcjfI1Y=
1239+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc=
12401240
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198 h1:wWLBlbexHxP87lEdGR022Ve+2OW/MxvfWxg2U+uoMis=
12411241
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260211115641-f96bb4343198/go.mod h1:/cw67XEnsP9wjQQH4BhL347Qy9HDg51+ETrMpdRTPIo=
12421242
github.com/smartcontractkit/chainlink-sui v0.0.0-20260124000807-bff5e296dfb7 h1:06HM7tgzZW24XrJEMFcB6U+HwvmGfKU8u2jrI1wrFeI=

0 commit comments

Comments
 (0)