Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 98 additions & 13 deletions process/sync/baseSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (boot *baseBootstrap) prepareForSyncAtBoostrapIfNeeded() error {
"currHeader nonce", currentHeader.GetNonce(),
)

err := boot.prepareForSyncIfNeeded(syncingNonce, true)
err := boot.prepareForSyncIfNeeded(syncingNonce)
if err != nil {
return err
}
Expand Down Expand Up @@ -997,7 +997,7 @@ func (boot *baseBootstrap) prepareForLegacySyncIfNeeded() error {
// Finally, if everything works, the block will be committed and added into the processing queue.
// And all this mechanism will be reiterated for the next block.
func (boot *baseBootstrap) syncBlockV3(body data.BodyHandler, header data.HeaderHandler) error {
err := boot.prepareForSyncIfNeeded(header.GetNonce(), false)
err := boot.prepareForSyncIfNeeded(header.GetNonce())
if err != nil {
return err
}
Expand Down Expand Up @@ -1083,7 +1083,6 @@ func (boot *baseBootstrap) syncMiniBlocksAndTxsForHeader(

func (boot *baseBootstrap) prepareForSyncIfNeeded(
syncingNonce uint64,
withTxs bool,
) error {
if boot.preparedForSync {
return nil
Expand All @@ -1106,11 +1105,14 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
return errGetBody
}

if withTxs {
err = boot.syncMiniBlocksAndTxsForHeader(currentHeader)
if err != nil {
return err
}
err = boot.syncMiniBlocksAndTxsForHeader(currentHeader)
if err != nil {
return err
}

err = boot.saveProposedTxsToPool(currentHeader, currentBody)
if err != nil {
return err
}

errOnProposedBlock := boot.blockProcessor.OnProposedBlock(
Expand Down Expand Up @@ -1144,11 +1146,14 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
return errGetBody
}

if withTxs {
err = boot.syncMiniBlocksAndTxsForHeader(hdr)
if err != nil {
return err
}
err = boot.syncMiniBlocksAndTxsForHeader(hdr)
if err != nil {
return err
}

err = boot.saveProposedTxsToPool(hdr, body)
if err != nil {
return err
}

errOnProposedBlock := boot.blockProcessor.OnProposedBlock(
Expand All @@ -1174,6 +1179,86 @@ func (boot *baseBootstrap) prepareForSyncIfNeeded(
return nil
}

func (boot *baseBootstrap) saveProposedTxsToPool(
header data.HeaderHandler,
body data.BodyHandler,
) error {
if !header.IsHeaderV3() {
return nil
}

bodyPtr, ok := body.(*block.Body)
if !ok {
return process.ErrWrongTypeAssertion
}

separatedBodies := process.SeparateBodyByType(bodyPtr)

for blockType, blockBody := range separatedBodies {
dataPool, err := process.GetDataPoolByBlockType(blockType, boot.dataPool)
if err != nil {
return err
}

unit, err := process.GetStorageUnitByBlockType(blockType)
if err != nil {
return err
}

storer, err := boot.store.GetStorer(unit)
if err != nil {
return err
}

for i := 0; i < len(blockBody.MiniBlocks); i++ {
miniBlock := blockBody.MiniBlocks[i]
err = boot.saveTxsToPool(dataPool, storer, miniBlock, blockType)
if err != nil {
return err
}
}
}

return nil
}

func (boot *baseBootstrap) saveTxsToPool(
dataPool dataRetriever.ShardedDataCacherNotifier,
storer storage.Storer,
miniBlock *block.MiniBlock,
blockType block.Type,
) error {
txHashes := miniBlock.TxHashes

for _, txHash := range txHashes {
// continue if already in pool
_, ok := dataPool.SearchFirstData(txHash)
if ok {
continue
}

txBuff, err := storer.Get(txHash)
if err != nil {
return err
}

tx, err := boot.unmarshalTxByBlockType(blockType, txBuff)
if err != nil {
return err
}

cacherIdentifier := process.ShardCacherIdentifier(miniBlock.SenderShardID, miniBlock.ReceiverShardID)
dataPool.AddData(
txHash,
tx,
tx.Size(),
cacherIdentifier,
)
}

return nil
}

func (boot *baseBootstrap) getExecutionResultHeaderNonceForSyncStart(
syncingNonce uint64,
currentHeader data.HeaderHandler,
Expand Down
132 changes: 132 additions & 0 deletions process/sync/baseSync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@ package sync

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
"github.com/multiversx/mx-chain-core-go/data/smartContractResult"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-go/testscommon"
dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever"
"github.com/multiversx/mx-chain-go/testscommon/processMocks"
storageStubs "github.com/multiversx/mx-chain-go/testscommon/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -440,3 +450,125 @@ func TestBaseBootstrap_PrepareForSyncAtBootstrapIfNeeded(t *testing.T) {
require.Equal(t, 1, numCalls) // still 1 call
})
}

func TestBaseBootstrap_SaveProposedTxsToPool(t *testing.T) {
t.Parallel()

marshaller := &marshal.GogoProtoMarshalizer{}

txCalls := 0
scCalls := 0
rwCalls := 0
peerCalls := 0

boot := &baseBootstrap{
marshalizer: marshaller,
dataPool: &dataRetrieverMock.PoolsHolderStub{
TransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
return &testscommon.ShardedDataStub{
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
txCalls++
},
}
},
UnsignedTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
return &testscommon.ShardedDataStub{
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
scCalls++
},
}
},
RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
return &testscommon.ShardedDataStub{
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
rwCalls++
},
}
},
ValidatorsInfoCalled: func() dataRetriever.ShardedDataCacherNotifier {
return &testscommon.ShardedDataStub{
AddDataCalled: func(key []byte, data interface{}, sizeInBytes int, cacheID string) {
peerCalls++
},
}
},
},
store: &storageStubs.ChainStorerStub{
GetStorerCalled: func(unitType dataRetriever.UnitType) (storage.Storer, error) {
return &storageStubs.StorerStub{
GetCalled: func(key []byte) ([]byte, error) {
switch string(key) {
case "txHash1":
tx := &transaction.Transaction{
Nonce: 1,
}
txBytes, _ := marshaller.Marshal(tx)
return txBytes, nil
case "txHash2":
tx := &transaction.Transaction{
Nonce: 2,
}
txBytes, _ := marshaller.Marshal(tx)
return txBytes, nil
case "txHash3":
tx := &smartContractResult.SmartContractResult{
Nonce: 3,
CodeMetadata: []byte("codeMetadata"),
}
txBytes, _ := marshaller.Marshal(tx)
return txBytes, nil
case "txHash4":
tx := &rewardTx.RewardTx{
Round: 1,
}
txBytes, _ := marshaller.Marshal(tx)
return txBytes, nil
case "txHash5":
tx := &state.ShardValidatorInfo{
PublicKey: []byte("pubKey"),
}
txBytes, _ := marshaller.Marshal(tx)
return txBytes, nil
default:
return nil, errors.New("err")
}
},
}, nil
},
},
}

