Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,6 +2215,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

collectionIndexer, err := collections.NewIndexer(
node.Logger,
builder.ProtocolDB,
notNil(builder.collectionExecutedMetric),
node.State,
node.Storage.Blocks,
Expand All @@ -2233,6 +2234,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var executionDataSyncer *collections.ExecutionDataSyncer
if builder.executionDataSyncEnabled && !builder.executionDataIndexingEnabled {
executionDataSyncer = collections.NewExecutionDataSyncer(
node.Logger,
notNil(builder.ExecutionDataCache),
collectionIndexer,
)
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
collectionIndexer, err := collections.NewIndexer(
builder.Logger,
builder.ProtocolDB,
collectionExecutedMetric,
builder.State,
builder.Storage.Blocks,
Expand Down
7 changes: 5 additions & 2 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ func (suite *Suite) TestGetSealedTransaction() {

collectionIndexer, err := ingestioncollections.NewIndexer(
suite.log,
db,
collectionExecutedMetric,
suite.state,
all.Blocks,
Expand Down Expand Up @@ -816,8 +817,8 @@ func (suite *Suite) TestGetSealedTransaction() {

// block until the collection is processed by the indexer
require.Eventually(suite.T(), func() bool {
isStored, err := collectionIndexer.IsCollectionInStorage(collection.ID())
return isStored && err == nil
_, err := collections.LightByID(collection.ID())
return err == nil
}, 1*time.Second, 10*time.Millisecond, "collection not indexed")

// 5. Client requests a transaction
Expand Down Expand Up @@ -984,6 +985,7 @@ func (suite *Suite) TestGetTransactionResult() {

collectionIndexer, err := ingestioncollections.NewIndexer(
suite.log,
db,
collectionExecutedMetric,
suite.state,
all.Blocks,
Expand Down Expand Up @@ -1259,6 +1261,7 @@ func (suite *Suite) TestExecuteScript() {

collectionIndexer, err := ingestioncollections.NewIndexer(
suite.log,
db,
collectionExecutedMetric,
suite.state,
all.Blocks,
Expand Down
78 changes: 44 additions & 34 deletions engine/access/ingestion/collections/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ type CollectionIndexer interface {
// This method is non-blocking.
OnCollectionReceived(collection *flow.Collection)

// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
// Calling this method multiple times with the same collections is a no-op.
//
// No error returns are expected during normal operation.
IndexCollections(collections []*flow.Collection) error

// MissingCollectionsAtHeight returns all collections that are not present in storage for a specific
// finalized block height.
//
// Expected error returns during normal operation:
// - [storage.ErrNotFound]: if provided block height is not finalized or below this node's root block
MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)

// IsCollectionInStorage checks whether the given collection is present in local storage.
//
// No error returns are expected during normal operation.
IsCollectionInStorage(collectionID flow.Identifier) (bool, error)
}

// Indexer stores and indexes collections received from the network. It is designed to be the central
Expand All @@ -56,10 +57,11 @@ type CollectionIndexer interface {
// The indexer also maintains the last full block height state, which is the highest block height
// for which all collections are stored and indexed.
type Indexer struct {
log zerolog.Logger
metrics module.CollectionExecutedMetric
lockManager lockctx.Manager
log zerolog.Logger
metrics module.CollectionExecutedMetric

db storage.DB
lockManager lockctx.Manager
state protocol.State
blocks storage.Blocks
collections storage.Collections
Expand All @@ -74,6 +76,7 @@ type Indexer struct {
// No error returns are expected during normal operation.
func NewIndexer(
log zerolog.Logger,
db storage.DB,
metrics module.CollectionExecutedMetric,
state protocol.State,
blocks storage.Blocks,
Expand All @@ -91,6 +94,7 @@ func NewIndexer(
return &Indexer{
log: log.With().Str("component", "collection-indexer").Logger(),
metrics: metrics,
db: db,
lockManager: lockManager,
state: state,
blocks: blocks,
Expand Down Expand Up @@ -136,7 +140,7 @@ func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component
return
}

if err := ci.indexCollection(collection); err != nil {
if err := ci.IndexCollections([]*flow.Collection{collection}); err != nil {
ctx.Throw(fmt.Errorf("error indexing collection: %w", err))
return
}
Expand All @@ -158,42 +162,48 @@ func (ci *Indexer) OnCollectionReceived(collection *flow.Collection) {
ci.pendingCollectionsNotifier.Notify()
}

// indexCollection indexes a collection and its transactions.
// Skips indexing and returns without an error if the collection is already indexed.
// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
// Calling this method multiple times with the same collections is a no-op.
//
// No error returns are expected during normal operation.
func (ci *Indexer) indexCollection(collection *flow.Collection) error {
func (ci *Indexer) IndexCollections(collections []*flow.Collection) error {
// skip indexing if collection is already indexed. on the common path, collections may be received
// via multiple subsystems (e.g. execution data sync, collection sync, execution state indexer).
// In this case, the indexer will be notified multiple times for the same collection. Only the
// first notification should be processed.
//
// It's OK that this check is not done atomically with the index operation since the collections
// storage module is solely responsible for enforcing consistency (even if this is a stale read).
exists, err := ci.IsCollectionInStorage(collection.ID())
if err != nil {
return fmt.Errorf("failed to check if collection is in storage: %w", err)
}
if exists {
return nil
newCollections := make([]*flow.Collection, 0)
for _, collection := range collections {
exists, err := ci.isCollectionInStorage(collection.ID())
if err != nil {
return fmt.Errorf("failed to check if collection is in storage: %w", err)
}
if !exists {
newCollections = append(newCollections, collection)
}
}

lctx := ci.lockManager.NewContext()
defer lctx.Release()
err = lctx.AcquireLock(storage.LockInsertCollection)
if err != nil {
return fmt.Errorf("could not acquire lock for indexing collections: %w", err)
if len(newCollections) == 0 {
return nil
}

// store the collection, including constituent transactions, and index transactionID -> collectionID
light, err := ci.collections.StoreAndIndexByTransaction(lctx, collection)
if err != nil {
return fmt.Errorf("failed to store collection: %w", err)
}
return storage.WithLock(ci.lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error {
return ci.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
for _, collection := range newCollections {
// store the collection, including constituent transactions, and index transactionID -> collectionID
light, err := ci.collections.BatchStoreAndIndexByTransaction(lctx, collection, rw)
if err != nil {
return fmt.Errorf("failed to store collection: %w", err)
}

ci.metrics.CollectionFinalized(light)
ci.metrics.CollectionExecuted(light)
return nil
ci.metrics.CollectionFinalized(light)
ci.metrics.CollectionExecuted(light)
}
return nil
})
})
}

// updateLastFullBlockHeight updates the LastFullBlockHeight index (if it has changed).
Expand Down Expand Up @@ -254,7 +264,7 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection

var missingCollections []*flow.CollectionGuarantee
for _, guarantee := range block.Payload.Guarantees {
inStorage, err := ci.IsCollectionInStorage(guarantee.CollectionID)
inStorage, err := ci.isCollectionInStorage(guarantee.CollectionID)
if err != nil {
return nil, err
}
Expand All @@ -267,10 +277,10 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection
return missingCollections, nil
}

// IsCollectionInStorage checks whether the given collection is present in local storage.
// isCollectionInStorage checks whether the given collection is present in local storage.
//
// No error returns are expected during normal operation.
func (ci *Indexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error) {
func (ci *Indexer) isCollectionInStorage(collectionID flow.Identifier) (bool, error) {
_, err := ci.collections.LightByID(collectionID)
if err == nil {
return true, nil
Expand Down
16 changes: 12 additions & 4 deletions engine/access/ingestion/collections/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type IndexerSuite struct {
suite.Suite

db *storagemock.DB
state *protocolmock.State
blocks *storagemock.Blocks
collections *storagemock.Collections
Expand All @@ -49,6 +50,7 @@ func TestIndexer(t *testing.T) {
}

func (s *IndexerSuite) SetupTest() {
s.db = storagemock.NewDB(s.T())
s.state = protocolmock.NewState(s.T())
s.blocks = storagemock.NewBlocks(s.T())
s.collections = storagemock.NewCollections(s.T())
Expand All @@ -67,6 +69,7 @@ func (s *IndexerSuite) SetupTest() {
func (s *IndexerSuite) createIndexer(t *testing.T) *Indexer {
indexer, err := NewIndexer(
unittest.Logger(),
s.db,
metrics.NewNoopCollector(),
s.state,
s.blocks,
Expand Down Expand Up @@ -183,7 +186,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().NoError(err)
s.Require().True(inStorage)
})
Expand All @@ -193,7 +196,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().NoError(err)
s.Require().False(inStorage)
})
Expand All @@ -204,7 +207,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().ErrorIs(err, exception)
s.Require().False(inStorage)
})
Expand Down Expand Up @@ -268,7 +271,11 @@ func (s *IndexerSuite) TestOnCollectionReceived() {

synctest.Test(s.T(), func(t *testing.T) {
s.collections.On("LightByID", collection.ID()).Return(nil, storage.ErrNotFound).Once()
s.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(collection.Light(), nil).Once()
s.collections.On("BatchStoreAndIndexByTransaction", mock.Anything, collection, mock.Anything).Return(collection.Light(), nil).Once()

s.db.On("WithReaderBatchWriter", mock.Anything).Return(func(fn func(storage.ReaderBatchWriter) error) error {
return fn(nil)
}).Once()

indexer := s.createIndexer(s.T())

Expand Down Expand Up @@ -312,6 +319,7 @@ func (s *IndexerSuite) TestWorkerProcessing_ProcessesCollections() {

indexer, err := NewIndexer(
unittest.Logger(),
bc.db,
metrics.NewNoopCollector(),
bc.state,
bc.all.Blocks,
Expand Down
28 changes: 9 additions & 19 deletions engine/access/ingestion/collections/mock/collection_indexer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading