Skip to content
20 changes: 19 additions & 1 deletion process/block/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,13 +734,31 @@ func (sp *shardProcessor) GetHdrForBlock() HeadersForBlock {
return sp.hdrsForCurrBlock
}

// PendingMiniBlocksAfterSelection -
type PendingMiniBlocksAfterSelection = pendingMiniBlocksAfterSelection

// GetHeaderHash -
func (p *PendingMiniBlocksAfterSelection) GetHeaderHash() []byte {
return p.headerHash
}

// GetHeader -
func (p *PendingMiniBlocksAfterSelection) GetHeader() data.HeaderHandler {
return p.header
}

// GetMiniBlocksAndHashes -
func (p *PendingMiniBlocksAfterSelection) GetMiniBlocksAndHashes() []block.MiniblockAndHash {
return p.pendingMiniBlocksAndHashes
}

// SelectIncomingMiniBlocks -
func (sp *shardProcessor) SelectIncomingMiniBlocks(
lastCrossNotarizedMetaHdr data.HeaderHandler,
orderedMetaBlocks []data.HeaderHandler,
orderedMetaBlocksHashes [][]byte,
haveTime func() bool,
) ([]block.MiniblockAndHash, error) {
) ([]*PendingMiniBlocksAfterSelection, error) {
return sp.selectIncomingMiniBlocks(lastCrossNotarizedMetaHdr, orderedMetaBlocks, orderedMetaBlocksHashes, haveTime)
}

Expand Down
49 changes: 48 additions & 1 deletion process/block/gasConsumption.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
const (
incoming = "incoming"
outgoing = "outgoing"
pending = "pending"
)

