Skip to content

Commit 92ecd0c

Browse files
committed
Fixed mempool implementaiton. Fixed tests for collection builder
1 parent 1cf5658 commit 92ecd0c

File tree

4 files changed

+31
-44
lines changed

4 files changed

+31
-44
lines changed

module/builder/collection/builder_test.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,9 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() {
10881088
// start with an empty mempool
10891089
suite.ClearPool()
10901090

1091+
cfg := updatable_configs.DefaultBySealingLagRateLimiterConfigs()
1092+
suite.Require().NoError(cfg.SetMinSealingLag(50)) // set min sealing lag to 50 blocks so we can hit rate limiting
1093+
suite.Require().NoError(cfg.SetMaxSealingLag(50)) // set max sealing lag to 50 blocks so we can hit rate limiting
10911094
suite.builder, _ = builder.NewBuilder(
10921095
suite.db,
10931096
trace.NewNoopTracer(),
@@ -1100,7 +1103,7 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() {
11001103
suite.pool,
11011104
unittest.Logger(),
11021105
suite.epochCounter,
1103-
updatable_configs.DefaultBySealingLagRateLimiterConfigs(),
1106+
cfg,
11041107
builder.WithMaxCollectionSize(100),
11051108
)
11061109

@@ -1125,27 +1128,36 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSizeRateLimiting() {
11251128
// build a long chain of blocks that were finalized but not sealed
11261129
// this will lead to a big sealing lag.
11271130
for i := 0; i < 100; i++ {
1128-
block := unittest.BlockWithParentFixture(head)
1129-
block.SetPayload(unittest.PayloadFixture(unittest.WithProtocolStateID(protocolStateID)))
1130-
err = suite.protoState.ExtendCertified(context.Background(), block, unittest.CertifyBlock(block.Header))
1131+
block := unittest.BlockWithParentAndPayload(head, unittest.PayloadFixture(unittest.WithProtocolStateID(protocolStateID)))
1132+
err = suite.protoState.ExtendCertified(context.Background(), unittest.NewCertifiedBlock(block))
11311133
suite.Require().NoError(err)
11321134
err = suite.protoState.Finalize(context.Background(), block.ID())
11331135
suite.Require().NoError(err)
1134-
head = block.Header
1136+
head = block.ToHeader()
11351137
}
11361138

11371139
rateLimiterCfg := updatable_configs.DefaultBySealingLagRateLimiterConfigs()
11381140

11391141
// rate-limiting should be applied, resulting in minimum collection size.
11401142
parentID := suite.genesis.ID()
1143+
setter := func(h *flow.HeaderBodyBuilder) error {
1144+
h.WithChainID(flow.Emulator).
1145+
WithParentID(parentID).
1146+
WithView(1337).
1147+
WithParentView(1336).
1148+
WithParentVoterIndices(unittest.SignerIndicesFixture(4)).
1149+
WithParentVoterSigData(unittest.QCSigDataFixture()).
1150+
WithProposerID(unittest.IdentifierFixture())
1151+
return nil
1152+
}
11411153
for i := 0; i < 10; i++ {
1142-
header, err := suite.builder.BuildOn(parentID, noopSetter, noopSigner)
1154+
header, err := suite.builder.BuildOn(parentID, setter, signer)
11431155
suite.Require().NoError(err)
1144-
parentID = header.ID()
1156+
parentID = header.Header.ID()
11451157

1146-
// each collection should be half-full with 5 transactions
1158+
// each collection should be equal to the minimum collection size
11471159
var built model.Block
1148-
err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
1160+
err = suite.db.View(procedure.RetrieveClusterBlock(header.Header.ID(), &built))
11491161
suite.Assert().NoError(err)
11501162
suite.Assert().Len(built.Payload.Collection.Transactions, int(rateLimiterCfg.MinCollectionSize()))
11511163
}
@@ -1382,11 +1394,11 @@ func (suite *BuilderSuite) TestBuildOn_SystemTxAlwaysIncluded() {
13821394

13831395
// rate-limiting should not be applied, since the payer is marked as unlimited
13841396
parentID := suite.genesis.ID()
1385-
header, err := suite.builder.BuildOn(parentID, noopSetter, noopSigner)
1397+
header, err := suite.builder.BuildOn(parentID, setter, signer)
13861398
suite.Require().NoError(err)
13871399

13881400
var built model.Block
1389-
err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
1401+
err = suite.db.View(procedure.RetrieveClusterBlock(header.Header.ID(), &built))
13901402
suite.Assert().NoError(err)
13911403
suite.Assert().Len(built.Payload.Collection.Transactions, 2)
13921404
for _, tx := range built.Payload.Collection.Transactions {

module/mempool/herocache/transactions.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Transactions struct {
1616
byPayer map[flow.Address]map[flow.Identifier]struct{}
1717
}
1818

19+
var _ mempool.Transactions = (*Transactions)(nil)
20+
1921
// NewTransactions implements a transactions mempool based on hero cache.
2022
func NewTransactions(limit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *Transactions {
2123
byPayer := make(map[flow.Address]map[flow.Identifier]struct{})
@@ -37,12 +39,11 @@ func NewTransactions(limit uint32, logger zerolog.Logger, collector module.HeroC
3739
}
3840

3941
// Add adds a transaction to the mempool.
40-
func (t *Transactions) Add(tx *flow.TransactionBody) bool {
42+
func (t *Transactions) Add(txID flow.Identifier, tx *flow.TransactionBody) bool {
4143
added := false
4244
err := t.Run(func(backdata mempool.BackData[flow.Identifier, *flow.TransactionBody]) error {
4345
// Warning! reference pointer must be dereferenced before adding to HeroCache.
4446
// This is crucial for its heap object optimizations.
45-
txID := tx.ID()
4647
added = backdata.Add(txID, tx)
4748
if !added {
4849
return nil
@@ -77,12 +78,11 @@ func (t *Transactions) Clear() {
7778
func (t *Transactions) Remove(id flow.Identifier) bool {
7879
removed := false
7980
err := t.Run(func(backdata mempool.BackData[flow.Identifier, *flow.TransactionBody]) error {
80-
var entity flow.Entity
81-
entity, removed = backdata.Remove(id)
81+
var txBody *flow.TransactionBody
82+
txBody, removed = backdata.Remove(id)
8283
if !removed {
8384
return nil
8485
}
85-
txBody := entity.(flow.TransactionBody)
8686
t.removeFromIndex(id, txBody.Payer)
8787
return nil
8888
})

module/mempool/mempool.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type Mempool[K comparable, V any] interface {
2222
Adjust(key K, f func(V) V) (V, bool)
2323
// Size will return the size of the mempool.
2424
Size() uint
25+
// Values returns all stored values from the mempool.
26+
Values() []V
2527
// All returns all stored key-value pairs as a map from the mempool.
2628
All() map[K]V
2729
// Clear removes all key-value pairs from the mempool.

module/mempool/transactions.go

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,8 @@ import (
66

77
// Transactions represents a concurrency-safe memory pool for transactions.
88
type Transactions interface {
9-
10-
// Has checks whether the transaction with the given hash is currently in
11-
// the memory pool.
12-
Has(txID flow.Identifier) bool
13-
14-
// Add will add the given transaction body to the memory pool. It will
15-
// return false if it was already in the mempool.
16-
Add(txId flow.Identifier, tx *flow.TransactionBody) bool
17-
18-
// Remove will remove the given transaction from the memory pool; it will
19-
// return true if the transaction was known and removed.
20-
Remove(txID flow.Identifier) bool
21-
22-
// Get retrieve the transaction with the given ID from the memory
23-
// pool. It will return false if it was not found in the mempool.
24-
Get(txID flow.Identifier) (*flow.TransactionBody, bool)
25-
9+
Mempool[flow.Identifier, *flow.TransactionBody]
2610
// ByPayer retrieves all transactions from the memory pool that are sent
2711
// by the given payer.
2812
ByPayer(payer flow.Address) []*flow.TransactionBody
29-
30-
// Size will return the current size of the memory pool.
31-
Size() uint
32-
33-
// Values will retrieve all transactions that are currently in the memory pool
34-
// as a slice.
35-
// Values guarantees returning all transactions in the same order as they are added.
36-
Values() []*flow.TransactionBody
37-
38-
// Clear removes all transactions from the mempool.
39-
Clear()
4013
}

0 commit comments

Comments
 (0)