diff --git a/process/block/export_test.go b/process/block/export_test.go
index 4d4bf7e350..8e03a117d5 100644
--- a/process/block/export_test.go
+++ b/process/block/export_test.go
@@ -734,13 +734,31 @@ func (sp *shardProcessor) GetHdrForBlock() HeadersForBlock {
 	return sp.hdrsForCurrBlock
 }
 
+// PendingMiniBlocksAfterSelection -
+type PendingMiniBlocksAfterSelection = pendingMiniBlocksAfterSelection
+
+// GetHeaderHash -
+func (p *PendingMiniBlocksAfterSelection) GetHeaderHash() []byte {
+	return p.headerHash
+}
+
+// GetHeader -
+func (p *PendingMiniBlocksAfterSelection) GetHeader() data.HeaderHandler {
+	return p.header
+}
+
+// GetMiniBlocksAndHashes -
+func (p *PendingMiniBlocksAfterSelection) GetMiniBlocksAndHashes() []block.MiniblockAndHash {
+	return p.pendingMiniBlocksAndHashes
+}
+
 // SelectIncomingMiniBlocks -
 func (sp *shardProcessor) SelectIncomingMiniBlocks(
 	lastCrossNotarizedMetaHdr data.HeaderHandler,
 	orderedMetaBlocks []data.HeaderHandler,
 	orderedMetaBlocksHashes [][]byte,
 	haveTime func() bool,
-) ([]block.MiniblockAndHash, error) {
+) ([]*PendingMiniBlocksAfterSelection, error) {
 	return sp.selectIncomingMiniBlocks(lastCrossNotarizedMetaHdr, orderedMetaBlocks, orderedMetaBlocksHashes, haveTime)
 }
 
diff --git a/process/block/gasConsumption.go b/process/block/gasConsumption.go
index 77e711f555..4abbeeb42e 100644
--- a/process/block/gasConsumption.go
+++ b/process/block/gasConsumption.go
@@ -18,6 +18,7 @@ import (
 const (
 	incoming = "incoming"
 	outgoing = "outgoing"
+	pending  = "pending"
 )
 
 const (
@@ -121,12 +122,25 @@ func (gc *gasConsumption) AddIncomingMiniBlocks(
 		shouldSavePending, err = gc.addIncomingMiniBlock(miniBlocks[i], transactions, bandwidthForIncomingMiniBlocks)
 		if shouldSavePending {
 			// saving pending starting with idx i, as it was not included either
-			gc.pendingMiniBlocks = append(gc.pendingMiniBlocks, miniBlocks[i:]...)
 			for _, mb := range miniBlocks[i:] {
 				hashStr := string(mb.GetHash())
+				transactionsForMB, found := transactions[hashStr]
+				if !found {
+					log.Warn("could not find transaction for pending mini block", "hash", mb.GetHash())
+					return lastMiniBlockIndex, 0, fmt.Errorf("%w, could not find mini block hash in transactions map", process.ErrInvalidValue)
+				}
+				gasConsumedByPendingMb, errCheckPending := gc.checkGasConsumedByMiniBlock(mb, transactionsForMB)
+				if errCheckPending != nil {
+					log.Warn("failed to check gas consumed by pending mini block", "hash", mb.GetHash(), "error", errCheckPending)
+					return lastMiniBlockIndex, 0, errCheckPending
+				}
+
 				gc.transactionsForPendingMiniBlocks[hashStr] = transactions[hashStr]
+				gc.totalGasConsumed[pending] += gasConsumedByPendingMb
 			}
+			gc.pendingMiniBlocks = append(gc.pendingMiniBlocks, miniBlocks[i:]...)
+
 			return lastMiniBlockIndex, len(gc.pendingMiniBlocks), err
 		}
 
 		if err != nil {
@@ -155,6 +169,7 @@ func (gc *gasConsumption) RevertIncomingMiniBlocks(miniBlockHashes [][]byte) {
 		isPending, idxInPendingSlice := gc.isPendingMiniBlock(miniBlockHash)
 		if isPending {
 			gc.revertPendingMiniBlock(miniBlockHash, idxInPendingSlice)
+			gc.totalGasConsumed[pending] -= gasConsumedByMb
 			continue
 		}
 
@@ -281,6 +296,7 @@ func (gc *gasConsumption) addPendingIncomingMiniBlocks() ([]data.MiniBlockHeader
 	lastIndexAdded := 0
 	for i := 0; i < len(gc.pendingMiniBlocks); i++ {
 		mb := gc.pendingMiniBlocks[i]
+		gasConsumedByIncomingBefore := gc.totalGasConsumed[incoming]
 		shouldSavePending, err := gc.addIncomingMiniBlock(mb, gc.transactionsForPendingMiniBlocks, bandwidthForIncomingMiniBlocks)
 		if err != nil {
 			return nil, err
@@ -292,6 +308,12 @@ func (gc *gasConsumption) addPendingIncomingMiniBlocks() ([]data.MiniBlockHeader
 
 		addedMiniBlocks = append(addedMiniBlocks, mb)
 		lastIndexAdded = i
+
+		gasConsumedByIncomingAfter := gc.totalGasConsumed[incoming]
+		gasConsumedByMiniBlock := gasConsumedByIncomingAfter - gasConsumedByIncomingBefore
+		if gasConsumedByMiniBlock <= gc.totalGasConsumed[pending] {
+			gc.totalGasConsumed[pending] -= gasConsumedByMiniBlock
+		}
 	}
 
 	gc.pendingMiniBlocks = gc.pendingMiniBlocks[lastIndexAdded+1:]
@@ -471,6 +493,31 @@ func (gc *gasConsumption) TotalGasConsumedInShard(shard uint32) uint64 {
 	return gc.totalGasConsumed[gasKeyOutgoingCross]
 }
 
+// CanAddPendingIncomingMiniBlocks returns true if more pending incoming mini blocks can be added without reaching the block limits
+func (gc *gasConsumption) CanAddPendingIncomingMiniBlocks() bool {
+	gc.mut.RLock()
+	defer gc.mut.RUnlock()
+
+	totalGasToBeConsumedByPending := uint64(0)
+	totalGasConsumed := uint64(0)
+	for typeOfGas, gasConsumed := range gc.totalGasConsumed {
+		if typeOfGas == pending {
+			totalGasToBeConsumedByPending += gasConsumed
+			continue
+		}
+
+		totalGasConsumed += gasConsumed
+	}
+
+	maxGasLimitPerBlock := gc.maxGasLimitPerBlock(incoming, gc.shardCoordinator.SelfId())
+	if maxGasLimitPerBlock <= totalGasConsumed {
+		return false
+	}
+
+	gasLeft := maxGasLimitPerBlock - totalGasConsumed
+	return totalGasToBeConsumedByPending < gasLeft
+}
+
 // DecreaseIncomingLimit reduces the gas limit for incoming mini blocks by a configured percentage
 func (gc *gasConsumption) DecreaseIncomingLimit() {
 	gc.mut.Lock()
diff --git a/process/block/gasConsumption_test.go b/process/block/gasConsumption_test.go
index ffa647142f..ed998991b7 100644
--- a/process/block/gasConsumption_test.go
+++ b/process/block/gasConsumption_test.go
@@ -1015,3 +1015,173 @@ func TestGasConsumption_ConcurrentOps(t *testing.T) {
 		wg.Wait()
 	})
 }
+
+func TestGasConsumption_CanAddPendingIncomingMiniBlocks(t *testing.T) {
+	t.Parallel()
+
+	t.Run("no pending mini blocks, block partially filled should return true", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add some mini blocks but no pending ones
+		mbs := generateMiniBlocks(3, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 2, lastMbIndex)
+		require.Zero(t, pendingMbs)
+
+		// no pending mini blocks, enough space left, should return true
+		require.True(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("pending mini blocks can fit in remaining space", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create some pending ones
+		// maxGasLimitPerBlock = 400, half * 200% = 400 for incoming
+		// 5 txs * 10 gas = 50 per mb
+		// 8 mbs will consume 400, leaving 2 as pending with total 100 gas
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// pending mini blocks consume 100 gas (2 mbs * 5 txs * 10 gas)
+		// max limit is 400, consumed is 400, but we have the full tx space available (400)
+		require.True(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("pending mini blocks cannot fit in remaining space", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create pending ones
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// now fill the transaction space to leave very little room
+		// maxGasLimitPerBlock = 400, half * 200% = 400 for txs
+		// add 39 txs (390 gas), leaving only 10 gas available
+		// already pending mbs need 100 gas, so one more mini block won't fit
+		txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 39)
+		_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
+		require.NoError(t, err)
+
+		// pending mini blocks consume 100 gas
+		// total consumed = 400 (incoming) + 390 (outgoing) = 790
+		// total block limit is 400 * 2 = 800
+		// gasLeft = 800 - 790 = 10
+		// maxGasLimitPerMiniBlock (100) > gasLeft (10)
+		require.False(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("block is full, no space for pending", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create pending ones
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// fill the transaction space completely
+		txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 40)
+		_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
+		require.NoError(t, err)
+
+		require.False(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("pending mini blocks exactly fit in remaining space", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create pending ones
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// add 30 txs (300 gas) to leave exactly 100 gas
+		txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 30)
+		_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
+		require.NoError(t, err)
+
+		// pending mini blocks consume 100 gas
+		// total consumed = 400 (incoming) + 300 (outgoing) = 700
+		// total block limit = 800
+		// gasLeft = 800 - 700 = 100
+		// space enough for already added pending mini blocks
+		require.False(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("after incoming limit is zeroed, should return false", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create pending ones
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// zero the incoming limit
+		gc.ZeroIncomingLimit()
+
+		require.False(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+
+	t.Run("with transactions added leaving enough space for pending", func(t *testing.T) {
+		t.Parallel()
+
+		gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
+		require.NotNil(t, gc)
+
+		// add mini blocks to create pending ones
+		mbs := generateMiniBlocks(10, 5)
+		txs := generateTxsForMiniBlocks(mbs)
+		lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
+		require.NoError(t, err)
+		require.Equal(t, 7, lastMbIndex)
+		require.Equal(t, 2, pendingMbs)
+
+		// add only 10 txs (100 gas), leaving 300 gas available
+		txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 10)
+		_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
+		require.NoError(t, err)
+
+		// pending mini blocks consume 100 gas
+		// total consumed = 400 (incoming) + 100 (outgoing) = 500
+		// total block limit = 800
+		// gasLeft = 800 - 500 = 300
+		// should allow 2 more mini blocks
+		require.True(t, gc.CanAddPendingIncomingMiniBlocks())
+	})
+}
diff --git a/process/block/shardblockProposal.go b/process/block/shardblockProposal.go
index 3576f949d1..d21c04c92b 100644
--- a/process/block/shardblockProposal.go
+++ b/process/block/shardblockProposal.go
@@ -16,6 +16,12 @@ import (
 	"github.com/multiversx/mx-chain-go/state"
 )
 
+type pendingMiniBlocksAfterSelection struct {
+	headerHash                 []byte
+	header                     data.HeaderHandler
+	pendingMiniBlocksAndHashes []block.MiniblockAndHash
+}
+
 // TODO: maybe move this to config
 const maxBlockProcessingTime = 3 * time.Second
 
@@ -427,7 +433,7 @@ func (sp *shardProcessor) createBlockBodyProposal(
 }
 
 func (sp *shardProcessor) selectIncomingMiniBlocksForProposal(
 	haveTime func() bool,
-) ([]block.MiniblockAndHash, error) {
+) ([]*pendingMiniBlocksAfterSelection, error) {
 	log.Debug("selectIncomingMiniBlocksForProposal has been started")
 	sw := core.NewStopWatch()
@@ -472,13 +478,14 @@ func (sp *shardProcessor) selectIncomingMiniBlocks(
 	orderedMetaBlocks []data.HeaderHandler,
 	orderedMetaBlocksHashes [][]byte,
 	haveTime func() bool,
-) ([]block.MiniblockAndHash, error) {
+) ([]*pendingMiniBlocksAfterSelection, error) {
 	var currentMetaBlock data.HeaderHandler
 	var currentMetaBlockHash []byte
-	var pendingMiniBlocks []block.MiniblockAndHash
+	var pendingMiniBlocks []*pendingMiniBlocksAfterSelection
 	var errCreated error
 	var createIncomingMbsResult *CrossShardIncomingMbsCreationResult
 	lastMeta := lastCrossNotarizedMetaHdr
+	lastMetaAdded := lastCrossNotarizedMetaHdr
 
 	for i := 0; i < len(orderedMetaBlocks); i++ {
 		if !haveTime() {
@@ -521,6 +528,7 @@ func (sp *shardProcessor) selectIncomingMiniBlocks(
 		if len(currentMetaBlock.GetMiniBlockHeadersWithDst(sp.shardCoordinator.SelfId())) == 0 {
 			sp.miniBlocksSelectionSession.AddReferencedHeader(currentMetaBlock, currentMetaBlockHash)
 			lastMeta = currentMetaBlock
+			lastMetaAdded = currentMetaBlock
 			continue
 		}
 
@@ -530,7 +538,12 @@ func (sp *shardProcessor) selectIncomingMiniBlocks(
 			return nil, errCreated
 		}
 
-		pendingMiniBlocks = append(pendingMiniBlocks, createIncomingMbsResult.PendingMiniBlocks...)
+		pendingMiniBlocks = append(pendingMiniBlocks, &pendingMiniBlocksAfterSelection{
+			headerHash:                 currentMetaBlockHash,
+			header:                     currentMetaBlock,
+			pendingMiniBlocksAndHashes: createIncomingMbsResult.PendingMiniBlocks,
+		})
+
 		if len(createIncomingMbsResult.AddedMiniBlocks) > 0 {
 			errAdd := sp.miniBlocksSelectionSession.AddMiniBlocksAndHashes(createIncomingMbsResult.AddedMiniBlocks)
 			if errAdd != nil {
@@ -538,14 +551,23 @@ func (sp *shardProcessor) selectIncomingMiniBlocks(
 			}
 			sp.miniBlocksSelectionSession.AddReferencedHeader(currentMetaBlock, currentMetaBlockHash)
 			lastMeta = currentMetaBlock
+			lastMetaAdded = currentMetaBlock
+		}
+
+		if createIncomingMbsResult.HeaderFinished {
+			continue
 		}
 
-		if !createIncomingMbsResult.HeaderFinished {
+		canAddMorePendingMiniBlocks := sp.gasComputation.CanAddPendingIncomingMiniBlocks()
+		if !canAddMorePendingMiniBlocks {
 			break
 		}
+
+		// continue saving pending mini blocks until they are done or there is no possible space left in the block
+		lastMeta = currentMetaBlock
 	}
 
-	go sp.requestHeadersFromHeaderIfNeeded(lastMeta)
+	go sp.requestHeadersFromHeaderIfNeeded(lastMetaAdded)
 
 	return pendingMiniBlocks, nil
 }
@@ -585,26 +607,46 @@ func (sp *shardProcessor) createProposalMiniBlocks(
 }
 
 func (sp *shardProcessor) appendPendingMiniBlocksAddedAfterSelectingOutgoingTransactions(
-	pendingMiniBlocksLeft []block.MiniblockAndHash,
+	pendingMiniBlocksLeft []*pendingMiniBlocksAfterSelection,
 	pendingIncomingMiniBlocksAdded []data.MiniBlockHeaderHandler,
 ) error {
 	if len(pendingIncomingMiniBlocksAdded) == 0 {
 		return nil
 	}
 
-	pendingMiniBlocksLeftMap := miniBlocksAndHashesSliceToMap(pendingMiniBlocksLeft)
 	extraMiniBlocksAdded := make([]block.MiniblockAndHash, len(pendingIncomingMiniBlocksAdded))
 	for i, pendingMbAdded := range pendingIncomingMiniBlocksAdded {
-		miniBlockAndHash, ok := pendingMiniBlocksLeftMap[string(pendingMbAdded.GetHash())]
-		if !ok {
+		miniBlockAndHash, headerHash, header, found := findPendingMiniBlock(pendingMiniBlocksLeft, pendingMbAdded)
+		if !found {
 			log.Error("pending mini block added does not exists in the remaining pending list")
 			return process.ErrInvalidHash
 		}
 
 		extraMiniBlocksAdded[i] = miniBlockAndHash
+		sp.miniBlocksSelectionSession.AddReferencedHeader(header, headerHash)
+	}
+
+	err := sp.miniBlocksSelectionSession.AddMiniBlocksAndHashes(extraMiniBlocksAdded)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func findPendingMiniBlock(
+	pendingMiniBlocksLeft []*pendingMiniBlocksAfterSelection,
+	pendingMbAdded data.MiniBlockHeaderHandler,
+) (block.MiniblockAndHash, []byte, data.HeaderHandler, bool) {
+	for _, pendingMiniBlocksForHeader := range pendingMiniBlocksLeft {
+		pendingMiniBlocksLeftMap := miniBlocksAndHashesSliceToMap(pendingMiniBlocksForHeader.pendingMiniBlocksAndHashes)
+		miniBlockAndHash, ok := pendingMiniBlocksLeftMap[string(pendingMbAdded.GetHash())]
+		if ok {
+			return miniBlockAndHash, pendingMiniBlocksForHeader.headerHash, pendingMiniBlocksForHeader.header, true
+		}
 	}
 
-	return sp.miniBlocksSelectionSession.AddMiniBlocksAndHashes(extraMiniBlocksAdded)
+	return block.MiniblockAndHash{}, nil, nil, false
 }
 
 func miniBlocksAndHashesSliceToMap(providedSlice []block.MiniblockAndHash) map[string]block.MiniblockAndHash {
diff --git a/process/block/shardblockProposal_test.go b/process/block/shardblockProposal_test.go
index 65906c78a6..0b0581cd62 100644
--- a/process/block/shardblockProposal_test.go
+++ b/process/block/shardblockProposal_test.go
@@ -486,32 +486,57 @@ func TestShardProcessor_CreateBlockProposal(t *testing.T) {
 		arguments := CreateMockArguments(coreComponents, dataComponents, bootstrapComponents, statusComponents)
 		arguments.BlockTracker = &mock.BlockTrackerMock{
 			ComputeLongestMetaChainFromLastNotarizedCalled: func() ([]data.HeaderHandler, [][]byte, error) {
-				return []data.HeaderHandler{&block.MetaBlockV3{
-					Nonce: 1,
-					ShardInfo: []block.ShardData{
-						{
-							Nonce:   0,
-							ShardID: 1,
-							ShardMiniBlockHeaders: []block.MiniBlockHeader{
-								{
-									SenderShardID:   1,
-									ReceiverShardID: 0,
+				return []data.HeaderHandler{
+						&block.MetaBlockV3{
+							Nonce: 1,
+							ShardInfo: []block.ShardData{
+								{
+									Nonce:   0,
+									ShardID: 1,
+									ShardMiniBlockHeaders: []block.MiniBlockHeader{
+										{
+											SenderShardID:   1,
+											ReceiverShardID: 0,
+										},
 									},
 								},
+								// for extra coverage, should be skipped as it is empty
+								{
+									Nonce:                 1,
+									ShardID:               1,
+									ShardMiniBlockHeaders: []block.MiniBlockHeader{},
+								},
 							},
-						// for extra coverage, should be skipped as it is empty
-						{
-							Nonce:                 1,
-							ShardID:               1,
-							ShardMiniBlockHeaders: []block.MiniBlockHeader{},
+							MiniBlockHeaders: []block.MiniBlockHeader{
+								{}, {},
 							},
 						},
-					MiniBlockHeaders: []block.MiniBlockHeader{
-						{},
+						&block.MetaBlockV3{Nonce: 2},
+						&block.MetaBlockV3{
+							Nonce: 3,
+							ShardInfo: []block.ShardData{
+								// should be saved as pending and added later
+								{
+									Nonce:   2,
+									ShardID: 1,
+									ShardMiniBlockHeaders: []block.MiniBlockHeader{
+										{
+											SenderShardID:   1,
+											ReceiverShardID: 0,
+										},
+									},
+								},
+							},
+							MiniBlockHeaders: []block.MiniBlockHeader{
+								{},
+							},
 						},
-					}, &block.MetaBlockV3{Nonce: 2},
 					},
-					[][]byte{[]byte("hash_ok"), []byte("hash_empty")},
+					[][]byte{
+						[]byte("hash_ok"),
+						[]byte("hash_empty"),
+						[]byte("hash_pending"),
+					},
 					nil
 			},
 			GetLastCrossNotarizedHeaderCalled: func(shardID uint32) (data.HeaderHandler, []byte, error) {
@@ -526,23 +551,44 @@ func TestShardProcessor_CreateBlockProposal(t *testing.T) {
 		providedPendingMb := &block.MiniBlock{
 			TxHashes: [][]byte{[]byte("tx_hash2")},
 		}
+		providedPendingMb2 := &block.MiniBlock{
+			TxHashes: [][]byte{[]byte("tx_hash3")},
+		}
 		arguments.TxCoordinator = &testscommon.TransactionCoordinatorMock{
 			CreateMbsCrossShardDstMeCalled: func(header data.HeaderHandler, processedMiniBlocksInfo map[string]*processedMb.ProcessedMiniBlockInfo) ([]block.MiniblockAndHash, []block.MiniblockAndHash, uint32, bool, error) {
-				return []block.MiniblockAndHash{
-						{
-							Miniblock: providedMb,
-							Hash:      []byte("providedMB"),
+				if header.GetNonce() == 1 {
+					return []block.MiniblockAndHash{
+							{
+								Miniblock: providedMb,
+								Hash:      []byte("providedMB"),
+							},
 						},
-					},
+						[]block.MiniblockAndHash{
+							{
+								Miniblock: providedPendingMb,
+								Hash:      []byte("providedPendingMB"),
+							},
+						}, 0, true, nil
+				}
+
+				return []block.MiniblockAndHash{},
+					[]block.MiniblockAndHash{
 						{
-							Miniblock: providedPendingMb,
-							Hash:      []byte("providedPendingMB"),
+							Miniblock: providedPendingMb2,
+							Hash:      []byte("providedPendingMB2"),
 						},
 					}, 0, true, nil
 			},
 			SelectOutgoingTransactionsCalled: func(nonce uint64) ([][]byte, []data.MiniBlockHeaderHandler) {
-				return [][]byte{}, []data.MiniBlockHeaderHandler{&block.MiniBlockHeader{Hash: []byte("providedPendingMB")}}
+				pendingMbsAdded := []data.MiniBlockHeaderHandler{
+					&block.MiniBlockHeader{
+						Hash: []byte("providedPendingMB"),
+					},
+					&block.MiniBlockHeader{
+						Hash: []byte("providedPendingMB2"),
+					},
+				}
+				return [][]byte{}, pendingMbsAdded
 			},
 		}
@@ -554,7 +600,7 @@ func TestShardProcessor_CreateBlockProposal(t *testing.T) {
 			ExecutionResult: &block.BaseExecutionResult{},
 		},
 		PrevHash:         []byte("prevHash"),
-		MiniBlockHeaders: []block.MiniBlockHeader{{}, {}}, // 2 mb headers to match the provided ones
+		MiniBlockHeaders: []block.MiniBlockHeader{{}, {}, {}}, // 3 mb headers to match the provided ones
 	}
 	hdr, body, err := sp.CreateBlockProposal(initialHdr, haveTimeTrue)
 	require.NoError(t, err)
@@ -563,9 +609,15 @@ func TestShardProcessor_CreateBlockProposal(t *testing.T) {
 
 		rawBody, ok := body.(*block.Body)
 		require.True(t, ok)
-		require.Len(t, rawBody.MiniBlocks, 2)
+		require.Len(t, rawBody.MiniBlocks, 3)
 		require.Equal(t, providedMb, rawBody.MiniBlocks[0])
 		require.Equal(t, providedPendingMb, rawBody.MiniBlocks[1])
+		require.Equal(t, providedPendingMb2, rawBody.MiniBlocks[2])
+		referencedHashes := arguments.MiniBlocksSelectionSession.GetReferencedHeaderHashes()
+		require.Len(t, referencedHashes, 3)
+		require.Equal(t, "hash_ok", string(referencedHashes[0]))
+		require.Equal(t, "hash_empty", string(referencedHashes[1]))
+		require.Equal(t, "hash_pending", string(referencedHashes[2]))
 	})
 }
diff --git a/process/coordinator/processProposal.go b/process/coordinator/processProposal.go
index 82f1d2d4cb..3920bb7560 100644
--- a/process/coordinator/processProposal.go
+++ b/process/coordinator/processProposal.go
@@ -134,6 +134,12 @@ func (tc *transactionCoordinator) CreateMbsCrossShardDstMe(
 	// if not all mini blocks were included, remove them from the miniBlocksAndHashes slice
 	// but add them into pendingMiniBlocksAndHashes
 	if lastMBIndex < len(mbsSlice)-1 {
+		log.Debug("transactionCoordinator.CreateMbsCrossShardDstMe: could not select all mini blocks, saving them as pending", "lastMBIndex", lastMBIndex)
+
+		for _, mbAndHash := range miniBlocksAndHashes[lastMBIndex+1:] {
+			numTransactions -= uint32(len(mbAndHash.Miniblock.TxHashes))
+		}
+
 		pendingMiniBlocksAndHashes = miniBlocksAndHashes[lastMBIndex+1:]
 		miniBlocksAndHashes = miniBlocksAndHashes[:lastMBIndex+1]
 	}
diff --git a/process/coordinator/processProposal_test.go b/process/coordinator/processProposal_test.go
index 4586e51755..51e94efdae 100644
--- a/process/coordinator/processProposal_test.go
+++ b/process/coordinator/processProposal_test.go
@@ -339,7 +339,7 @@ func TestTransactionCoordinator_CreateMbsCrossShardDstMe_MiniBlockProcessing_Wit
 
 	tc.gasComputation = &testscommon.GasComputationMock{
 		AddIncomingMiniBlocksCalled: func(miniBlocks []data.MiniBlockHeaderHandler, transactions map[string][]data.TransactionHandler) (int, int, error) {
-			return 0, 1, nil // last mb added index is 0, so only first mini block is added, num pendings miniblocks is 1, so the second is pending
+			return 0, 1, nil // last mb added index is 0, so only first mini block is added, num pending miniblocks is 1, so the second is pending
		},
 	}
@@ -355,7 +355,7 @@ func TestTransactionCoordinator_CreateMbsCrossShardDstMe_MiniBlockProcessing_Wit
 
 	require.Equal(t, td.mb2Info.Miniblock, pendingMiniBlocks[0].Miniblock)
 	require.Equal(t, td.mb2Info.Hash, pendingMiniBlocks[0].Hash)
-	require.Equal(t, uint32(3), numTxs)
+	require.Equal(t, uint32(2), numTxs)
 	require.False(t, allAdded)
 
 	// Verify proposal preprocessor was used, not execution
diff --git a/process/interface.go b/process/interface.go
index 9a7e3f7ab0..a86f476e14 100644
--- a/process/interface.go
+++ b/process/interface.go
@@ -1593,6 +1593,7 @@ type GasComputation interface {
 	ZeroOutgoingLimit()
 	ResetIncomingLimit()
 	ResetOutgoingLimit()
+	CanAddPendingIncomingMiniBlocks() bool
 	Reset()
 	IsInterfaceNil() bool
 }
diff --git a/testscommon/gasComputationMock.go b/testscommon/gasComputationMock.go
index 76364727e2..e3cff690a5 100644
--- a/testscommon/gasComputationMock.go
+++ b/testscommon/gasComputationMock.go
@@ -25,6 +25,7 @@ type GasComputationMock struct {
 	ResetOutgoingLimitCalled              func()
 	ResetCalled                           func()
 	RevertIncomingMiniBlocksCalled        func(miniBlockHashes [][]byte)
+	CanAddPendingIncomingMiniBlocksCalled func() bool
 }
 
 // AddIncomingMiniBlocks -
@@ -122,6 +123,14 @@ func (mock *GasComputationMock) ResetOutgoingLimit() {
 	}
 }
 
+// CanAddPendingIncomingMiniBlocks -
+func (mock *GasComputationMock) CanAddPendingIncomingMiniBlocks() bool {
+	if mock.CanAddPendingIncomingMiniBlocksCalled != nil {
+		return mock.CanAddPendingIncomingMiniBlocksCalled()
+	}
+	return true
+}
+
 // Reset -
 func (mock *GasComputationMock) Reset() {
 	if mock.ResetCalled != nil {
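
Note (reviewer sketch, not part of the patch): the new CanAddPendingIncomingMiniBlocksCalled hook on GasComputationMock is what lets a test drive the early-exit branch of selectIncomingMiniBlocks directly. A minimal usage sketch, assuming the usual mx-chain-go testscommon import path; the argument wiring shown last is hypothetical and should be adapted to the real test setup:

	// illustrative only: force the gas computation to report that the block
	// has no room left, so the selection loop stops collecting pending
	// mini blocks after the current meta block
	gasComp := &testscommon.GasComputationMock{
		CanAddPendingIncomingMiniBlocksCalled: func() bool {
			return false
		},
	}
	arguments.GasComputation = gasComp // field name assumed; match the mock arguments used by the tests above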