Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,6 +2215,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

collectionIndexer, err := collections.NewIndexer(
node.Logger,
builder.ProtocolDB,
notNil(builder.collectionExecutedMetric),
node.State,
node.Storage.Blocks,
Expand All @@ -2233,6 +2234,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var executionDataSyncer *collections.ExecutionDataSyncer
if builder.executionDataSyncEnabled && !builder.executionDataIndexingEnabled {
executionDataSyncer = collections.NewExecutionDataSyncer(
node.Logger,
notNil(builder.ExecutionDataCache),
collectionIndexer,
)
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
collectionIndexer, err := collections.NewIndexer(
builder.Logger,
builder.ProtocolDB,
collectionExecutedMetric,
builder.State,
builder.Storage.Blocks,
Expand Down
78 changes: 44 additions & 34 deletions engine/access/ingestion/collections/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ type CollectionIndexer interface {
// This method is non-blocking.
OnCollectionReceived(collection *flow.Collection)

// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
// Calling this method again with collections that are already stored is a no-op.
//
// No error returns are expected during normal operation.
IndexCollections(collections []*flow.Collection) error

// MissingCollectionsAtHeight returns all collections that are not present in storage for a specific
// finalized block height.
//
// Expected error returns during normal operation:
// - [storage.ErrNotFound]: if the provided block height is not finalized or is below this node's root block
MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)

// IsCollectionInStorage checks whether the given collection is present in local storage.
//
// No error returns are expected during normal operation.
IsCollectionInStorage(collectionID flow.Identifier) (bool, error)
}

// Indexer stores and indexes collections received from the network. It is designed to be the central
Expand All @@ -56,10 +57,11 @@ type CollectionIndexer interface {
// The indexer also maintains the last full block height state, which is the highest block height
// for which all collections are stored and indexed.
type Indexer struct {
log zerolog.Logger
metrics module.CollectionExecutedMetric
lockManager lockctx.Manager
log zerolog.Logger
metrics module.CollectionExecutedMetric

db storage.DB
lockManager lockctx.Manager
state protocol.State
blocks storage.Blocks
collections storage.Collections
Expand All @@ -74,6 +76,7 @@ type Indexer struct {
// No error returns are expected during normal operation.
func NewIndexer(
log zerolog.Logger,
db storage.DB,
metrics module.CollectionExecutedMetric,
state protocol.State,
blocks storage.Blocks,
Expand All @@ -91,6 +94,7 @@ func NewIndexer(
return &Indexer{
log: log.With().Str("component", "collection-indexer").Logger(),
metrics: metrics,
db: db,
lockManager: lockManager,
state: state,
blocks: blocks,
Expand Down Expand Up @@ -136,7 +140,7 @@ func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component
return
}

if err := ci.indexCollection(collection); err != nil {
if err := ci.IndexCollections([]*flow.Collection{collection}); err != nil {
ctx.Throw(fmt.Errorf("error indexing collection: %w", err))
return
}
Expand All @@ -158,42 +162,48 @@ func (ci *Indexer) OnCollectionReceived(collection *flow.Collection) {
ci.pendingCollectionsNotifier.Notify()
}

// indexCollection indexes a collection and its transactions.
// Skips indexing and returns without an error if the collection is already indexed.
// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
// Calling this method again with collections that are already stored is a no-op.
//
// No error returns are expected during normal operation.
func (ci *Indexer) indexCollection(collection *flow.Collection) error {
func (ci *Indexer) IndexCollections(collections []*flow.Collection) error {
// Skip indexing if the collection is already indexed. On the common path, collections may be received
// via multiple subsystems (e.g. execution data sync, collection sync, execution state indexer).
// In this case, the indexer will be notified multiple times for the same collection. Only the
// first notification should be processed.
//
// It's OK that this check is not done atomically with the index operation since the collections
// storage module is solely responsible for enforcing consistency (even if this is a stale read).
exists, err := ci.IsCollectionInStorage(collection.ID())
if err != nil {
return fmt.Errorf("failed to check if collection is in storage: %w", err)
}
if exists {
return nil
newCollections := make([]*flow.Collection, 0)
for _, collection := range collections {
exists, err := ci.isCollectionInStorage(collection.ID())
if err != nil {
return fmt.Errorf("failed to check if collection is in storage: %w", err)
}
if !exists {
newCollections = append(newCollections, collection)
}
}

lctx := ci.lockManager.NewContext()
defer lctx.Release()
err = lctx.AcquireLock(storage.LockInsertCollection)
if err != nil {
return fmt.Errorf("could not acquire lock for indexing collections: %w", err)
if len(newCollections) == 0 {
return nil
}

// store the collection, including constituent transactions, and index transactionID -> collectionID
light, err := ci.collections.StoreAndIndexByTransaction(lctx, collection)
if err != nil {
return fmt.Errorf("failed to store collection: %w", err)
}
return storage.WithLock(ci.lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error {
return ci.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
for _, collection := range newCollections {
// store the collection, including constituent transactions, and index transactionID -> collectionID
light, err := ci.collections.BatchStoreAndIndexByTransaction(lctx, collection, rw)
if err != nil {
return fmt.Errorf("failed to store collection: %w", err)
}

ci.metrics.CollectionFinalized(light)
ci.metrics.CollectionExecuted(light)
return nil
ci.metrics.CollectionFinalized(light)
ci.metrics.CollectionExecuted(light)
}
return nil
})
})
}

// updateLastFullBlockHeight updates the LastFullBlockHeight index (if it has changed).
Expand Down Expand Up @@ -254,7 +264,7 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection

var missingCollections []*flow.CollectionGuarantee
for _, guarantee := range block.Payload.Guarantees {
inStorage, err := ci.IsCollectionInStorage(guarantee.CollectionID)
inStorage, err := ci.isCollectionInStorage(guarantee.CollectionID)
if err != nil {
return nil, err
}
Expand All @@ -267,10 +277,10 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection
return missingCollections, nil
}

// IsCollectionInStorage checks whether the given collection is present in local storage.
// isCollectionInStorage checks whether the given collection is present in local storage.
//
// No error returns are expected during normal operation.
func (ci *Indexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error) {
func (ci *Indexer) isCollectionInStorage(collectionID flow.Identifier) (bool, error) {
_, err := ci.collections.LightByID(collectionID)
if err == nil {
return true, nil
Expand Down
16 changes: 12 additions & 4 deletions engine/access/ingestion/collections/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type IndexerSuite struct {
suite.Suite

db *storagemock.DB
state *protocolmock.State
blocks *storagemock.Blocks
collections *storagemock.Collections
Expand All @@ -49,6 +50,7 @@ func TestIndexer(t *testing.T) {
}

func (s *IndexerSuite) SetupTest() {
s.db = storagemock.NewDB(s.T())
s.state = protocolmock.NewState(s.T())
s.blocks = storagemock.NewBlocks(s.T())
s.collections = storagemock.NewCollections(s.T())
Expand All @@ -67,6 +69,7 @@ func (s *IndexerSuite) SetupTest() {
func (s *IndexerSuite) createIndexer(t *testing.T) *Indexer {
indexer, err := NewIndexer(
unittest.Logger(),
s.db,
metrics.NewNoopCollector(),
s.state,
s.blocks,
Expand Down Expand Up @@ -183,7 +186,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().NoError(err)
s.Require().True(inStorage)
})
Expand All @@ -193,7 +196,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().NoError(err)
s.Require().False(inStorage)
})
Expand All @@ -204,7 +207,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {

indexer := s.createIndexer(s.T())

inStorage, err := indexer.IsCollectionInStorage(collection.ID())
inStorage, err := indexer.isCollectionInStorage(collection.ID())
s.Require().ErrorIs(err, exception)
s.Require().False(inStorage)
})
Expand Down Expand Up @@ -268,7 +271,11 @@ func (s *IndexerSuite) TestOnCollectionReceived() {

synctest.Test(s.T(), func(t *testing.T) {
s.collections.On("LightByID", collection.ID()).Return(nil, storage.ErrNotFound).Once()
s.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(collection.Light(), nil).Once()
s.collections.On("BatchStoreAndIndexByTransaction", mock.Anything, collection, mock.Anything).Return(collection.Light(), nil).Once()

s.db.On("WithReaderBatchWriter", mock.Anything).Return(func(fn func(storage.ReaderBatchWriter) error) error {
return fn(nil)
}).Once()

indexer := s.createIndexer(s.T())

Expand Down Expand Up @@ -312,6 +319,7 @@ func (s *IndexerSuite) TestWorkerProcessing_ProcessesCollections() {

indexer, err := NewIndexer(
unittest.Logger(),
bc.db,
metrics.NewNoopCollector(),
bc.state,
bc.all.Blocks,
Expand Down
28 changes: 9 additions & 19 deletions engine/access/ingestion/collections/mock/collection_indexer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading