Skip to content

Commit 1c1f93b

Browse files
authored
Merge pull request #7992 from devbugging/gregor/callbacks/process-fallback
[Scheduled Callbacks] Gracefully handle process execution failure
2 parents 650a67b + 45ecca6 commit 1c1f93b

File tree

2 files changed

+147
-24
lines changed

2 files changed

+147
-24
lines changed

engine/execution/computation/computer/computer.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -656,10 +656,15 @@ func (e *blockComputer) executeProcessCallback(
656656
}
657657

658658
if txn.Output().Err != nil {
659-
return nil, 0, fmt.Errorf(
660-
"process callback transaction %s error: %v",
661-
request.txnIdStr,
662-
txn.Output().Err)
659+
// if the process transaction fails we log the critical error but don't return an error
660+
// so that block execution continues and only the scheduled transactions halt
661+
callbackLogger.Error().
662+
Err(txn.Output().Err).
663+
Bool("critical_error", true).
664+
Uint64("height", request.ctx.BlockHeader.Height).
665+
Msg("system process transaction output error")
666+
667+
return nil, txnIndex, nil
663668
}
664669

665670
callbackTxs, err := blueprints.ExecuteCallbacksTransactions(e.vmCtx.Chain, txn.Output().Events)

engine/execution/computation/computer/computer_test.go

Lines changed: 138 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package computer_test
22

33
import (
4+
"bytes"
45
"context"
6+
"encoding/json"
57
"fmt"
68
"math/rand"
79
"strings"
@@ -1422,26 +1424,36 @@ func Test_ScheduledCallback(t *testing.T) {
14221424

14231425
t.Run("process callback transaction execution error", func(t *testing.T) {
14241426
processCallbackError := fvmErrors.NewInvalidAddressErrorf(flow.EmptyAddress, "process callback execution failed")
1425-
testScheduledCallbackWithError(t, chain, []cadence.Event{}, 0, processCallbackError)
1427+
testScheduledCallbackWithError(t, chain, []cadence.Event{}, 0, processCallbackError, nil)
14261428
})
14271429

14281430
t.Run("process callback transaction output error", func(t *testing.T) {
14291431
processCallbackError := fvmErrors.NewInvalidAddressErrorf(flow.EmptyAddress, "process callback output error")
1430-
testScheduledCallbackWithError(t, chain, []cadence.Event{}, 0, processCallbackError)
1432+
testScheduledCallbackWithError(t, chain, []cadence.Event{}, 2, nil, processCallbackError)
14311433
})
14321434
}
14331435

14341436
func testScheduledCallback(t *testing.T, chain flow.Chain, callbackEvents []cadence.Event, expectedTransactionCount int) {
1435-
testScheduledCallbackWithError(t, chain, callbackEvents, expectedTransactionCount, nil)
1437+
testScheduledCallbackWithError(t, chain, callbackEvents, expectedTransactionCount, nil, nil)
14361438
}
14371439

1438-
func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEvents []cadence.Event, expectedTransactionCount int, processCallbackError fvmErrors.CodedError) {
1440+
func testScheduledCallbackWithError(
1441+
t *testing.T,
1442+
chain flow.Chain,
1443+
callbackEvents []cadence.Event,
1444+
expectedTransactionCount int,
1445+
processExecuteError fvmErrors.CodedError,
1446+
processOutputError fvmErrors.CodedError,
1447+
) {
14391448
rag := &RandomAddressGenerator{}
14401449
executorID := unittest.IdentifierFixture()
14411450

1451+
testLogger := NewTestLogger()
1452+
14421453
execCtx := fvm.NewContext(
14431454
fvm.WithScheduleCallbacksEnabled(true), // Enable callbacks
14441455
fvm.WithChain(chain),
1456+
fvm.WithLogger(testLogger.Logger),
14451457
)
14461458

14471459
// track which transactions were executed and their details
@@ -1474,9 +1486,10 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
14741486
vm := &callbackTestVM{
14751487
testVM: testVM{
14761488
t: t,
1477-
eventsPerTransaction: 0, // we'll handle events manually
1478-
err: processCallbackError, // inject error if provided
1489+
eventsPerTransaction: 0, // we'll handle events manually
1490+
err: processExecuteError, // inject error if provided
14791491
},
1492+
processOutputErr: processOutputError,
14801493
executedTransactions: executedTransactions,
14811494
executedMutex: &executedTransactionsMutex,
14821495
eventPayloads: eventPayloads,
@@ -1507,15 +1520,34 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
15071520
Return(nil).
15081521
Times(1)
15091522

1510-
// expect the specified number of transactions
1511-
exemetrics.On("ExecutionTransactionExecuted",
1512-
mock.Anything,
1513-
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
1514-
return !arg.Failed && (arg.SystemTransaction || arg.ScheduledTransaction)
1515-
}),
1516-
mock.Anything).
1517-
Return(nil).
1518-
Times(expectedTransactionCount)
1523+
if processOutputError != nil {
1524+
// expect 1 failed transaction (process callback) + 1 successful transaction (system chunk)
1525+
exemetrics.On("ExecutionTransactionExecuted",
1526+
mock.Anything,
1527+
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
1528+
return arg.Failed && (arg.SystemTransaction || arg.ScheduledTransaction)
1529+
}),
1530+
mock.Anything).
1531+
Return(nil).
1532+
Times(1)
1533+
exemetrics.On("ExecutionTransactionExecuted",
1534+
mock.Anything,
1535+
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
1536+
return !arg.Failed && (arg.SystemTransaction || arg.ScheduledTransaction)
1537+
}),
1538+
mock.Anything).
1539+
Return(nil).
1540+
Times(expectedTransactionCount - 1)
1541+
} else {
1542+
exemetrics.On("ExecutionTransactionExecuted",
1543+
mock.Anything,
1544+
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
1545+
return !arg.Failed && (arg.SystemTransaction || arg.ScheduledTransaction)
1546+
}),
1547+
mock.Anything).
1548+
Return(nil).
1549+
Times(expectedTransactionCount)
1550+
}
15191551