header := &block.HeaderV3{}
body := &block.Body{
MiniBlocks: []*block.MiniBlock{
&block.MiniBlock{
TxHashes: [][]byte{[]byte("txHash1")},
Type: block.TxBlock,
},
&block.MiniBlock{
TxHashes: [][]byte{[]byte("txHash2")},
Type: block.InvalidBlock,
},
&block.MiniBlock{
TxHashes: [][]byte{[]byte("txHash3")},
Type: block.SmartContractResultBlock,
},
&block.MiniBlock{
TxHashes: [][]byte{[]byte("txHash4")},
Type: block.RewardsBlock,
},
&block.MiniBlock{
TxHashes: [][]byte{[]byte("txHash5")},
Type: block.PeerBlock,
},
},
}

err := boot.SaveProposedTxsToPool(header, body)
require.Nil(t, err)

require.Equal(t, 2, txCalls)
require.Equal(t, 1, scCalls)
require.Equal(t, 1, rwCalls)
require.Equal(t, 1, peerCalls)
}
8 changes: 8 additions & 0 deletions process/sync/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,11 @@ func (boot *baseBootstrap) ProcessWaitTime() time.Duration {
func (boot *baseBootstrap) PrepareForSyncAtBoostrapIfNeeded() error {
return boot.prepareForSyncAtBoostrapIfNeeded()
}

// SaveProposedTxsToPool -
func (boot *baseBootstrap) SaveProposedTxsToPool(
header data.HeaderHandler,
body data.BodyHandler,
) error {
return boot.saveProposedTxsToPool(header, body)
}
5 changes: 4 additions & 1 deletion txcache/blocks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package txcache

import (
"encoding/hex"
"fmt"

"github.com/multiversx/mx-chain-core-go/data/block"
)

Expand Down Expand Up @@ -30,7 +33,7 @@ func getTransactionsInBlock(
for _, txHash := range txHashes {
tx, ok := txCache.GetByTxHash(txHash)
if !ok {
return nil, errNotFoundTx
return nil, fmt.Errorf("%w for txHash: %s", errNotFoundTx, hex.EncodeToString(txHash))
}

txs = append(txs, tx)
Expand Down
2 changes: 1 addition & 1 deletion txcache/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ func Test_getTransactionsInBlock(t *testing.T) {

txs, err := getTransactionsInBlock(&blockBody, txCache, 0)
require.Nil(t, txs)
require.Equal(t, errNotFoundTx, err)
require.ErrorIs(t, err, errNotFoundTx)
})
}
Loading