Skip to content

Commit 597c9a9

Browse files
Merge pull request #7672 from devbugging/gregor/callbacks/e2e
[Scheduled Callbacks] E2E Tests
2 parents 7a3e7c6 + f8b5c19 commit 597c9a9

File tree

31 files changed

+866
-137
lines changed

31 files changed

+866
-137
lines changed

cmd/execution_builder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,9 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
575575
opts = append(opts, computation.DefaultFVMOptions(
576576
node.RootChainID,
577577
exeNode.exeConf.computationConfig.CadenceTracing,
578-
exeNode.exeConf.computationConfig.ExtensiveTracing)...)
578+
exeNode.exeConf.computationConfig.ExtensiveTracing,
579+
exeNode.exeConf.scheduleCallbacksEnabled)...)
580+
579581
vmCtx := fvm.NewContext(opts...)
580582

581583
var collector module.ExecutionMetrics

cmd/execution_config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ExecutionConfig struct {
6060
transactionExecutionMetricsEnabled bool
6161
transactionExecutionMetricsBufferSize uint
6262
executionDataDBMode string
63+
scheduleCallbacksEnabled bool
6364

6465
computationConfig computation.ComputationConfig
6566
receiptRequestWorkers uint // common provider engine workers
@@ -139,6 +140,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
139140
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
140141
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")
141142
flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true")
143+
flags.BoolVar(&exeConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", false, "enable execution of scheduled callbacks")
142144
// deprecated. Retain it to prevent nodes that previously had this configuration from crashing.
143145
var deprecatedEnableNewIngestionEngine bool
144146
flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true")

cmd/util/cmd/run-script/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func run(*cobra.Command, []string) {
139139
registersByAccount.AccountCount(),
140140
)
141141

142-
options := computation.DefaultFVMOptions(chainID, false, false)
142+
options := computation.DefaultFVMOptions(chainID, false, false, false)
143143
options = append(
144144
options,
145145
fvm.WithContractDeploymentRestricted(false),

cmd/util/ledger/migrations/transaction_migration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewTransactionBasedMigration(
1919
) RegistersMigration {
2020
return func(registersByAccount *registers.ByAccount) error {
2121

22-
options := computation.DefaultFVMOptions(chainID, false, false)
22+
options := computation.DefaultFVMOptions(chainID, false, false, false)
2323
options = append(options,
2424
fvm.WithContractDeploymentRestricted(false),
2525
fvm.WithContractRemovalRestricted(false),

cmd/verification_builder.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ type VerificationConfig struct {
5555
blockWorkers uint64 // number of blocks processed in parallel.
5656
chunkWorkers uint64 // number of chunks processed in parallel.
5757

58-
stopAtHeight uint64 // height to stop the node on
58+
stopAtHeight uint64 // height to stop the node on
59+
scheduleCallbacksEnabled bool // enable execution of scheduled callbacks
5960
}
6061

6162
type VerificationNodeBuilder struct {
@@ -83,6 +84,7 @@ func (v *VerificationNodeBuilder) LoadFlags() {
8384
flags.Uint64Var(&v.verConf.blockWorkers, "block-workers", blockconsumer.DefaultBlockWorkers, "maximum number of blocks being processed in parallel")
8485
flags.Uint64Var(&v.verConf.chunkWorkers, "chunk-workers", chunkconsumer.DefaultChunkWorkers, "maximum number of execution nodes a chunk data pack request is dispatched to")
8586
flags.Uint64Var(&v.verConf.stopAtHeight, "stop-at-height", 0, "height to stop the node at (0 to disable)")
87+
flags.BoolVar(&v.verConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", false, "enable execution of scheduled callbacks")
8688
})
8789
}
8890

@@ -211,7 +213,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
211213
)
212214

213215
// TODO(JanezP): cleanup creation of fvm context github.com/onflow/flow-go/issues/5249
214-
fvmOptions = append(fvmOptions, computation.DefaultFVMOptions(node.RootChainID, false, false)...)
216+
fvmOptions = append(fvmOptions, computation.DefaultFVMOptions(node.RootChainID, false, false, v.verConf.scheduleCallbacksEnabled)...)
215217
vmCtx := fvm.NewContext(fvmOptions...)
216218

217219
chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)

engine/execution/computation/computer/computer.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ func (e *blockComputer) queueSystemTransactions(
324324
defer close(txQueue)
325325

326326
for i, txBody := range allTxs {
327-
last := i == len(allTxs)-1
327+
last := i == len(allTxs)-1 // Is this last tx in collection
328+
328329
ctx := callbackCtx
329330
// last transaction is system chunk and has own context
330331
if last {
@@ -383,7 +384,8 @@ func (e *blockComputer) executeBlock(
383384
e.receiptHasher,
384385
parentBlockExecutionResultID,
385386
block,
386-
e.maxConcurrency*2, // we add some buffer just in case result collection becomes slower than the execution
387+
// Add buffer just in case result collection becomes slower than the execution
388+
e.maxConcurrency*2,
387389
e.colResCons,
388390
baseSnapshot,
389391
)
@@ -454,14 +456,16 @@ func (e *blockComputer) executeUserTransactions(
454456

455457
// executeSystemTransactions executes all system transactions in the block as part of the system collection.
456458
//
457-
// System transactions are executed in the following order:
458-
// 1. system transaction that processes the scheduled callbacks which is a blocking transaction and
459-
// the result is used for the next system transaction
460-
// 2. system transactions that each execute a single scheduled callback by the ID obtained from events
461-
// of the previous system transaction
462-
// 3. system transaction that executes the system chunk
459+
// When scheduled callbacks are enabled, system transactions are executed in the following order:
460+
// 1. Process callback transaction - queries the scheduler contract to identify ready callbacks
461+
// and emits events containing callback IDs and execution effort requirements
462+
// 2. Callback execution transactions - one transaction per callback ID from step 1 events,
463+
// each executing a single scheduled callback with its specified effort limit
464+
// 3. System chunk transaction - performs standard system operations
465+
//
466+
// When scheduled callbacks are disabled, only the system chunk transaction is executed.
463467
//
464-
// An error can be returned if the process callback transaction fails. This is a fatal error.
468+
// All errors are indicators of bugs or corrupted internal state (continuation impossible)
465469
func (e *blockComputer) executeSystemTransactions(
466470
block *entity.ExecutableBlock,
467471
blockSpan otelTrace.Span,
@@ -496,7 +500,7 @@ func (e *blockComputer) executeSystemTransactions(
496500
blockId: block.BlockID(),
497501
blockIdStr: block.BlockID().String(),
498502
blockHeight: block.Block.Height,
499-
collectionIndex: len(rawCollections),
503+
collectionIndex: userCollectionCount,
500504
CompleteCollection: &entity.CompleteCollection{
501505
Collection: flow.NewEmptyCollection(), // TODO(7749)
502506
},
@@ -522,6 +526,12 @@ func (e *blockComputer) executeSystemTransactions(
522526
txIndex = updatedTxnIndex
523527
}
524528

529+
// Update logger with number of transactions once they've become known
530+
// (user tx + callbacks + 2 (process, system)
531+
systemLogger = systemLogger.With().
532+
Uint32("num_txs", uint32(userTxCount+len(callbackTxs)+2)).
533+
Logger()
534+
525535
txQueue := e.queueSystemTransactions(
526536
callbackCtx,
527537
systemChunkCtx,
@@ -557,10 +567,20 @@ func (e *blockComputer) executeQueue(
557567
wg.Wait()
558568
}
559569

560-
// executeProcessCallback executes a transaction that calls callback scheduler contract process method.
561-
// The execution result contains events that are emitted for each callback which is ready for execution.
562-
// We use these events to prepare callback execution transactions, which are later executed as part of the system collection.
563-
// An error can be returned if the process callback transaction fails. This is a fatal error.
570+
// executeProcessCallback submits a transaction that invokes the `process` method
571+
// on the callback scheduler contract.
572+
//
573+
// The `process` method scans for scheduled callbacks and emits an event for each that should
574+
// be executed. These emitted events are used to construct callback execution transactions,
575+
// which are then added to the system transaction collection.
576+
//
577+
// If the `process` transaction fails, a fatal error is returned.
578+
//
579+
// Note: this transaction is executed serially and not concurrently with the system transaction.
580+
// This is because it's unclear whether the callback executions triggered by `process`
581+
// will result in additional system transactions.
582+
// In theory, if no additional transactions are emitted, concurrent execution could be optimized.
583+
// However, due to the added complexity, this optimization was deferred.
564584
func (e *blockComputer) executeProcessCallback(
565585
systemCtx fvm.Context,
566586
systemCollectionInfo collectionInfo,
@@ -591,8 +611,7 @@ func (e *blockComputer) executeProcessCallback(
591611
}
592612

593613
return nil, 0, fmt.Errorf(
594-
"failed to execute %s transaction %v (%d@%d) for block %s at height %v: %w",
595-
"system",
614+
"failed to execute system process transaction %v (%d@%d) for block %s at height %v: %w",
596615
request.txnIdStr,
597616
request.txnIndex,
598617
snapshotTime,

0 commit comments

Comments
 (0)