Skip to content

Commit 3c0df29

Browse files
Merge pull request #7579 from devbugging/gregor/callbacks/04-contract-integration
[Scheduled Callbacks] Integrate callback scheduler contract
2 parents b9f7887 + 3a6aa5f commit 3c0df29

File tree

15 files changed

+176
-118
lines changed

15 files changed

+176
-118
lines changed

engine/execution/computation/computer/computer.go

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type blockComputer struct {
111111
tracer module.Tracer
112112
log zerolog.Logger
113113
systemTxn *flow.TransactionBody
114+
processCallbackTxn *flow.TransactionBody
114115
committer ViewCommitter
115116
executionDataProvider provider.Provider
116117
signer module.Local
@@ -184,6 +185,8 @@ func NewBlockComputer(
184185
return nil, fmt.Errorf("could not build system chunk transaction: %w", err)
185186
}
186187

188+
processCallbackTxn := blueprints.ProcessCallbacksTransaction(vmCtx.Chain)
189+
187190
return &blockComputer{
188191
vm: vm,
189192
vmCtx: vmCtx,
@@ -193,6 +196,7 @@ func NewBlockComputer(
193196
tracer: tracer,
194197
log: logger,
195198
systemTxn: systemTxn,
199+
processCallbackTxn: processCallbackTxn,
196200
committer: committer,
197201
executionDataProvider: executionDataProvider,
198202
signer: signer,
@@ -241,8 +245,11 @@ func (e *blockComputer) queueUserTransactions(
241245
blockId flow.Identifier,
242246
blockHeader *flow.Header,
243247
rawCollections []*entity.CompleteCollection,
244-
requestQueue chan TransactionRequest,
245-
) {
248+
userTxCount int,
249+
) chan TransactionRequest {
250+
txQueue := make(chan TransactionRequest, userTxCount)
251+
defer close(txQueue)
252+
246253
txnIndex := uint32(0)
247254
blockIdStr := blockId.String()
248255

@@ -270,7 +277,7 @@ func (e *blockComputer) queueUserTransactions(
270277
}
271278

272279
for i, txnBody := range collection.Transactions {
273-
requestQueue <- newTransactionRequest(
280+
txQueue <- newTransactionRequest(
274281
collectionInfo,
275282
collectionCtx,
276283
collectionLogger,
@@ -280,6 +287,8 @@ func (e *blockComputer) queueUserTransactions(
280287
txnIndex += 1
281288
}
282289
}
290+
291+
return txQueue
283292
}
284293

285294
func (e *blockComputer) queueSystemTransactions(
@@ -288,16 +297,18 @@ func (e *blockComputer) queueSystemTransactions(
288297
systemColection collectionInfo,
289298
systemTxn *flow.TransactionBody,
290299
executeCallbackTxs []*flow.TransactionBody,
291-
requestQueue chan TransactionRequest,
292300
txnIndex uint32,
293301
systemLogger zerolog.Logger,
294-
) {
302+
) chan TransactionRequest {
295303
allTxs := append(executeCallbackTxs, systemTxn)
296304
// add execute callback transactions to the system collection info along to existing process transaction
297305
systemTxs := systemColection.CompleteCollection.Transactions
298306
systemColection.CompleteCollection.Transactions = append(systemTxs, allTxs...)
299307
systemLogger = systemLogger.With().Uint32("num_txs", uint32(len(systemTxs))).Logger()
300308

309+
txQueue := make(chan TransactionRequest, len(allTxs))
310+
defer close(txQueue)
311+
301312
for i, txBody := range allTxs {
302313
last := i == len(allTxs)-1
303314
ctx := callbackCtx
@@ -306,7 +317,7 @@ func (e *blockComputer) queueSystemTransactions(
306317
ctx = systemChunkCtx
307318
}
308319

309-
requestQueue <- newTransactionRequest(
320+
txQueue <- newTransactionRequest(
310321
systemColection,
311322
ctx,
312323
systemLogger,
@@ -317,6 +328,8 @@ func (e *blockComputer) queueSystemTransactions(
317328

318329
txnIndex++
319330
}
331+
332+
return txQueue
320333
}
321334

322335
func (e *blockComputer) executeBlock(
@@ -415,17 +428,13 @@ func (e *blockComputer) executeUserTransactions(
415428
rawCollections []*entity.CompleteCollection,
416429
userTxCount int,
417430
) {
418-
txQueue := make(chan TransactionRequest, userTxCount)
419-
420-
e.queueUserTransactions(
431+
txQueue := e.queueUserTransactions(
421432
block.ID(),
422433
block.Block.Header,
423434
rawCollections,
424-
txQueue,
435+
userTxCount,
425436
)
426437

427-
close(txQueue)
428-
429438
e.executeQueue(blockSpan, database, txQueue)
430439
}
431440

@@ -479,10 +488,9 @@ func (e *blockComputer) executeSystemTransactions(
479488
}
480489

481490
var callbackTxs []*flow.TransactionBody
482-
var err error
483491

484492
if e.vmCtx.ScheduleCallbacksEnabled {
485-
callbackTxs, err = e.executeProcessCallback(
493+
callbacks, updatedTxnIndex, err := e.executeProcessCallback(
486494
callbackCtx,
487495
systemCollectionInfo,
488496
database,
@@ -494,25 +502,20 @@ func (e *blockComputer) executeSystemTransactions(
494502
return err
495503
}
496504

497-
txIndex++
505+
callbackTxs = callbacks
506+
txIndex = updatedTxnIndex
498507
}
499508

500-
// queue size for callback transactions + 1 system transaction (process callback already executed)
501-
txQueue := make(chan TransactionRequest, len(callbackTxs)+1)
502-
503-
e.queueSystemTransactions(
509+
txQueue := e.queueSystemTransactions(
504510
callbackCtx,
505511
systemChunkCtx,
506512
systemCollectionInfo,
507513
e.systemTxn,
508514
callbackTxs,
509-
txQueue,
510515
txIndex,
511516
systemLogger,
512517
)
513518

514-
close(txQueue)
515-
516519
e.executeQueue(blockSpan, database, txQueue)
517520

518521
return nil
@@ -549,28 +552,28 @@ func (e *blockComputer) executeProcessCallback(
549552
blockSpan otelTrace.Span,
550553
txnIndex uint32,
551554
systemLogger zerolog.Logger,
552-
) ([]*flow.TransactionBody, error) {
553-
processTxn := blueprints.ProcessCallbacksTransaction(e.vmCtx.Chain)
554-
555+
) ([]*flow.TransactionBody, uint32, error) {
555556
// add process callback transaction to the system collection info
556-
systemCollectionInfo.CompleteCollection.Transactions = append(systemCollectionInfo.CompleteCollection.Transactions, processTxn)
557+
systemCollectionInfo.CompleteCollection.Transactions = append(systemCollectionInfo.CompleteCollection.Transactions, e.processCallbackTxn)
557558

558559
request := newTransactionRequest(
559560
systemCollectionInfo,
560561
systemCtx,
561562
systemLogger,
562563
txnIndex,
563-
processTxn,
564+
e.processCallbackTxn,
564565
false)
565566

567+
txnIndex++
568+
566569
txn, err := e.executeTransactionInternal(blockSpan, database, request, 0)
567570
if err != nil {
568571
snapshotTime := logical.Time(0)
569572
if txn != nil {
570573
snapshotTime = txn.SnapshotTime()
571574
}
572575

573-
return nil, fmt.Errorf(
576+
return nil, 0, fmt.Errorf(
574577
"failed to execute %s transaction %v (%d@%d) for block %s at height %v: %w",
575578
"system",
576579
request.txnIdStr,
@@ -582,13 +585,18 @@ func (e *blockComputer) executeProcessCallback(
582585
}
583586

584587
if txn.Output().Err != nil {
585-
return nil, fmt.Errorf(
588+
return nil, 0, fmt.Errorf(
586589
"process callback transaction %s error: %v",
587590
request.txnIdStr,
588591
txn.Output().Err)
589592
}
590593

591-
return blueprints.ExecuteCallbacksTransactions(e.vmCtx.Chain, txn.Output().Events)
594+
callbackTxs, err := blueprints.ExecuteCallbacksTransactions(e.vmCtx.Chain, txn.Output().Events)
595+
if err != nil {
596+
return nil, 0, err
597+
}
598+
599+
return callbackTxs, txnIndex, nil
592600
}
593601

594602
func (e *blockComputer) executeTransactions(

engine/execution/computation/computer/computer_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1636,16 +1636,19 @@ func (executor *callbackTestExecutor) Output() fvm.ProcedureOutput {
16361636

16371637
switch {
16381638
// scheduled callbacks process transaction
1639-
case strings.Contains(string(txBody.Script), "CallbackScheduler.process"):
1639+
case strings.Contains(string(txBody.Script), "scheduler.process"):
16401640
executor.vm.executedTransactions[txID] = "process_callback"
1641+
env := systemcontracts.SystemContractsForChain(flow.Mainnet.Chain().ChainID()).AsTemplateEnv()
1642+
eventTypeString := fmt.Sprintf("A.%v.CallbackScheduler.CallbackProcessed", env.FlowCallbackSchedulerAddress)
16411643

16421644
// return events for each scheduled callback
16431645
events := make([]flow.Event, len(executor.vm.eventPayloads))
16441646
for i, payload := range executor.vm.eventPayloads {
16451647
events[i] = flow.Event{
16461648
// TODO: we shouldn't hardcode this event types, refactor after the scheduler contract is done
1647-
Type: flow.EventType("A.0x0000000000000000.CallbackScheduler.CallbackProcessed"),
1648-
TransactionID: txProc.ID,
1649+
Type: flow.EventType(eventTypeString),
1650+
TransactionID: txProc.ID,
1651+
16491652
TransactionIndex: txProc.TxIndex,
16501653
EventIndex: uint32(i),
16511654
Payload: payload,
@@ -1656,7 +1659,7 @@ func (executor *callbackTestExecutor) Output() fvm.ProcedureOutput {
16561659
Events: events,
16571660
}
16581661
// scheduled callbacks execute transaction
1659-
case strings.Contains(string(txBody.Script), "CallbackScheduler.executeCallback"):
1662+
case strings.Contains(string(txBody.Script), "scheduler.executeCallback"):
16601663
// extract the callback ID from the arguments
16611664
if len(txBody.Arguments) == 0 {
16621665
return fvm.ProcedureOutput{}
Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,23 @@
11
package blueprints
22

33
import (
4-
_ "embed"
54
"fmt"
6-
"strings"
75

86
"github.com/onflow/cadence"
97
"github.com/onflow/cadence/encoding/ccf"
8+
"github.com/rs/zerolog/log"
109

10+
"github.com/onflow/flow-core-contracts/lib/go/templates"
11+
12+
"github.com/onflow/flow-go/fvm/systemcontracts"
1113
"github.com/onflow/flow-go/model/flow"
1214
)
1315

14-
// processScheduledCallbacksTransaction calls scheduled callback contract
15-
// and process new callbacks that should be executed.
16-
//
17-
//go:embed scripts/processScheduledCallbacksTransaction.cdc
18-
var processCallbacksTransaction string
19-
20-
// executeCallbacksTransaction calls scheduled callback contract
21-
// to execute the provided callback by ID.
22-
//
23-
//go:embed scripts/executeScheduledCallbackTransaction.cdc
24-
var executeCallbacksTransaction string
25-
26-
const (
27-
placeholderScheduledContract = "import \"CallbackScheduler\""
28-
processedCallbackIDFieldName = "ID"
29-
processedCallbackEffortFieldName = "executionEffort"
30-
processedEventTypeTemplate = "A.%v.CallbackScheduler.CallbackProcessed"
31-
callbackTransactionGasLimit = flow.DefaultMaxTransactionGasLimit
32-
)
16+
const callbackTransactionGasLimit = flow.DefaultMaxTransactionGasLimit
3317

3418
func ProcessCallbacksTransaction(chain flow.Chain) *flow.TransactionBody {
35-
script := prepareScheduledContractTransaction(chain, processCallbacksTransaction)
19+
sc := systemcontracts.SystemContractsForChain(chain.ChainID())
20+
script := templates.GenerateProcessCallbackScript(sc.AsTemplateEnv())
3621

3722
return flow.NewTransactionBody().
3823
SetScript(script).
@@ -41,22 +26,24 @@ func ProcessCallbacksTransaction(chain flow.Chain) *flow.TransactionBody {
4126

4227
func ExecuteCallbacksTransactions(chainID flow.Chain, processEvents flow.EventsList) ([]*flow.TransactionBody, error) {
4328
txs := make([]*flow.TransactionBody, 0, len(processEvents))
29+
env := systemcontracts.SystemContractsForChain(chainID.ChainID()).AsTemplateEnv()
4430

4531
for _, event := range processEvents {
46-
id, effort, err := callbackArgsFromEvent(event)
32+
id, effort, err := callbackArgsFromEvent(env, event)
4733
if err != nil {
4834
return nil, fmt.Errorf("failed to get callback args from event: %w", err)
4935
}
5036

51-
tx := executeCallbackTransaction(chainID, id, effort)
37+
tx := executeCallbackTransaction(env, id, effort)
5238
txs = append(txs, tx)
5339
}
5440

5541
return txs, nil
5642
}
5743

58-
func executeCallbackTransaction(chain flow.Chain, id []byte, effort uint64) *flow.TransactionBody {
59-
script := prepareScheduledContractTransaction(chain, executeCallbacksTransaction)
44+
func executeCallbackTransaction(env templates.Environment, id []byte, effort uint64) *flow.TransactionBody {
45+
script := templates.GenerateExecuteCallbackScript(env)
46+
6047
return flow.NewTransactionBody().
6148
SetScript(script).
6249
AddArgument(id).
@@ -68,9 +55,17 @@ func executeCallbackTransaction(chain flow.Chain, id []byte, effort uint64) *flo
6855
// The event for processed callback event is emitted by the process callback transaction from
6956
// callback scheduler contract and has the following signature:
7057
// event CallbackProcessed(ID: UInt64, executionEffort: UInt64)
71-
func callbackArgsFromEvent(event flow.Event) ([]byte, uint64, error) {
72-
scheduledContractAddress := "0x0000000000000000" // todo use contract addr
73-
if string(event.Type) != fmt.Sprintf(processedEventTypeTemplate, scheduledContractAddress) {
58+
func callbackArgsFromEvent(env templates.Environment, event flow.Event) ([]byte, uint64, error) {
59+
const (
60+
processedCallbackIDFieldName = "ID"
61+
processedCallbackEffortFieldName = "executionEffort"
62+
processedEventTypeTemplate = "A.%v.CallbackScheduler.CallbackProcessed"
63+
)
64+
65+
scheduledContractAddress := env.FlowCallbackSchedulerAddress
66+
processedEventType := flow.EventType(fmt.Sprintf(processedEventTypeTemplate, scheduledContractAddress))
67+
68+
if event.Type != processedEventType {
7469
return nil, 0, fmt.Errorf("wrong event type is passed")
7570
}
7671

@@ -99,29 +94,22 @@ func callbackArgsFromEvent(event flow.Event) ([]byte, uint64, error) {
9994
return nil, 0, fmt.Errorf("id is not uint64")
10095
}
10196

102-
effort, ok := effortValue.(cadence.UInt64)
97+
cadenceEffort, ok := effortValue.(cadence.UInt64)
10398
if !ok {
10499
return nil, 0, fmt.Errorf("effort is not uint64")
105100
}
106101

102+
effort := uint64(cadenceEffort)
103+
104+
if effort > flow.DefaultMaxTransactionGasLimit {
105+
log.Warn().Uint64("effort", effort).Msg("effort is greater than max transaction gas limit, setting to max")
106+
effort = flow.DefaultMaxTransactionGasLimit
107+
}
108+
107109
encodedID, err := ccf.Encode(id)
108110
if err != nil {
109111
return nil, 0, fmt.Errorf("failed to encode id: %w", err)
110112
}
111113

112-
return encodedID, uint64(effort), nil
113-
}
114-
115-
func prepareScheduledContractTransaction(_ flow.Chain, txScript string) []byte {
116-
// todo use this instead of palceholder address
117-
// _ = systemcontracts.SystemContractsForChain(chain.ChainID())
118-
scheduledContractAddress := "0x0000000000000000"
119-
120-
code := strings.ReplaceAll(
121-
txScript,
122-
placeholderScheduledContract,
123-
fmt.Sprintf("%s from %s", placeholderScheduledContract, scheduledContractAddress),
124-
)
125-
126-
return []byte(code)
114+
return encodedID, effort, nil
127115
}

0 commit comments

Comments
 (0)