15201552
exemetrics.On(
15211553
"ExecutionChunkDataPackGenerated",
@@ -1556,7 +1588,7 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
15561588
execCtx,
15571589
exemetrics,
15581590
trace.NewNoopTracer(),
1559-
zerolog.Nop(),
1591+
testLogger.Logger,
15601592
committer,
15611593
me,
15621594
prov,
@@ -1577,12 +1609,32 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
15771609
derived.NewEmptyDerivedBlockData(0))
15781610

15791611
// If we expect an error, verify it and return early
1580-
if processCallbackError != nil {
1612+
if processExecuteError != nil {
15811613
require.Error(t, err)
15821614
require.Contains(t, err.Error(), "system process transaction")
15831615
return
15841616
}
15851617

1618+
if processOutputError != nil {
1619+
require.NoError(t, err)
1620+
require.Truef(
1621+
t,
1622+
testLogger.HasLogWithField("system process transaction output error", "critical_error", true),
1623+
"expected critical error log not found",
1624+
)
1625+
1626+
// verify the process callback transaction failed as expected
1627+
require.Len(t, result.AllTransactionResults(), expectedTransactionCount)
1628+
processCallbackResult := result.AllTransactionResults()[0]
1629+
require.NotEmpty(t, processCallbackResult.ErrorMessage, "process callback transaction should have failed")
1630+
require.Contains(t, processCallbackResult.ErrorMessage, "process callback output error")
1631+
1632+
// verify system chunk transaction succeeded
1633+
systemChunkResult := result.AllTransactionResults()[1]
1634+
require.Empty(t, systemChunkResult.ErrorMessage, "system chunk transaction should not have failed")
1635+
return
1636+
}
1637+
15861638
require.NoError(t, err)
15871639

15881640
// verify execution results
@@ -1640,7 +1692,8 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
16401692

16411693
// callbackTestVM is a custom VM for testing callback execution
16421694
type callbackTestVM struct {
1643-
testVM // Embed testVM
1695+
testVM
1696+
processOutputErr fvmErrors.CodedError
16441697
executedTransactions map[string]string
16451698
executedMutex *sync.Mutex
16461699
eventPayloads [][]byte
@@ -1689,17 +1742,18 @@ func (c *callbackTestExecutor) Execute() error {
16891742
// from the output of the procedure executor
16901743
func (c *callbackTestExecutor) Output() fvm.ProcedureOutput {
16911744
// Return error if one was injected for process callback transaction
1692-
if c.vm.err != nil {
1745+
if c.vm.processOutputErr != nil {
16931746
txProc, ok := c.proc.(*fvm.TransactionProcedure)
16941747
if ok {
16951748
script := string(txProc.Transaction.Script)
16961749
if strings.Contains(script, "scheduler.process") {
16971750
return fvm.ProcedureOutput{
1698-
Err: c.vm.err,
1751+
Err: c.vm.processOutputErr,
16991752
}
17001753
}
17011754
}
17021755
}
1756+
17031757
c.vm.executedMutex.Lock()
17041758
defer c.vm.executedMutex.Unlock()
17051759

@@ -2066,3 +2120,67 @@ func (p *programLoader) Compute(
20662120
) {
20672121
return p.load()
20682122
}
2123+
2124+
// TestLogger captures log output for testing and provides methods to verify logged messages.
2125+
type TestLogger struct {
2126+
buffer bytes.Buffer
2127+
Logger zerolog.Logger
2128+
}
2129+
2130+
func NewTestLogger() *TestLogger {
2131+
tl := &TestLogger{}
2132+
tl.Logger = zerolog.New(&tl.buffer).Level(zerolog.DebugLevel)
2133+
return tl
2134+
}
2135+
2136+
type LogEntry struct {
2137+
Level string
2138+
Message string
2139+
Fields map[string]interface{}
2140+
}
2141+
2142+
func (tl *TestLogger) Logs() []LogEntry {
2143+
var entries []LogEntry
2144+
lines := strings.Split(tl.buffer.String(), "\n")
2145+
for _, line := range lines {
2146+
if line == "" {
2147+
continue
2148+
}
2149+
var rawEntry map[string]interface{}
2150+
if err := json.Unmarshal([]byte(line), &rawEntry); err != nil {
2151+
continue
2152+
}
2153+
entry := LogEntry{
2154+
Fields: make(map[string]interface{}),
2155+
}
2156+
for k, v := range rawEntry {
2157+
switch k {
2158+
case "level":
2159+
entry.Level = fmt.Sprintf("%v", v)
2160+
case "message":
2161+
entry.Message = fmt.Sprintf("%v", v)
2162+
default:
2163+
entry.Fields[k] = v
2164+
}
2165+
}
2166+
entries = append(entries, entry)
2167+
}
2168+
return entries
2169+
}
2170+
2171+
func (tl *TestLogger) HasLog(message string) bool {
2172+
return strings.Contains(tl.buffer.String(), message)
2173+
}
2174+
2175+
func (tl *TestLogger) HasLogWithField(message string, fieldName string, fieldValue interface{}) bool {
2176+
for _, entry := range tl.Logs() {
2177+
if strings.Contains(entry.Message, message) {
2178+
if val, ok := entry.Fields[fieldName]; ok {
2179+
if val == fieldValue {
2180+
return true
2181+
}
2182+
}
2183+
}
2184+
}
2185+
return false
2186+
}

0 commit comments

Comments
 (0)