Skip to content

Commit 5e3c7a7

Browse files
committed
fix concurrent access in tx streaming tests
1 parent a559b7e commit 5e3c7a7

File tree

1 file changed

+48
-41
lines changed

1 file changed

+48
-41
lines changed

engine/access/rpc/backend/transactions/stream/stream_backend_test.go

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package stream
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"os"
87
"testing"
@@ -51,6 +50,7 @@ import (
5150
storagemock "github.com/onflow/flow-go/storage/mock"
5251
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
5352
"github.com/onflow/flow-go/storage/store"
53+
"github.com/onflow/flow-go/utils/concurrentmap"
5454
"github.com/onflow/flow-go/utils/unittest"
5555
"github.com/onflow/flow-go/utils/unittest/mocks"
5656
)
@@ -95,7 +95,7 @@ type TransactionStreamSuite struct {
9595
sealedBlock *flow.Block
9696
finalizedBlock *flow.Block
9797

98-
blockMap map[uint64]*flow.Block
98+
blockMap *concurrentmap.Map[uint64, *flow.Block]
9999

100100
txStreamBackend *TransactionStream
101101

@@ -115,7 +115,7 @@ func TestTransactionStatusSuite(t *testing.T) {
115115

116116
// SetupTest initializes the test dependencies, configurations, and mock objects for TransactionStreamSuite tests.
117117
func (s *TransactionStreamSuite) SetupTest() {
118-
s.log = zerolog.New(zerolog.NewConsoleWriter())
118+
s.log = unittest.Logger()
119119
s.state = protocol.NewState(s.T())
120120
s.sealedSnapshot = protocol.NewSnapshot(s.T())
121121
s.finalSnapshot = protocol.NewSnapshot(s.T())
@@ -207,10 +207,9 @@ func (s *TransactionStreamSuite) initializeBackend() {
207207

208208
s.sealedBlock = s.rootBlock
209209
s.finalizedBlock = unittest.BlockWithParentFixture(s.sealedBlock.ToHeader())
210-
s.blockMap = map[uint64]*flow.Block{
211-
s.sealedBlock.Height: s.sealedBlock,
212-
s.finalizedBlock.Height: s.finalizedBlock,
213-
}
210+
s.blockMap = concurrentmap.New[uint64, *flow.Block]()
211+
s.blockMap.Add(s.sealedBlock.Height, s.sealedBlock)
212+
s.blockMap.Add(s.finalizedBlock.Height, s.finalizedBlock)
214213

215214
txStatusDeriver := txstatus.NewTxStatusDeriver(
216215
s.state,
@@ -344,45 +343,53 @@ func (s *TransactionStreamSuite) initializeBackend() {
344343
)
345344
}
346345

346+
func blockByID(blockMap *concurrentmap.Map[uint64, *flow.Block]) func(flow.Identifier) (*flow.Block, error) {
347+
return func(blockID flow.Identifier) (*flow.Block, error) {
348+
var block *flow.Block
349+
blockMap.ForEach(func(height uint64, b *flow.Block) error {
350+
if b.ID() == blockID {
351+
block = b
352+
}
353+
return nil
354+
})
355+
if block == nil {
356+
return nil, storage.ErrNotFound
357+
}
358+
return block, nil
359+
}
360+
}
361+
362+
func blockByHeight(blockMap *concurrentmap.Map[uint64, *flow.Block]) func(uint64) (*flow.Block, error) {
363+
return func(height uint64) (*flow.Block, error) {
364+
if block, ok := blockMap.Get(height); ok {
365+
return block, nil
366+
}
367+
return nil, storage.ErrNotFound
368+
}
369+
}
370+
347371
// initializeMainMockInstructions sets up the main mock behaviors for components used in TransactionStreamSuite tests.
348372
func (s *TransactionStreamSuite) initializeMainMockInstructions() {
349373
s.transactions.On("Store", mock.Anything).Return(nil).Maybe()
350374

351-
s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)).Maybe()
352-
s.blocks.On("ByID", mock.Anything).Return(
353-
func(blockID flow.Identifier) *flow.Block {
354-
for _, block := range s.blockMap {
355-
if block.ID() == blockID {
356-
return block
357-
}
358-
}
359-
return nil
360-
},
361-
func(blockID flow.Identifier) error {
362-
for _, block := range s.blockMap {
363-
if block.ID() == blockID {
364-
return nil
365-
}
366-
}
367-
return errors.New("block not found")
368-
},
369-
).Maybe()
375+
s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(blockByHeight(s.blockMap)).Maybe()
376+
s.blocks.On("ByID", mock.Anything).Return(blockByID(s.blockMap)).Maybe()
370377

371378
s.state.On("Final").Return(s.finalSnapshot, nil).Maybe()
372-
s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot {
373-
s.tempSnapshot.On("Head").Unset()
374-
s.tempSnapshot.On("Head").Return(func() *flow.Header {
375-
for _, block := range s.blockMap {
376-
if block.ID() == blockID {
377-
return block.ToHeader()
378-
}
379-
}
380-
381-
return nil
382-
}, nil)
383-
384-
return s.tempSnapshot
385-
}, nil).Maybe()
379+
s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(
380+
func(blockID flow.Identifier) protocolint.Snapshot {
381+
s.tempSnapshot.On("Head").Unset()
382+
s.tempSnapshot.On("Head").Return(
383+
func() (*flow.Header, error) {
384+
block, err := blockByID(s.blockMap)(blockID)
385+
if err != nil {
386+
return nil, err
387+
}
388+
return block.ToHeader(), nil
389+
}, nil)
390+
391+
return s.tempSnapshot
392+
}, nil).Maybe()
386393

387394
s.finalSnapshot.On("Head").Return(func() *flow.Header {
388395
return s.finalizedBlock.ToHeader()
@@ -439,7 +446,7 @@ func (s *TransactionStreamSuite) addNewFinalizedBlock(parent *flow.Header, notif
439446
option(s.finalizedBlock)
440447
}
441448

442-
s.blockMap[s.finalizedBlock.Height] = s.finalizedBlock
449+
s.blockMap.Add(s.finalizedBlock.Height, s.finalizedBlock)
443450

444451
if notify {
445452
s.broadcaster.Publish()

0 commit comments

Comments
 (0)