Skip to content

Commit 10ea9e0

Browse files
committed
[Access] Refactor the collection indexing and syncing
1 parent 36f2876 commit 10ea9e0

File tree

12 files changed

+319
-332
lines changed

12 files changed

+319
-332
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2215,6 +2215,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22152215

22162216
collectionIndexer, err := collections.NewIndexer(
22172217
node.Logger,
2218+
builder.ProtocolDB,
22182219
notNil(builder.collectionExecutedMetric),
22192220
node.State,
22202221
node.Storage.Blocks,
@@ -2233,6 +2234,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22332234
var executionDataSyncer *collections.ExecutionDataSyncer
22342235
if builder.executionDataSyncEnabled && !builder.executionDataIndexingEnabled {
22352236
executionDataSyncer = collections.NewExecutionDataSyncer(
2237+
node.Logger,
22362238
notNil(builder.ExecutionDataCache),
22372239
collectionIndexer,
22382240
)

cmd/observer/node_builder/observer_builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,6 +1454,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
14541454
var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
14551455
collectionIndexer, err := collections.NewIndexer(
14561456
builder.Logger,
1457+
builder.ProtocolDB,
14571458
collectionExecutedMetric,
14581459
builder.State,
14591460
builder.Storage.Blocks,

engine/access/ingestion/collections/indexer.go

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,18 @@ type CollectionIndexer interface {
3434
// This method is non-blocking.
3535
OnCollectionReceived(collection *flow.Collection)
3636

37+
// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
38+
// Calling this method multiple times with the same collections is a no-op.
39+
//
40+
// No error returns are expected during normal operation.
41+
IndexCollections(collections []*flow.Collection) error
42+
3743
// MissingCollectionsAtHeight returns all collections that are not present in storage for a specific
3844
// finalized block height.
3945
//
4046
// Expected error returns during normal operation:
4147
// - [storage.ErrNotFound]: if provided block height is not finalized or below this node's root block
4248
MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)
43-
44-
// IsCollectionInStorage checks whether the given collection is present in local storage.
45-
//
46-
// No error returns are expected during normal operation.
47-
IsCollectionInStorage(collectionID flow.Identifier) (bool, error)
4849
}
4950

5051
// Indexer stores and indexes collections received from the network. It is designed to be the central
@@ -56,10 +57,11 @@ type CollectionIndexer interface {
5657
// The indexer also maintains the last full block height state, which is the highest block height
5758
// for which all collections are stored and indexed.
5859
type Indexer struct {
59-
log zerolog.Logger
60-
metrics module.CollectionExecutedMetric
61-
lockManager lockctx.Manager
60+
log zerolog.Logger
61+
metrics module.CollectionExecutedMetric
6262

63+
db storage.DB
64+
lockManager lockctx.Manager
6365
state protocol.State
6466
blocks storage.Blocks
6567
collections storage.Collections
@@ -74,6 +76,7 @@ type Indexer struct {
7476
// No error returns are expected during normal operation.
7577
func NewIndexer(
7678
log zerolog.Logger,
79+
db storage.DB,
7780
metrics module.CollectionExecutedMetric,
7881
state protocol.State,
7982
blocks storage.Blocks,
@@ -91,6 +94,7 @@ func NewIndexer(
9194
return &Indexer{
9295
log: log.With().Str("component", "collection-indexer").Logger(),
9396
metrics: metrics,
97+
db: db,
9498
lockManager: lockManager,
9599
state: state,
96100
blocks: blocks,
@@ -136,7 +140,7 @@ func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component
136140
return
137141
}
138142

139-
if err := ci.indexCollection(collection); err != nil {
143+
if err := ci.IndexCollections([]*flow.Collection{collection}); err != nil {
140144
ctx.Throw(fmt.Errorf("error indexing collection: %w", err))
141145
return
142146
}
@@ -158,42 +162,48 @@ func (ci *Indexer) OnCollectionReceived(collection *flow.Collection) {
158162
ci.pendingCollectionsNotifier.Notify()
159163
}
160164

161-
// indexCollection indexes a collection and its transactions.
162-
// Skips indexing and returns without an error if the collection is already indexed.
165+
// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
166+
// Calling this method multiple times with the same collections is a no-op.
163167
//
164168
// No error returns are expected during normal operation.
165-
func (ci *Indexer) indexCollection(collection *flow.Collection) error {
169+
func (ci *Indexer) IndexCollections(collections []*flow.Collection) error {
166170
// skip indexing if collection is already indexed. on the common path, collections may be received
167171
// via multiple subsystems (e.g. execution data sync, collection sync, execution state indexer).
168172
// In this case, the indexer will be notified multiple times for the same collection. Only the
169173
// first notification should be processed.
170174
//
171175
// It's OK that this check is not done atomically with the index operation since the collections
172176
// storage module is solely responsible for enforcing consistency (even if this is a stale read).
173-
exists, err := ci.IsCollectionInStorage(collection.ID())
174-
if err != nil {
175-
return fmt.Errorf("failed to check if collection is in storage: %w", err)
176-
}
177-
if exists {
178-
return nil
177+
newCollections := make([]*flow.Collection, 0)
178+
for _, collection := range collections {
179+
exists, err := ci.isCollectionInStorage(collection.ID())
180+
if err != nil {
181+
return fmt.Errorf("failed to check if collection is in storage: %w", err)
182+
}
183+
if !exists {
184+
newCollections = append(newCollections, collection)
185+
}
179186
}
180187

181-
lctx := ci.lockManager.NewContext()
182-
defer lctx.Release()
183-
err = lctx.AcquireLock(storage.LockInsertCollection)
184-
if err != nil {
185-
return fmt.Errorf("could not acquire lock for indexing collections: %w", err)
188+
if len(newCollections) == 0 {
189+
return nil
186190
}
187191

188-
// store the collection, including constituent transactions, and index transactionID -> collectionID
189-
light, err := ci.collections.StoreAndIndexByTransaction(lctx, collection)
190-
if err != nil {
191-
return fmt.Errorf("failed to store collection: %w", err)
192-
}
192+
return storage.WithLock(ci.lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error {
193+
return ci.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
194+
for _, collection := range newCollections {
195+
// store the collection, including constituent transactions, and index transactionID -> collectionID
196+
light, err := ci.collections.BatchStoreAndIndexByTransaction(lctx, collection, rw)
197+
if err != nil {
198+
return fmt.Errorf("failed to store collection: %w", err)
199+
}
193200

194-
ci.metrics.CollectionFinalized(light)
195-
ci.metrics.CollectionExecuted(light)
196-
return nil
201+
ci.metrics.CollectionFinalized(light)
202+
ci.metrics.CollectionExecuted(light)
203+
}
204+
return nil
205+
})
206+
})
197207
}
198208

199209
// updateLastFullBlockHeight updates the LastFullBlockHeight index (if it has changed).
@@ -254,7 +264,7 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection
254264

255265
var missingCollections []*flow.CollectionGuarantee
256266
for _, guarantee := range block.Payload.Guarantees {
257-
inStorage, err := ci.IsCollectionInStorage(guarantee.CollectionID)
267+
inStorage, err := ci.isCollectionInStorage(guarantee.CollectionID)
258268
if err != nil {
259269
return nil, err
260270
}
@@ -267,10 +277,10 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection
267277
return missingCollections, nil
268278
}
269279

270-
// IsCollectionInStorage checks whether the given collection is present in local storage.
280+
// isCollectionInStorage checks whether the given collection is present in local storage.
271281
//
272282
// No error returns are expected during normal operation.
273-
func (ci *Indexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error) {
283+
func (ci *Indexer) isCollectionInStorage(collectionID flow.Identifier) (bool, error) {
274284
_, err := ci.collections.LightByID(collectionID)
275285
if err == nil {
276286
return true, nil

engine/access/ingestion/collections/indexer_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
type IndexerSuite struct {
3434
suite.Suite
3535

36+
db *storagemock.DB
3637
state *protocolmock.State
3738
blocks *storagemock.Blocks
3839
collections *storagemock.Collections
@@ -49,6 +50,7 @@ func TestIndexer(t *testing.T) {
4950
}
5051

5152
func (s *IndexerSuite) SetupTest() {
53+
s.db = storagemock.NewDB(s.T())
5254
s.state = protocolmock.NewState(s.T())
5355
s.blocks = storagemock.NewBlocks(s.T())
5456
s.collections = storagemock.NewCollections(s.T())
@@ -67,6 +69,7 @@ func (s *IndexerSuite) SetupTest() {
6769
func (s *IndexerSuite) createIndexer(t *testing.T) *Indexer {
6870
indexer, err := NewIndexer(
6971
unittest.Logger(),
72+
s.db,
7073
metrics.NewNoopCollector(),
7174
s.state,
7275
s.blocks,
@@ -183,7 +186,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {
183186

184187
indexer := s.createIndexer(s.T())
185188

186-
inStorage, err := indexer.IsCollectionInStorage(collection.ID())
189+
inStorage, err := indexer.isCollectionInStorage(collection.ID())
187190
s.Require().NoError(err)
188191
s.Require().True(inStorage)
189192
})
@@ -193,7 +196,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {
193196

194197
indexer := s.createIndexer(s.T())
195198

196-
inStorage, err := indexer.IsCollectionInStorage(collection.ID())
199+
inStorage, err := indexer.isCollectionInStorage(collection.ID())
197200
s.Require().NoError(err)
198201
s.Require().False(inStorage)
199202
})
@@ -204,7 +207,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() {
204207

205208
indexer := s.createIndexer(s.T())
206209

207-
inStorage, err := indexer.IsCollectionInStorage(collection.ID())
210+
inStorage, err := indexer.isCollectionInStorage(collection.ID())
208211
s.Require().ErrorIs(err, exception)
209212
s.Require().False(inStorage)
210213
})
@@ -268,7 +271,11 @@ func (s *IndexerSuite) TestOnCollectionReceived() {
268271

269272
synctest.Test(s.T(), func(t *testing.T) {
270273
s.collections.On("LightByID", collection.ID()).Return(nil, storage.ErrNotFound).Once()
271-
s.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(collection.Light(), nil).Once()
274+
s.collections.On("BatchStoreAndIndexByTransaction", mock.Anything, collection, mock.Anything).Return(collection.Light(), nil).Once()
275+
276+
s.db.On("WithReaderBatchWriter", mock.Anything).Return(func(fn func(storage.ReaderBatchWriter) error) error {
277+
return fn(nil)
278+
}).Once()
272279

273280
indexer := s.createIndexer(s.T())
274281

@@ -312,6 +319,7 @@ func (s *IndexerSuite) TestWorkerProcessing_ProcessesCollections() {
312319

313320
indexer, err := NewIndexer(
314321
unittest.Logger(),
322+
bc.db,
315323
metrics.NewNoopCollector(),
316324
bc.state,
317325
bc.all.Blocks,

engine/access/ingestion/collections/mock/collection_indexer.go

Lines changed: 9 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)