Skip to content

Commit 10c8e56

Browse files
authored
Merge branch 'feat/supernova-async-exec' into MX-17392-configs-by-round-num-flooding-rounds-2
2 parents 13e81f1 + 40c7ddd commit 10c8e56

File tree

5 files changed

+243
-15
lines changed

5 files changed

+243
-15
lines changed

process/sync/baseSync.go

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ func (boot *baseBootstrap) prepareForSyncAtBoostrapIfNeeded() error {
825825
"currHeader nonce", currentHeader.GetNonce(),
826826
)
827827

828-
err := boot.prepareForSyncIfNeeded(syncingNonce, true)
828+
err := boot.prepareForSyncIfNeeded(syncingNonce)
829829
if err != nil {
830830
return err
831831
}
@@ -997,7 +997,7 @@ func (boot *baseBootstrap) prepareForLegacySyncIfNeeded() error {
997997
// Finally, if everything works, the block will be committed and added into the processing queue.
998998
// And all this mechanism will be reiterated for the next block.
999999
func (boot *baseBootstrap) syncBlockV3(body data.BodyHandler, header data.HeaderHandler) error {
1000-
err := boot.prepareForSyncIfNeeded(header.GetNonce(), false)
1000+
err := boot.prepareForSyncIfNeeded(header.GetNonce())
10011001
if err != nil {
10021002
return err
10031003
}
@@ -1083,7 +1083,6 @@ func (boot *baseBootstrap) syncMiniBlocksAndTxsForHeader(
10831083

10841084
func (boot *baseBootstrap) prepareForSyncIfNeeded(
10851085
syncingNonce uint64,
1086-
withTxs bool,
10871086
) error {
10881087
if boot.preparedForSync {
10891088
return nil
@@ -1106,11 +1105,14 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
11061105
return errGetBody
11071106
}
11081107

1109-
if withTxs {
1110-
err = boot.syncMiniBlocksAndTxsForHeader(currentHeader)
1111-
if err != nil {
1112-
return err
1113-
}
1108+
err = boot.syncMiniBlocksAndTxsForHeader(currentHeader)
1109+
if err != nil {
1110+
return err
1111+
}
1112+
1113+
err = boot.saveProposedTxsToPool(currentHeader, currentBody)
1114+
if err != nil {
1115+
return err
11141116
}
11151117

11161118
errOnProposedBlock := boot.blockProcessor.OnProposedBlock(
@@ -1144,11 +1146,14 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
11441146
return errGetBody
11451147
}
11461148

1147-
if withTxs {
1148-
err = boot.syncMiniBlocksAndTxsForHeader(hdr)
1149-
if err != nil {
1150-
return err
1151-
}
1149+
err = boot.syncMiniBlocksAndTxsForHeader(hdr)
1150+
if err != nil {
1151+
return err
1152+
}
1153+
1154+
err = boot.saveProposedTxsToPool(hdr, body)
1155+
if err != nil {
1156+
return err
11521157
}
11531158

11541159
errOnProposedBlock := boot.blockProcessor.OnProposedBlock(
@@ -1174,6 +1179,86 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
11741179
return nil
11751180
}
11761181

1182+
func (boot *baseBootstrap) saveProposedTxsToPool(
1183+
header data.HeaderHandler,
1184+
body data.BodyHandler,
1185+
) error {
1186+
if !header.IsHeaderV3() {
1187+
return nil
1188+
}
1189+
1190+
bodyPtr, ok := body.(*block.Body)
1191+
if !ok {
1192+
return process.ErrWrongTypeAssertion
1193+
}
1194+
1195+
separatedBodies := process.SeparateBodyByType(bodyPtr)
1196+
1197+
for blockType, blockBody := range separatedBodies {
1198+
dataPool, err := process.GetDataPoolByBlockType(blockType, boot.dataPool)
1199+
if err != nil {
1200+
return err
1201+
}
1202+
1203+
unit, err := process.GetStorageUnitByBlockType(blockType)
1204+
if err != nil {
1205+
return err
1206+
}
1207+
1208+
storer, err := boot.store.GetStorer(unit)
1209+
if err != nil {
1210+
return err
1211+
}
1212+
1213+
for i := 0; i < len(blockBody.MiniBlocks); i++ {
1214+
miniBlock := blockBody.MiniBlocks[i]
1215+
err = boot.saveTxsToPool(dataPool, storer, miniBlock, blockType)
1216+
if err != nil {
1217+
return err
1218+
}
1219+
}
1220+
}
1221+
1222+
return nil
1223+
}
1224+
1225+
func (boot *baseBootstrap) saveTxsToPool(
1226+
dataPool dataRetriever.ShardedDataCacherNotifier,
1227+
storer storage.Storer,
1228+
miniBlock *block.MiniBlock,
1229+
blockType block.Type,
1230+
) error {
1231+
txHashes := miniBlock.TxHashes
1232+
1233+
for _, txHash := range txHashes {
1234+
// continue if already in pool
1235+
_, ok := dataPool.SearchFirstData(txHash)
1236+
if ok {
1237+
continue
1238+
}
1239+
1240+
txBuff, err := storer.Get(txHash)
1241+
if err != nil {
1242+
return err
1243+
}
1244+
1245+
tx, err := boot.unmarshalTxByBlockType(blockType, txBuff)
1246+
if err != nil {
1247+
return err
1248+
}
1249+
1250+
cacherIdentifier := process.ShardCacherIdentifier(miniBlock.SenderShardID, miniBlock.ReceiverShardID)
1251+
dataPool.AddData(
1252+
txHash,
1253+
tx,
1254+
tx.Size(),
1255+
cacherIdentifier,
1256+
)
1257+
}
1258+
1259+
return nil
1260+
}
1261+
11771262
func (boot *baseBootstrap) getExecutionResultHeaderNonceForSyncStart(
11781263
syncingNonce uint64,
11791264
currentHeader data.HeaderHandler,

process/sync/baseSync_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,28 @@ package sync
22

33
import (
44
"context"
5+
"errors"
56
"sync/atomic"
67
"testing"
78
"time"
89

910
"github.com/multiversx/mx-chain-core-go/core"
1011
"github.com/multiversx/mx-chain-core-go/data"
1112
"github.com/multiversx/mx-chain-core-go/data/block"
13+
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
14+
"github.com/multiversx/mx-chain-core-go/data/smartContractResult"
15+
"github.com/multiversx/mx-chain-core-go/data/transaction"
16+
"github.com/multiversx/mx-chain-core-go/marshal"
1217
"github.com/multiversx/mx-chain-go/common"
18+
"github.com/multiversx/mx-chain-go/dataRetriever"
1319
"github.com/multiversx/mx-chain-go/process"
1420
"github.com/multiversx/mx-chain-go/process/mock"
21+
"github.com/multiversx/mx-chain-go/state"
22+
"github.com/multiversx/mx-chain-go/storage"
1523
"github.com/multiversx/mx-chain-go/testscommon"
24+
dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever"
1625
"github.com/multiversx/mx-chain-go/testscommon/processMocks"
26+
storageStubs "github.com/multiversx/mx-chain-go/testscommon/storage"
1727
"github.com/stretchr/testify/assert"
1828
"github.com/stretchr/testify/require"
1929
)
@@ -440,3 +450,125 @@ func TestBaseBootstrap_PrepareForSyncAtBootstrapIfNeeded(t *testing.T) {
440450
require.Equal(t, 1, numCalls) // still 1 call
441451
})
442452
}
453+
454+
func TestBaseBootstrap_SaveProposedTxsToPool(t *testing.T) {
455+
t.Parallel()
456+
457+
marshaller := &marshal.GogoProtoMarshalizer{}
458+
459+
txCalls := 0
460+
scCalls := 0
461+
rwCalls := 0
462+
peerCalls := 0
463+
464+
boot := &baseBootstrap{
465+
marshalizer: marshaller,
466+
dataPool: &dataRetrieverMock.PoolsHolderStub{
467+
TransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
468+
return &testscommon.ShardedDataStub{
469+
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
470+
txCalls++
471+
},
472+
}
473+
},
474+
UnsignedTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
475+
return &testscommon.ShardedDataStub{
476+
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
477+
scCalls++
478+
},
479+
}
480+
},
481+
RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
482+
return &testscommon.ShardedDataStub{
483+
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
484+
rwCalls++
485+
},
486+
}
487+
},
488+
ValidatorsInfoCalled: func() dataRetriever.ShardedDataCacherNotifier {
489+
return &testscommon.ShardedDataStub{
490+
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
491+
peerCalls++
492+
},
493+
}
494+
},
495+
},
496+
store: &storageStubs.ChainStorerStub{
497+
GetStorerCalled: func(unitType dataRetriever.UnitType) (storage.Storer, error) {
498+
return &storageStubs.StorerStub{
499+
GetCalled: func(key []byte) ([]byte, error) {
500+
switch string(key) {
501+
case "txHash1":
502+
tx := &transaction.Transaction{
503+
Nonce: 1,
504+
}
505+
txBytes, _ := marshaller.Marshal(tx)
506+
return txBytes, nil
507+
case "txHash2":
508+
tx := &transaction.Transaction{
509+
Nonce: 2,
510+
}
511+
txBytes, _ := marshaller.Marshal(tx)
512+
return txBytes, nil
513+
case "txHash3":
514+
tx := &smartContractResult.SmartContractResult{
515+
Nonce: 3,
516+
CodeMetadata: []byte("codeMetadata"),
517+
}
518+
txBytes, _ := marshaller.Marshal(tx)
519+
return txBytes, nil
520+
case "txHash4":
521+
tx := &rewardTx.RewardTx{
522+
Round: 1,
523+
}
524+
txBytes, _ := marshaller.Marshal(tx)
525+
return txBytes, nil
526+
case "txHash5":
527+
tx := &state.ShardValidatorInfo{
528+
PublicKey: []byte("pubKey"),
529+
}
530+
txBytes, _ := marshaller.Marshal(tx)
531+
return txBytes, nil
532+
default:
533+
return nil, errors.New("err")
534+
}
535+
},
536+
}, nil
537+
},
538+
},
539+
}
540+
541+
header := &block.HeaderV3{}
542+
body := &block.Body{
543+
MiniBlocks: []*block.MiniBlock{
544+
&block.MiniBlock{
545+
TxHashes: [][]byte{[]byte("txHash1")},
546+
Type: block.TxBlock,
547+
},
548+
&block.MiniBlock{
549+
TxHashes: [][]byte{[]byte("txHash2")},
550+
Type: block.InvalidBlock,
551+
},
552+
&block.MiniBlock{
553+
TxHashes: [][]byte{[]byte("txHash3")},
554+
Type: block.SmartContractResultBlock,
555+
},
556+
&block.MiniBlock{
557+
TxHashes: [][]byte{[]byte("txHash4")},
558+
Type: block.RewardsBlock,
559+
},
560+
&block.MiniBlock{
561+
TxHashes: [][]byte{[]byte("txHash5")},
562+
Type: block.PeerBlock,
563+
},
564+
},
565+
}
566+
567+
err := boot.SaveProposedTxsToPool(header, body)
568+
require.Nil(t, err)
569+
570+
require.Equal(t, 2, txCalls)
571+
require.Equal(t, 1, scCalls)
572+
require.Equal(t, 1, rwCalls)
573+
require.Equal(t, 1, peerCalls)
574+
}

