Skip to content

Commit 5206222

Browse files
committed
Merge branch 'master' into tim/7749-address-system-collection-malleability
2 parents 2c1a05d + 597c9a9 commit 5206222

File tree

31 files changed

+866
-137
lines changed

31 files changed

+866
-137
lines changed

cmd/execution_builder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,9 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
575575
opts = append(opts, computation.DefaultFVMOptions(
576576
node.RootChainID,
577577
exeNode.exeConf.computationConfig.CadenceTracing,
578-
exeNode.exeConf.computationConfig.ExtensiveTracing)...)
578+
exeNode.exeConf.computationConfig.ExtensiveTracing,
579+
exeNode.exeConf.scheduleCallbacksEnabled)...)
580+
579581
vmCtx := fvm.NewContext(opts...)
580582

581583
var collector module.ExecutionMetrics

cmd/execution_config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ExecutionConfig struct {
6060
transactionExecutionMetricsEnabled bool
6161
transactionExecutionMetricsBufferSize uint
6262
executionDataDBMode string
63+
scheduleCallbacksEnabled bool
6364

6465
computationConfig computation.ComputationConfig
6566
receiptRequestWorkers uint // common provider engine workers
@@ -139,6 +140,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
139140
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
140141
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")
141142
flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true")
143+
flags.BoolVar(&exeConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", false, "enable execution of scheduled callbacks")
142144
// deprecated. Retain it to prevent nodes that previously had this configuration from crashing.
143145
var deprecatedEnableNewIngestionEngine bool
144146
flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true")

cmd/util/cmd/run-script/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func run(*cobra.Command, []string) {
139139
registersByAccount.AccountCount(),
140140
)
141141

142-
options := computation.DefaultFVMOptions(chainID, false, false)
142+
options := computation.DefaultFVMOptions(chainID, false, false, false)
143143
options = append(
144144
options,
145145
fvm.WithContractDeploymentRestricted(false),

cmd/util/ledger/migrations/transaction_migration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewTransactionBasedMigration(
1919
) RegistersMigration {
2020
return func(registersByAccount *registers.ByAccount) error {
2121

22-
options := computation.DefaultFVMOptions(chainID, false, false)
22+
options := computation.DefaultFVMOptions(chainID, false, false, false)
2323
options = append(options,
2424
fvm.WithContractDeploymentRestricted(false),
2525
fvm.WithContractRemovalRestricted(false),

cmd/verification_builder.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ type VerificationConfig struct {
5555
blockWorkers uint64 // number of blocks processed in parallel.
5656
chunkWorkers uint64 // number of chunks processed in parallel.
5757

58-
stopAtHeight uint64 // height to stop the node on
58+
stopAtHeight uint64 // height to stop the node on
59+
scheduleCallbacksEnabled bool // enable execution of scheduled callbacks
5960
}
6061

6162
type VerificationNodeBuilder struct {
@@ -83,6 +84,7 @@ func (v *VerificationNodeBuilder) LoadFlags() {
8384
flags.Uint64Var(&v.verConf.blockWorkers, "block-workers", blockconsumer.DefaultBlockWorkers, "maximum number of blocks being processed in parallel")
8485
flags.Uint64Var(&v.verConf.chunkWorkers, "chunk-workers", chunkconsumer.DefaultChunkWorkers, "maximum number of execution nodes a chunk data pack request is dispatched to")
8586
flags.Uint64Var(&v.verConf.stopAtHeight, "stop-at-height", 0, "height to stop the node at (0 to disable)")
87+
flags.BoolVar(&v.verConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", false, "enable execution of scheduled callbacks")
8688
})
8789
}
8890

@@ -211,7 +213,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
211213
)
212214

213215
// TODO(JanezP): cleanup creation of fvm context github.com/onflow/flow-go/issues/5249
214-
fvmOptions = append(fvmOptions, computation.DefaultFVMOptions(node.RootChainID, false, false)...)
216+
fvmOptions = append(fvmOptions, computation.DefaultFVMOptions(node.RootChainID, false, false, v.verConf.scheduleCallbacksEnabled)...)
215217
vmCtx := fvm.NewContext(fvmOptions...)
216218

217219
chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)

engine/execution/computation/computer/computer.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,8 @@ func (e *blockComputer) queueSystemTransactions(
321321
defer close(txQueue)
322322

323323
for i, txBody := range allTxs {
324-
last := i == len(allTxs)-1
324+
last := i == len(allTxs)-1 // Is this last tx in collection
325+
325326
ctx := callbackCtx
326327
// last transaction is system chunk and has own context
327328
if last {
@@ -380,7 +381,8 @@ func (e *blockComputer) executeBlock(
380381
e.receiptHasher,
381382
parentBlockExecutionResultID,
382383
block,
383-
e.maxConcurrency*2, // we add some buffer just in case result collection becomes slower than the execution
384+
// Add buffer just in case result collection becomes slower than the execution
385+
e.maxConcurrency*2,
384386
e.colResCons,
385387
baseSnapshot,
386388
)
@@ -451,14 +453,16 @@ func (e *blockComputer) executeUserTransactions(
451453

452454
// executeSystemTransactions executes all system transactions in the block as part of the system collection.
453455
//
454-
// System transactions are executed in the following order:
455-
// 1. system transaction that processes the scheduled callbacks which is a blocking transaction and
456-
// the result is used for the next system transaction
457-
// 2. system transactions that each execute a single scheduled callback by the ID obtained from events
458-
// of the previous system transaction
459-
// 3. system transaction that executes the system chunk
456+
// When scheduled callbacks are enabled, system transactions are executed in the following order:
457+
// 1. Process callback transaction - queries the scheduler contract to identify ready callbacks
458+
// and emits events containing callback IDs and execution effort requirements
459+
// 2. Callback execution transactions - one transaction per callback ID from step 1 events,
460+
// each executing a single scheduled callback with its specified effort limit
461+
// 3. System chunk transaction - performs standard system operations
462+
//
463+
// When scheduled callbacks are disabled, only the system chunk transaction is executed.
460464
//
461-
// An error can be returned if the process callback transaction fails. This is a fatal error.
465+
// All errors are indicators of bugs or corrupted internal state (continuation impossible)
462466
func (e *blockComputer) executeSystemTransactions(
463467
block *entity.ExecutableBlock,
464468
blockSpan otelTrace.Span,
@@ -493,7 +497,7 @@ func (e *blockComputer) executeSystemTransactions(
493497
blockId: block.BlockID(),
494498
blockIdStr: block.BlockID().String(),
495499
blockHeight: block.Block.Height,
496-
collectionIndex: len(rawCollections),
500+
collectionIndex: userCollectionCount,
497501
CompleteCollection: nil, // We do not yet know all the scheduled callbacks, so postpone construction of the collection.
498502
isSystemTransaction: true,
499503
}
@@ -544,6 +548,12 @@ func (e *blockComputer) executeSystemTransactions(
544548
}
545549
}
546550

551+
// Update logger with number of transactions once they've become known
552+
// (user tx + callbacks + 2 (process, system)
553+
systemLogger = systemLogger.With().
554+
Uint32("num_txs", uint32(userTxCount+len(callbackTxs)+2)).
555+
Logger()
556+
547557
txQueue := e.queueSystemTransactions(
548558
callbackCtx,
549559
systemChunkCtx,
@@ -579,10 +589,20 @@ func (e *blockComputer) executeQueue(
579589
wg.Wait()
580590
}
581591

582-
// executeProcessCallback executes a transaction that calls callback scheduler contract process method.
583-
// The execution result contains events that are emitted for each callback which is ready for execution.
584-
// We use these events to prepare callback execution transactions, which are later executed as part of the system collection.
585-
// An error can be returned if the process callback transaction fails. This is a fatal error.
592+
// executeProcessCallback submits a transaction that invokes the `process` method
593+
// on the callback scheduler contract.
594+
//
595+
// The `process` method scans for scheduled callbacks and emits an event for each that should
596+
// be executed. These emitted events are used to construct callback execution transactions,
597+
// which are then added to the system transaction collection.
598+
//
599+
// If the `process` transaction fails, a fatal error is returned.
600+
//
601+
// Note: this transaction is executed serially and not concurrently with the system transaction.
602+
// This is because it's unclear whether the callback executions triggered by `process`
603+
// will result in additional system transactions.
604+
// In theory, if no additional transactions are emitted, concurrent execution could be optimized.
605+
// However, due to the added complexity, this optimization was deferred.
586606
func (e *blockComputer) executeProcessCallback(
587607
systemCtx fvm.Context,
588608
systemCollectionInfo collectionInfo,
@@ -609,8 +629,7 @@ func (e *blockComputer) executeProcessCallback(
609629
}
610630

611631
return nil, 0, fmt.Errorf(
612-
"failed to execute %s transaction %v (%d@%d) for block %s at height %v: %w",
613-
"system",
632+
"failed to execute system process transaction %v (%d@%d) for block %s at height %v: %w",
614633
request.txnIdStr,
615634
request.txnIndex,
616635
snapshotTime,

0 commit comments

Comments
 (0)