const (
Expand Down Expand Up @@ -121,12 +122,25 @@ func (gc *gasConsumption) AddIncomingMiniBlocks(
shouldSavePending, err = gc.addIncomingMiniBlock(miniBlocks[i], transactions, bandwidthForIncomingMiniBlocks)
if shouldSavePending {
// saving pending starting with idx i, as it was not included either
gc.pendingMiniBlocks = append(gc.pendingMiniBlocks, miniBlocks[i:]...)
for _, mb := range miniBlocks[i:] {
hashStr := string(mb.GetHash())
transactionsForMB, found := transactions[hashStr]
if !found {
log.Warn("could not find transaction for pending mini block", "hash", mb.GetHash())
return lastMiniBlockIndex, 0, fmt.Errorf("%w, could not find mini block hash in transactions map", process.ErrInvalidValue)
}
gasConsumedByPendingMb, errCheckPending := gc.checkGasConsumedByMiniBlock(mb, transactionsForMB)
if errCheckPending != nil {
log.Warn("failed to check gas consumed by pending mini block", "hash", mb.GetHash(), "error", errCheckPending)
return lastMiniBlockIndex, 0, errCheckPending
}

gc.transactionsForPendingMiniBlocks[hashStr] = transactions[hashStr]
gc.totalGasConsumed[pending] += gasConsumedByPendingMb
}

gc.pendingMiniBlocks = append(gc.pendingMiniBlocks, miniBlocks[i:]...)

return lastMiniBlockIndex, len(gc.pendingMiniBlocks), err
}
if err != nil {
Expand Down Expand Up @@ -155,6 +169,7 @@ func (gc *gasConsumption) RevertIncomingMiniBlocks(miniBlockHashes [][]byte) {
isPending, idxInPendingSlice := gc.isPendingMiniBlock(miniBlockHash)
if isPending {
gc.revertPendingMiniBlock(miniBlockHash, idxInPendingSlice)
gc.totalGasConsumed[pending] -= gasConsumedByMb
continue
}

Expand Down Expand Up @@ -281,6 +296,7 @@ func (gc *gasConsumption) addPendingIncomingMiniBlocks() ([]data.MiniBlockHeader
lastIndexAdded := 0
for i := 0; i < len(gc.pendingMiniBlocks); i++ {
mb := gc.pendingMiniBlocks[i]
gasConsumedByIncomingBefore := gc.totalGasConsumed[incoming]
shouldSavePending, err := gc.addIncomingMiniBlock(mb, gc.transactionsForPendingMiniBlocks, bandwidthForIncomingMiniBlocks)
if err != nil {
return nil, err
Expand All @@ -292,6 +308,12 @@ func (gc *gasConsumption) addPendingIncomingMiniBlocks() ([]data.MiniBlockHeader

addedMiniBlocks = append(addedMiniBlocks, mb)
lastIndexAdded = i

gasConsumedByIncomingAfter := gc.totalGasConsumed[incoming]
gasConsumedByMiniBlock := gasConsumedByIncomingAfter - gasConsumedByIncomingBefore
if gasConsumedByMiniBlock <= gc.totalGasConsumed[pending] {
gc.totalGasConsumed[pending] -= gasConsumedByMiniBlock
}
}

gc.pendingMiniBlocks = gc.pendingMiniBlocks[lastIndexAdded+1:]
Expand Down Expand Up @@ -471,6 +493,31 @@ func (gc *gasConsumption) TotalGasConsumedInShard(shard uint32) uint64 {
return gc.totalGasConsumed[gasKeyOutgoingCross]
}

// CanAddPendingIncomingMiniBlocks returns true if more pending incoming mini blocks can be added without reaching the block limits
func (gc *gasConsumption) CanAddPendingIncomingMiniBlocks() bool {
gc.mut.RLock()
defer gc.mut.RUnlock()

totalGasToBeConsumedByPending := uint64(0)
totalGasConsumed := uint64(0)
for typeOfGas, gasConsumed := range gc.totalGasConsumed {
if typeOfGas == pending {
totalGasToBeConsumedByPending += gasConsumed
continue
}

totalGasConsumed += gasConsumed
}

maxGasLimitPerBlock := gc.maxGasLimitPerBlock(incoming, gc.shardCoordinator.SelfId())
if maxGasLimitPerBlock <= totalGasConsumed {
return false
}

gasLeft := maxGasLimitPerBlock - totalGasConsumed
return totalGasToBeConsumedByPending < gasLeft
}

// DecreaseIncomingLimit reduces the gas limit for incoming mini blocks by a configured percentage
func (gc *gasConsumption) DecreaseIncomingLimit() {
gc.mut.Lock()
Expand Down
170 changes: 170 additions & 0 deletions process/block/gasConsumption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,3 +1015,173 @@ func TestGasConsumption_ConcurrentOps(t *testing.T) {
wg.Wait()
})
}

func TestGasConsumption_CanAddPendingIncomingMiniBlocks(t *testing.T) {
t.Parallel()

t.Run("no pending mini blocks, block partially filled should return true", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add some mini blocks but no pending ones
mbs := generateMiniBlocks(3, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 2, lastMbIndex)
require.Zero(t, pendingMbs)

// no pending mini blocks, enough space left, should return true
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks can fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create some pending ones
// maxGasLimitPerBlock = 400, half * 200% = 400 for incoming
// 5 txs * 10 gas = 50 per mb
// 8 mbs will consume 400, leaving 2 as pending with total 100 gas
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// pending mini blocks consume 100 gas (2 mbs * 5 txs * 10 gas)
// max limit is 400, consumed is 400, but we have the full tx space available (400)
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks cannot fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// now fill the transaction space to leave very little room
// maxGasLimitPerBlock = 400, half * 200% = 400 for txs
// add 39 txs (390 gas), leaving only 10 gas available
// already pending mbs need 100 gas, so one more mini block won't fit
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 39)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 390 (outgoing) = 790
// total block limit is 400 * 2 = 800
// gasLeft = 800 - 790 = 10
// maxGasLimitPerMiniBlock (100) > gasLeft (10)
require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("block is full, no space for pending", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// fill the transaction space completely
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 40)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks exactly fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// add 30 txs (300 gas) to leave exactly 100 gas
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 30)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 300 (outgoing) = 700
// total block limit = 800
// gasLeft = 800 - 700 = 100
// space enough for already added pending mini blocks
require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("after incoming limit is zeroed, should return false", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// zero the incoming limit
gc.ZeroIncomingLimit()

require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("with transactions added leaving enough space for pending", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// add only 10 txs (100 gas), leaving 300 gas available
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 10)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 100 (outgoing) = 500
// total block limit = 800
// gasLeft = 800 - 500 = 300
// should allow 2 more mini blocks
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})
}
Loading
Loading