process/sync/export_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,11 @@ func (boot *baseBootstrap) ProcessWaitTime() time.Duration {
357357
func (boot *baseBootstrap) PrepareForSyncAtBoostrapIfNeeded() error {
358358
return boot.prepareForSyncAtBoostrapIfNeeded()
359359
}
360+
361+
// SaveProposedTxsToPool -
362+
func (boot *baseBootstrap) SaveProposedTxsToPool(
363+
header data.HeaderHandler,
364+
body data.BodyHandler,
365+
) error {
366+
return boot.saveProposedTxsToPool(header, body)
367+
}

txcache/blocks.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package txcache
22

33
import (
4+
"encoding/hex"
5+
"fmt"
6+
47
"github.com/multiversx/mx-chain-core-go/data/block"
58
)
69

@@ -30,7 +33,7 @@ func getTransactionsInBlock(
3033
for _, txHash := range txHashes {
3134
tx, ok := txCache.GetByTxHash(txHash)
3235
if !ok {
33-
return nil, errNotFoundTx
36+
return nil, fmt.Errorf("%w for txHash: %s", errNotFoundTx, hex.EncodeToString(txHash))
3437
}
3538

3639
txs = append(txs, tx)

txcache/blocks_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,6 @@ func Test_getTransactionsInBlock(t *testing.T) {
9090

9191
txs, err := getTransactionsInBlock(&blockBody, txCache, 0)
9292
require.Nil(t, txs)
93-
require.Equal(t, errNotFoundTx, err)
93+
require.ErrorIs(t, err, errNotFoundTx)
9494
})
9595
}

0 commit comments

Comments
 (0)