Skip to content

Commit 9a51bc4

Browse files
authored
Merge branch 'master' into peter/upgrade-lz4
2 parents 3f1330b + daeaf04 commit 9a51bc4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+2153
-868
lines changed

cmd/util/cmd/read-light-block/read_light_block_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ func TestReadClusterRange(t *testing.T) {
2525
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
2626
// add parent as boundary
2727
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
28-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), parent.ChainID, parent.Height, parent.ID())
28+
return operation.IndexClusterBlockHeight(lctx, rw, parent.ChainID, parent.Height, parent.ID())
2929
})
3030
if err != nil {
3131
return err
3232
}
3333

3434
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
35-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), parent.ChainID, parent.Height)
35+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, parent.ChainID, parent.Height)
3636
})
3737
})
3838
require.NoError(t, err)

engine/execution/computation/computer/computer.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,16 @@ type collectionInfo struct {
3838

3939
collectionIndex int
4040
*entity.CompleteCollection
41-
42-
isSystemTransaction bool
4341
}
4442

43+
type ComputerTransactionType uint
44+
45+
const (
46+
ComputerTransactionTypeUser ComputerTransactionType = iota
47+
ComputerTransactionTypeSystem
48+
ComputerTransactionTypeScheduled
49+
)
50+
4551
type TransactionRequest struct {
4652
collectionInfo
4753

@@ -50,6 +56,7 @@ type TransactionRequest struct {
5056

5157
txnIndex uint32
5258

59+
transactionType ComputerTransactionType
5360
lastTransactionInCollection bool
5461

5562
ctx fvm.Context
@@ -62,6 +69,7 @@ func newTransactionRequest(
6269
collectionLogger zerolog.Logger,
6370
txnIndex uint32,
6471
txnBody *flow.TransactionBody,
72+
transactionType ComputerTransactionType,
6573
lastTransactionInCollection bool,
6674
) TransactionRequest {
6775
txnId := txnBody.ID()
@@ -83,6 +91,7 @@ func newTransactionRequest(
8391
txnId,
8492
txnIndex,
8593
txnBody),
94+
transactionType: transactionType,
8695
lastTransactionInCollection: lastTransactionInCollection,
8796
}
8897
}
@@ -281,12 +290,11 @@ func (e *blockComputer) queueUserTransactions(
281290
Logger()
282291

283292
collectionInfo := collectionInfo{
284-
blockId: blockId,
285-
blockIdStr: blockIdStr,
286-
blockHeight: blockHeader.Height,
287-
collectionIndex: idx,
288-
CompleteCollection: collection,
289-
isSystemTransaction: false,
293+
blockId: blockId,
294+
blockIdStr: blockIdStr,
295+
blockHeight: blockHeader.Height,
296+
collectionIndex: idx,
297+
CompleteCollection: collection,
290298
}
291299

292300
for i, txnBody := range collection.Collection.Transactions {
@@ -296,7 +304,9 @@ func (e *blockComputer) queueUserTransactions(
296304
collectionLogger,
297305
txnIndex,
298306
txnBody,
299-
i == len(collection.Collection.Transactions)-1)
307+
ComputerTransactionTypeUser,
308+
i == len(collection.Collection.Transactions)-1,
309+
)
300310
txnIndex += 1
301311
}
302312
}
@@ -307,49 +317,50 @@ func (e *blockComputer) queueUserTransactions(
307317
func (e *blockComputer) queueSystemTransactions(
308318
callbackCtx fvm.Context,
309319
systemChunkCtx fvm.Context,
310-
systemColection collectionInfo,
320+
systemCollection collectionInfo,
311321
systemTxn *flow.TransactionBody,
312322
executeCallbackTxs []*flow.TransactionBody,
313323
txnIndex uint32,
314324
systemLogger zerolog.Logger,
315325
) chan TransactionRequest {
316-
var logger zerolog.Logger
317326

318-
systemChunkTxLogger := systemLogger.With().
319-
Uint32("num_txs", uint32(len(systemColection.CompleteCollection.Collection.Transactions))).
327+
systemTxLogger := systemLogger.With().
328+
Uint32("num_txs", uint32(len(systemCollection.CompleteCollection.Collection.Transactions))).
329+
Bool("system_transaction", true).
320330
Logger()
321331

322-
scheduledTxLogger := systemChunkTxLogger.With().
332+
scheduledTxLogger := systemLogger.With().
333+
Uint32("num_txs", uint32(len(systemCollection.CompleteCollection.Collection.Transactions))).
323334
Bool("scheduled_transaction", true).
324-
Bool("critical_error", false).
325335
Logger()
326336

327-
allTxs := append(executeCallbackTxs, systemTxn)
328-
txQueue := make(chan TransactionRequest, len(allTxs))
337+
txQueue := make(chan TransactionRequest, len(executeCallbackTxs)+1)
329338
defer close(txQueue)
330339

331-
for i, txBody := range allTxs {
332-
systemChunkTx := i == len(allTxs)-1 // last tx in collection is system chunk
333-
ctx := callbackCtx
334-
logger = scheduledTxLogger
335-
336-
if systemChunkTx {
337-
ctx = systemChunkCtx
338-
logger = systemChunkTxLogger
339-
}
340-
340+
for _, txBody := range executeCallbackTxs {
341341
txQueue <- newTransactionRequest(
342-
systemColection,
343-
ctx,
344-
logger,
342+
systemCollection,
343+
callbackCtx,
344+
scheduledTxLogger,
345345
txnIndex,
346346
txBody,
347-
systemChunkTx,
347+
ComputerTransactionTypeScheduled,
348+
false,
348349
)
349350

350351
txnIndex++
351352
}
352353

354+
txQueue <- newTransactionRequest(
355+
systemCollection,
356+
systemChunkCtx,
357+
systemTxLogger,
358+
txnIndex,
359+
systemTxn,
360+
ComputerTransactionTypeSystem,
361+
true,
362+
)
363+
353364
return txQueue
354365
}
355366

@@ -495,20 +506,17 @@ func (e *blockComputer) executeSystemTransactions(
495506
)
496507

497508
systemCollectionInfo := collectionInfo{
498-
blockId: block.BlockID(),
499-
blockIdStr: block.BlockID().String(),
500-
blockHeight: block.Block.Height,
501-
collectionIndex: userCollectionCount,
502-
CompleteCollection: nil, // We do not yet know all the scheduled callbacks, so postpone construction of the collection.
503-
isSystemTransaction: true,
509+
blockId: block.BlockID(),
510+
blockIdStr: block.BlockID().String(),
511+
blockHeight: block.Block.Height,
512+
collectionIndex: userCollectionCount,
513+
CompleteCollection: nil, // We do not yet know all the scheduled callbacks, so postpone construction of the collection.
504514
}
505515

506-
systemLogger := callbackCtx.Logger.With().
516+
systemChunkLogger := callbackCtx.Logger.With().
507517
Str("block_id", block.BlockID().String()).
508518
Uint64("height", block.Block.Height).
509519
Bool("system_chunk", true).
510-
Bool("system_transaction", true).
511-
Bool("critical_error", true).
512520
Int("num_collections", userCollectionCount).
513521
Logger()
514522

@@ -528,7 +536,7 @@ func (e *blockComputer) executeSystemTransactions(
528536
database,
529537
blockSpan,
530538
txIndex,
531-
systemLogger,
539+
systemChunkLogger,
532540
)
533541
if err != nil {
534542
return err
@@ -565,7 +573,7 @@ func (e *blockComputer) executeSystemTransactions(
565573
e.systemTxn,
566574
callbackTxs,
567575
txIndex,
568-
systemLogger,
576+
systemChunkLogger,
569577
)
570578

571579
e.executeQueue(blockSpan, database, txQueue)
@@ -616,7 +624,7 @@ func (e *blockComputer) executeProcessCallback(
616624
systemLogger zerolog.Logger,
617625
) ([]*flow.TransactionBody, uint32, error) {
618626
callbackLogger := systemLogger.With().
619-
Bool("scheduled_transaction", true).
627+
Bool("system_transaction", true).
620628
Logger()
621629

622630
request := newTransactionRequest(
@@ -625,6 +633,7 @@ func (e *blockComputer) executeProcessCallback(
625633
callbackLogger,
626634
txnIndex,
627635
e.processCallbackTxn,
636+
ComputerTransactionTypeSystem,
628637
false)
629638

630639
txnIndex++
@@ -725,7 +734,7 @@ func (e *blockComputer) executeTransaction(
725734
attempt)
726735
if err != nil {
727736
prefix := ""
728-
if request.isSystemTransaction {
737+
if request.transactionType == ComputerTransactionTypeSystem {
729738
prefix = "system "
730739
}
731740

engine/execution/computation/computer/computer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
586586
assert.LessOrEqual(t, vm.CallCount(), (1+totalTransactionCount)/2*totalTransactionCount)
587587
})
588588

589+
// TODO: this test is flaky with a low probability of failing
589590
t.Run(
590591
"service events are emitted", func(t *testing.T) {
591592
execCtx := fvm.NewContext(
@@ -1510,7 +1511,7 @@ func testScheduledCallbackWithError(t *testing.T, chain flow.Chain, callbackEven
15101511
exemetrics.On("ExecutionTransactionExecuted",
15111512
mock.Anything,
15121513
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
1513-
return !arg.Failed && arg.SystemTransaction
1514+
return !arg.Failed && (arg.SystemTransaction || arg.ScheduledTransaction)
15141515
}),
15151516
mock.Anything).
15161517
Return(nil).

engine/execution/computation/computer/result_collector.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,11 @@ func (collector *resultCollector) processTransactionResult(
240240
Logger()
241241
logger.Info().Msg("transaction execution failed")
242242

243-
if txn.isSystemTransaction {
243+
if txn.transactionType == ComputerTransactionTypeSystem {
244244
// This log is used as the data source for an alert on grafana.
245-
// The system_chunk_error field must not be changed without adding
245+
// The critical_error field must not be changed without adding
246246
// the corresponding changes in grafana.
247-
// https://github.com/dapperlabs/flow-internal/issues/1546
248247
logger.Error().
249-
Bool("system_chunk_error", true).
250-
Bool("system_transaction_error", true).
251248
Bool("critical_error", true).
252249
Msg("error executing system chunk transaction")
253250
}
@@ -314,7 +311,8 @@ func (collector *resultCollector) handleTransactionExecutionMetrics(
314311
ComputationIntensities: output.ComputationIntensities,
315312
NumberOfTxnConflictRetries: numConflictRetries,
316313
Failed: output.Err != nil,
317-
SystemTransaction: txn.isSystemTransaction,
314+
ScheduledTransaction: txn.transactionType == ComputerTransactionTypeScheduled,
315+
SystemTransaction: txn.transactionType == ComputerTransactionTypeSystem,
318316
}
319317
for _, entry := range txnExecutionSnapshot.UpdatedRegisters() {
320318
transactionExecutionStats.NumberOfBytesWrittenToRegisters += len(entry.Value)

engine/execution/computation/computer/transaction_coordinator_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (db *testCoordinator) newTransaction(txnIndex uint32) (
122122
zerolog.Nop(),
123123
txnIndex,
124124
&flow.TransactionBody{},
125+
ComputerTransactionTypeUser,
125126
false),
126127
0)
127128
}

engine/execution/pruner/core_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) {
7575
require.NoError(t, err)
7676
require.NoError(t, results.Store(chunk.Result))
7777
require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID()))
78-
require.NoError(t, chunkDataPacks.Store([]*flow.ChunkDataPack{chunk.ChunkDataPack}))
78+
require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
79+
return chunkDataPacks.StoreByChunkID(lctx, []*flow.ChunkDataPack{chunk.ChunkDataPack})
80+
}))
7981
_, storeErr := collections.Store(chunk.ChunkDataPack.Collection)
8082
require.NoError(t, storeErr)
8183
// verify that chunk data pack fixture can be found by the result

0 commit comments

Comments
 (0)