20 changes: 19 additions & 1 deletion process/block/export_test.go
@@ -734,13 +734,31 @@ func (sp *shardProcessor) GetHdrForBlock() HeadersForBlock {
return sp.hdrsForCurrBlock
}

// PendingMiniBlocksAfterSelection -
type PendingMiniBlocksAfterSelection = pendingMiniBlocksAfterSelection

// GetHeaderHash -
func (p *PendingMiniBlocksAfterSelection) GetHeaderHash() []byte {
return p.headerHash
}

// GetHeader -
func (p *PendingMiniBlocksAfterSelection) GetHeader() data.HeaderHandler {
return p.header
}

// GetMiniBlocksAndHashes -
func (p *PendingMiniBlocksAfterSelection) GetMiniBlocksAndHashes() []block.MiniblockAndHash {
return p.pendingMiniBlocksAndHashes
}

// SelectIncomingMiniBlocks -
func (sp *shardProcessor) SelectIncomingMiniBlocks(
lastCrossNotarizedMetaHdr data.HeaderHandler,
orderedMetaBlocks []data.HeaderHandler,
orderedMetaBlocksHashes [][]byte,
haveTime func() bool,
) ([]block.MiniblockAndHash, error) {
) ([]*PendingMiniBlocksAfterSelection, error) {
return sp.selectIncomingMiniBlocks(lastCrossNotarizedMetaHdr, orderedMetaBlocks, orderedMetaBlocksHashes, haveTime)
}

49 changes: 49 additions & 0 deletions process/block/gasConsumption.go
@@ -18,6 +18,7 @@ import (
const (
incoming = "incoming"
outgoing = "outgoing"
pending = "pending"
)

const (
@@ -125,6 +126,19 @@ func (gc *gasConsumption) AddIncomingMiniBlocks(
for _, mb := range miniBlocks[i:] {
hashStr := string(mb.GetHash())
gc.transactionsForPendingMiniBlocks[hashStr] = transactions[hashStr]

transactionsForMB, found := transactions[string(mb.GetHash())]
Contributor: can you merge this line and L128? Both appear to be reading the same transactions. E.g. do the gc.transactionsForPendingMiniBlocks[hashStr] = transactionsForMB after the !found check? Or do we need to set nil explicitly in the map?

Collaborator (Author): updated, missed it
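A standalone sketch of the merge the reviewer suggests: read the transactions map once and store the entry only after the !found check. Names and types here are simplified stand-ins for illustration, not the committed fix:

```go
package main

import "fmt"

// collectPending mirrors the review suggestion: one map lookup per mini block,
// and the pending map is written only after the !found check.
func collectPending(hashes []string, transactions map[string][]string) map[string][]string {
	pending := make(map[string][]string)
	for _, hashStr := range hashes {
		transactionsForMB, found := transactions[hashStr] // single read
		if !found {
			fmt.Println("could not find transactions for pending mini block:", hashStr)
			continue
		}
		pending[hashStr] = transactionsForMB // store after the check
	}
	return pending
}

func main() {
	txs := map[string][]string{"mb1": {"tx1", "tx2"}}
	fmt.Println(collectPending([]string{"mb1", "mb2"}, txs))
}
```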

if !found {
log.Warn("could not find transaction for pending mini block", "hash", mb.GetHash())
continue
}
gasConsumedByPendingMb, errCheckPending := gc.checkGasConsumedByMiniBlock(mb, transactionsForMB)
if errCheckPending != nil {
log.Warn("failed to check gas consumed by pending mini block", "hash", mb.GetHash(), "error", errCheckPending)
continue
Contributor: I think it should break instead of continuing, as continuing can create mini block gaps when there is a problem, which would not be ok. Same with the continue above: it should break there as well, as we don't want to create gaps. Or, even better, return an error instead of breaking, since the next steps rely on the computed gas consumed by the pending mini blocks.

Collaborator (Author): right, updated
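A minimal sketch of that resolution: returning an error instead of continuing, so the first failed mini block aborts the whole pass and cannot leave a gap. The function and names are illustrative, not the real processor API:

```go
package main

import (
	"errors"
	"fmt"
)

var errMissingTransactions = errors.New("could not find transactions for pending mini block")

// accumulatePendingGas aborts on the first problem instead of skipping the
// mini block, so the pending sequence stays gap-free.
func accumulatePendingGas(hashes []string, gasByHash map[string]uint64) (uint64, error) {
	total := uint64(0)
	for _, hash := range hashes {
		gas, found := gasByHash[hash]
		if !found {
			return 0, fmt.Errorf("%w: %s", errMissingTransactions, hash)
		}
		total += gas
	}
	return total, nil
}

func main() {
	total, err := accumulatePendingGas([]string{"mb1", "mb2"}, map[string]uint64{"mb1": 50})
	fmt.Println(total, err)
}
```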

}

gc.totalGasConsumed[pending] += gasConsumedByPendingMb
}

return lastMiniBlockIndex, len(gc.pendingMiniBlocks), err
@@ -155,6 +169,7 @@ func (gc *gasConsumption) RevertIncomingMiniBlocks(miniBlockHashes [][]byte) {
isPending, idxInPendingSlice := gc.isPendingMiniBlock(miniBlockHash)
if isPending {
gc.revertPendingMiniBlock(miniBlockHash, idxInPendingSlice)
gc.totalGasConsumed[pending] -= gasConsumedByMb
continue
}

@@ -281,6 +296,7 @@ func (gc *gasConsumption) addPendingIncomingMiniBlocks() ([]data.MiniBlockHeader
lastIndexAdded := 0
for i := 0; i < len(gc.pendingMiniBlocks); i++ {
mb := gc.pendingMiniBlocks[i]
gasConsumedByIncomingBefore := gc.totalGasConsumed[incoming]
shouldSavePending, err := gc.addIncomingMiniBlock(mb, gc.transactionsForPendingMiniBlocks, bandwidthForIncomingMiniBlocks)
if err != nil {
return nil, err
@@ -292,6 +308,13 @@

addedMiniBlocks = append(addedMiniBlocks, mb)
lastIndexAdded = i

gasConsumedByIncomingAfter := gc.totalGasConsumed[incoming]
gasConsumedByMiniBlock := gasConsumedByIncomingAfter - gasConsumedByIncomingBefore
if gasConsumedByMiniBlock < gc.totalGasConsumed[pending] {
Contributor: for the last pending mini block I think this condition would be equal. This relates to the comment below, "// should never be false", but for the last one it will be false. Maybe the condition should be <= gc.totalGasConsumed[pending].

Collaborator (Author): updated
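A tiny sketch of the boundary case from this thread: with the strict <, the last pending mini block (whose gas equals the remaining pending total) skips the subtraction and leaves stale gas behind, while <= drains it. Values are made up for illustration:

```go
package main

import "fmt"

func main() {
	totalPendingGas := uint64(50)
	gasConsumedByMiniBlock := uint64(50) // last pending mini block

	// was: gasConsumedByMiniBlock < totalPendingGas (false here, subtraction skipped)
	if gasConsumedByMiniBlock <= totalPendingGas {
		totalPendingGas -= gasConsumedByMiniBlock
	}
	fmt.Println("pending gas left:", totalPendingGas) // 0 with <=, stale 50 with <
}
```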

// should never be false
gc.totalGasConsumed[pending] -= gasConsumedByMiniBlock
}
}

gc.pendingMiniBlocks = gc.pendingMiniBlocks[lastIndexAdded+1:]
@@ -471,6 +494,32 @@ func (gc *gasConsumption) TotalGasConsumedInShard(shard uint32) uint64 {
return gc.totalGasConsumed[gasKeyOutgoingCross]
}

// CanAddPendingIncomingMiniBlocks returns true if more pending incoming mini blocks can be added without reaching the block limits
func (gc *gasConsumption) CanAddPendingIncomingMiniBlocks() bool {
gc.mut.RLock()
defer gc.mut.RUnlock()

totalGasToBeConsumedByPending := uint64(0)
totalGasConsumed := uint64(0)
for typeOfGas, gasConsumed := range gc.totalGasConsumed {
if typeOfGas == pending {
totalGasToBeConsumedByPending += gasConsumed
continue
}

totalGasConsumed += gasConsumed
}

maxGasLimitPerBlock := gc.maxGasLimitPerBlock(incoming, gc.shardCoordinator.SelfId())
if maxGasLimitPerBlock <= totalGasConsumed {
return false
}

gasLeft := maxGasLimitPerBlock - totalGasConsumed
maxGasLimitPerMiniBlock := gc.maxGasLimitPerMiniBlock(gc.shardCoordinator.SelfId())
return totalGasToBeConsumedByPending+maxGasLimitPerMiniBlock < gasLeft
Contributor: why do we add the maxGasLimitPerMiniBlock to the total gas to be consumed by pending?

Collaborator (Author): this method checks whether there is space for one more mini block left in the block. It checks against the max gas limit per mini block, since it does not receive a specific mini block and cannot know how much that one would consume.

Contributor: but the mini block does not necessarily need to be of max size; it can be of any size below the maximum.

Collaborator (Author): removed it
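A simplified sketch of the check after that resolution ("removed it"): the pending gas is compared directly against the gas left in the block, without reserving a full mini block of headroom. This is a stand-in consistent with the tests below, not the committed method:

```go
package main

import "fmt"

// canAddPending mirrors the simplified condition: no extra
// maxGasLimitPerMiniBlock headroom is added to the pending total.
func canAddPending(totalGasConsumed, totalPendingGas, maxGasLimitPerBlock uint64) bool {
	if maxGasLimitPerBlock <= totalGasConsumed {
		return false // block already full
	}
	gasLeft := maxGasLimitPerBlock - totalGasConsumed
	return totalPendingGas < gasLeft
}

func main() {
	fmt.Println(canAddPending(700, 100, 800)) // false: pending exactly fills the gas left
	fmt.Println(canAddPending(500, 100, 800)) // true: 300 gas left covers the 100 pending
}
```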

}

// DecreaseIncomingLimit reduces the gas limit for incoming mini blocks by a configured percentage
func (gc *gasConsumption) DecreaseIncomingLimit() {
gc.mut.Lock()
170 changes: 170 additions & 0 deletions process/block/gasConsumption_test.go
@@ -1015,3 +1015,173 @@ func TestGasConsumption_ConcurrentOps(t *testing.T) {
wg.Wait()
})
}

func TestGasConsumption_CanAddPendingIncomingMiniBlocks(t *testing.T) {
t.Parallel()

t.Run("no pending mini blocks, block partially filled should return true", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add some mini blocks but no pending ones
mbs := generateMiniBlocks(3, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 2, lastMbIndex)
require.Zero(t, pendingMbs)

// no pending mini blocks, enough space left, should return true
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks can fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create some pending ones
// maxGasLimitPerBlock = 400, half * 200% = 400 for incoming
// 5 txs * 10 gas = 50 per mb
// 8 mbs will consume 400, leaving 2 as pending with total 100 gas
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// pending mini blocks consume 100 gas (2 mbs * 5 txs * 10 gas)
// max limit is 400, consumed is 400, but we have the full tx space available (400)
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks cannot fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// now fill the transaction space to leave very little room
// maxGasLimitPerBlock = 400, half * 200% = 400 for txs
// add 39 txs (390 gas), leaving only 10 gas available
// already pending mbs need 100 gas, so one more mini block won't fit
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 39)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 390 (outgoing) = 790
// total block limit is 400 * 2 = 800
// gasLeft = 800 - 790 = 10
// maxGasLimitPerMiniBlock (100) > gasLeft (10)
require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("block is full, no space for pending", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// fill the transaction space completely
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 40)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("pending mini blocks exactly fit in remaining space", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// add 30 txs (300 gas) to leave exactly 100 gas
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 30)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 300 (outgoing) = 700
// total block limit = 800
// gasLeft = 800 - 700 = 100
// exactly enough space for the already added pending mini blocks, but no headroom for more
require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("after incoming limit is zeroed, should return false", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// zero the incoming limit
gc.ZeroIncomingLimit()

require.False(t, gc.CanAddPendingIncomingMiniBlocks())
})

t.Run("with transactions added leaving enough space for pending", func(t *testing.T) {
t.Parallel()

gc, _ := block.NewGasConsumption(getMockArgsGasConsumption())
require.NotNil(t, gc)

// add mini blocks to create pending ones
mbs := generateMiniBlocks(10, 5)
txs := generateTxsForMiniBlocks(mbs)
lastMbIndex, pendingMbs, err := gc.AddIncomingMiniBlocks(mbs, txs)
require.NoError(t, err)
require.Equal(t, 7, lastMbIndex)
require.Equal(t, 2, pendingMbs)

// add only 10 txs (100 gas), leaving 300 gas available
txHashes, txsOutgoing := generateTxs(maxGasLimitPerTx, 10)
_, _, err = gc.AddOutgoingTransactions(txHashes, txsOutgoing)
require.NoError(t, err)

// pending mini blocks consume 100 gas
// total consumed = 400 (incoming) + 100 (outgoing) = 500
// total block limit = 800
// gasLeft = 800 - 500 = 300
// should allow 2 more mini blocks
require.True(t, gc.CanAddPendingIncomingMiniBlocks())
})
}