diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index fabc67bd7f4..6a79fd918bb 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -2215,6 +2215,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { collectionIndexer, err := collections.NewIndexer( node.Logger, + builder.ProtocolDB, notNil(builder.collectionExecutedMetric), node.State, node.Storage.Blocks, @@ -2233,6 +2234,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { var executionDataSyncer *collections.ExecutionDataSyncer if builder.executionDataSyncEnabled && !builder.executionDataIndexingEnabled { executionDataSyncer = collections.NewExecutionDataSyncer( + node.Logger, notNil(builder.ExecutionDataCache), collectionIndexer, ) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 4b7965b16ba..7e4ae5ce6ed 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1454,6 +1454,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector() collectionIndexer, err := collections.NewIndexer( builder.Logger, + builder.ProtocolDB, collectionExecutedMetric, builder.State, builder.Storage.Blocks, diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 15beb15953e..21fcf1107ff 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -726,6 +726,7 @@ func (suite *Suite) TestGetSealedTransaction() { collectionIndexer, err := ingestioncollections.NewIndexer( suite.log, + db, collectionExecutedMetric, suite.state, all.Blocks, @@ -816,8 +817,8 @@ func (suite *Suite) TestGetSealedTransaction() { // block until the collection is processed by the indexer require.Eventually(suite.T(), func() bool { - isStored, err := collectionIndexer.IsCollectionInStorage(collection.ID()) - return isStored && err == nil + _, err := collections.LightByID(collection.ID()) + return err == nil }, 1*time.Second, 10*time.Millisecond, "collection not indexed") // 5. Client requests a transaction @@ -984,6 +985,7 @@ func (suite *Suite) TestGetTransactionResult() { collectionIndexer, err := ingestioncollections.NewIndexer( suite.log, + db, collectionExecutedMetric, suite.state, all.Blocks, @@ -1259,6 +1261,7 @@ func (suite *Suite) TestExecuteScript() { collectionIndexer, err := ingestioncollections.NewIndexer( suite.log, + db, collectionExecutedMetric, suite.state, all.Blocks, diff --git a/engine/access/ingestion/collections/indexer.go b/engine/access/ingestion/collections/indexer.go index d2c5151930b..3fc6969a586 100644 --- a/engine/access/ingestion/collections/indexer.go +++ b/engine/access/ingestion/collections/indexer.go @@ -34,17 +34,18 @@ type CollectionIndexer interface { // This method is non-blocking. OnCollectionReceived(collection *flow.Collection) + // IndexCollections indexes a set of collections, skipping any collections which already exist in storage. + // Calling this method multiple times with the same collections is a no-op. + // + // No error returns are expected during normal operation. + IndexCollections(collections []*flow.Collection) error + // MissingCollectionsAtHeight returns all collections that are not present in storage for a specific // finalized block height. 
// // Expected error returns during normal operation: // - [storage.ErrNotFound]: if provided block height is not finalized or below this node's root block MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error) - - // IsCollectionInStorage checks whether the given collection is present in local storage. - // - // No error returns are expected during normal operation. - IsCollectionInStorage(collectionID flow.Identifier) (bool, error) } // Indexer stores and indexes collections received from the network. It is designed to be the central @@ -56,10 +57,11 @@ type CollectionIndexer interface { // The indexer also maintains the last full block height state, which is the highest block height // for which all collections are stored and indexed. type Indexer struct { - log zerolog.Logger - metrics module.CollectionExecutedMetric - lockManager lockctx.Manager + log zerolog.Logger + metrics module.CollectionExecutedMetric + db storage.DB + lockManager lockctx.Manager state protocol.State blocks storage.Blocks collections storage.Collections @@ -74,6 +76,7 @@ type Indexer struct { // No error returns are expected during normal operation. func NewIndexer( log zerolog.Logger, + db storage.DB, metrics module.CollectionExecutedMetric, state protocol.State, blocks storage.Blocks, @@ -91,6 +94,7 @@ func NewIndexer( return &Indexer{ log: log.With().Str("component", "collection-indexer").Logger(), metrics: metrics, + db: db, lockManager: lockManager, state: state, blocks: blocks, @@ -136,7 +140,7 @@ func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component return } - if err := ci.indexCollection(collection); err != nil { + if err := ci.IndexCollections([]*flow.Collection{collection}); err != nil { ctx.Throw(fmt.Errorf("error indexing collection: %w", err)) return } @@ -158,11 +162,11 @@ func (ci *Indexer) OnCollectionReceived(collection *flow.Collection) { ci.pendingCollectionsNotifier.Notify() } -// indexCollection indexes a collection and its transactions. -// Skips indexing and returns without an error if the collection is already indexed. +// IndexCollections indexes a set of collections, skipping any collections which already exist in storage. +// Calling this method multiple times with the same collections is a no-op. // // No error returns are expected during normal operation. -func (ci *Indexer) indexCollection(collection *flow.Collection) error { +func (ci *Indexer) IndexCollections(collections []*flow.Collection) error { // skip indexing if collection is already indexed. on the common path, collections may be received // via multiple subsystems (e.g. execution data sync, collection sync, execution state indexer). // In this case, the indexer will be notified multiple times for the same collection. Only the @@ -170,30 +174,36 @@ func (ci *Indexer) indexCollection(collection *flow.Collection) error { // // It's OK that this check is not done atomically with the index operation since the collections // storage module is solely responsible for enforcing consistency (even if this is a stale read). 
- exists, err := ci.IsCollectionInStorage(collection.ID()) - if err != nil { - return fmt.Errorf("failed to check if collection is in storage: %w", err) - } - if exists { - return nil + newCollections := make([]*flow.Collection, 0) + for _, collection := range collections { + exists, err := ci.isCollectionInStorage(collection.ID()) + if err != nil { + return fmt.Errorf("failed to check if collection is in storage: %w", err) + } + if !exists { + newCollections = append(newCollections, collection) + } } - lctx := ci.lockManager.NewContext() - defer lctx.Release() - err = lctx.AcquireLock(storage.LockInsertCollection) - if err != nil { - return fmt.Errorf("could not acquire lock for indexing collections: %w", err) + if len(newCollections) == 0 { + return nil } - // store the collection, including constituent transactions, and index transactionID -> collectionID - light, err := ci.collections.StoreAndIndexByTransaction(lctx, collection) - if err != nil { - return fmt.Errorf("failed to store collection: %w", err) - } + return storage.WithLock(ci.lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error { + return ci.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + for _, collection := range newCollections { + // store the collection, including constituent transactions, and index transactionID -> collectionID + light, err := ci.collections.BatchStoreAndIndexByTransaction(lctx, collection, rw) + if err != nil { + return fmt.Errorf("failed to store collection: %w", err) + } - ci.metrics.CollectionFinalized(light) - ci.metrics.CollectionExecuted(light) - return nil + ci.metrics.CollectionFinalized(light) + ci.metrics.CollectionExecuted(light) + } + return nil + }) + }) } // updateLastFullBlockHeight updates the LastFullBlockHeight index (if it has changed). @@ -254,7 +264,7 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection var missingCollections []*flow.CollectionGuarantee for _, guarantee := range block.Payload.Guarantees { - inStorage, err := ci.IsCollectionInStorage(guarantee.CollectionID) + inStorage, err := ci.isCollectionInStorage(guarantee.CollectionID) if err != nil { return nil, err } @@ -267,10 +277,10 @@ func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.Collection return missingCollections, nil } -// IsCollectionInStorage checks whether the given collection is present in local storage. +// isCollectionInStorage checks whether the given collection is present in local storage. // // No error returns are expected during normal operation. 
-func (ci *Indexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error) { +func (ci *Indexer) isCollectionInStorage(collectionID flow.Identifier) (bool, error) { _, err := ci.collections.LightByID(collectionID) if err == nil { return true, nil diff --git a/engine/access/ingestion/collections/indexer_test.go b/engine/access/ingestion/collections/indexer_test.go index d882bdb1fb5..294ac3c034a 100644 --- a/engine/access/ingestion/collections/indexer_test.go +++ b/engine/access/ingestion/collections/indexer_test.go @@ -33,6 +33,7 @@ import ( type IndexerSuite struct { suite.Suite + db *storagemock.DB state *protocolmock.State blocks *storagemock.Blocks collections *storagemock.Collections @@ -49,6 +50,7 @@ func TestIndexer(t *testing.T) { } func (s *IndexerSuite) SetupTest() { + s.db = storagemock.NewDB(s.T()) s.state = protocolmock.NewState(s.T()) s.blocks = storagemock.NewBlocks(s.T()) s.collections = storagemock.NewCollections(s.T()) @@ -67,6 +69,7 @@ func (s *IndexerSuite) SetupTest() { func (s *IndexerSuite) createIndexer(t *testing.T) *Indexer { indexer, err := NewIndexer( unittest.Logger(), + s.db, metrics.NewNoopCollector(), s.state, s.blocks, @@ -183,7 +186,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() { indexer := s.createIndexer(s.T()) - inStorage, err := indexer.IsCollectionInStorage(collection.ID()) + inStorage, err := indexer.isCollectionInStorage(collection.ID()) s.Require().NoError(err) s.Require().True(inStorage) }) @@ -193,7 +196,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() { indexer := s.createIndexer(s.T()) - inStorage, err := indexer.IsCollectionInStorage(collection.ID()) + inStorage, err := indexer.isCollectionInStorage(collection.ID()) s.Require().NoError(err) s.Require().False(inStorage) }) @@ -204,7 +207,7 @@ func (s *IndexerSuite) TestIsCollectionInStorage() { indexer := s.createIndexer(s.T()) - inStorage, err := indexer.IsCollectionInStorage(collection.ID()) + inStorage, err := indexer.isCollectionInStorage(collection.ID()) s.Require().ErrorIs(err, exception) s.Require().False(inStorage) }) @@ -268,7 +271,11 @@ func (s *IndexerSuite) TestOnCollectionReceived() { synctest.Test(s.T(), func(t *testing.T) { s.collections.On("LightByID", collection.ID()).Return(nil, storage.ErrNotFound).Once() - s.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(collection.Light(), nil).Once() + s.collections.On("BatchStoreAndIndexByTransaction", mock.Anything, collection, mock.Anything).Return(collection.Light(), nil).Once() + + s.db.On("WithReaderBatchWriter", mock.Anything).Return(func(fn func(storage.ReaderBatchWriter) error) error { + return fn(nil) + }).Once() indexer := s.createIndexer(s.T()) @@ -312,6 +319,7 @@ func (s *IndexerSuite) TestWorkerProcessing_ProcessesCollections() { indexer, err := NewIndexer( unittest.Logger(), + bc.db, metrics.NewNoopCollector(), bc.state, bc.all.Blocks, diff --git a/engine/access/ingestion/collections/mock/collection_indexer.go b/engine/access/ingestion/collections/mock/collection_indexer.go index 9cfafd0df99..9d2f174cfee 100644 --- a/engine/access/ingestion/collections/mock/collection_indexer.go +++ b/engine/access/ingestion/collections/mock/collection_indexer.go @@ -12,32 +12,22 @@ type CollectionIndexer struct { mock.Mock } -// IsCollectionInStorage provides a mock function with given fields: collectionID -func (_m *CollectionIndexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error) { - ret := _m.Called(collectionID) +// IndexCollections provides a mock function 
with given fields: _a0 +func (_m *CollectionIndexer) IndexCollections(_a0 []*flow.Collection) error { + ret := _m.Called(_a0) if len(ret) == 0 { - panic("no return value specified for IsCollectionInStorage") + panic("no return value specified for IndexCollections") } - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func(flow.Identifier) (bool, error)); ok { - return rf(collectionID) - } - if rf, ok := ret.Get(0).(func(flow.Identifier) bool); ok { - r0 = rf(collectionID) + var r0 error + if rf, ok := ret.Get(0).(func([]*flow.Collection) error); ok { + r0 = rf(_a0) } else { - r0 = ret.Get(0).(bool) + r0 = ret.Error(0) } - if rf, ok := ret.Get(1).(func(flow.Identifier) error); ok { - r1 = rf(collectionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } // MissingCollectionsAtHeight provides a mock function with given fields: height diff --git a/engine/access/ingestion/collections/syncer.go b/engine/access/ingestion/collections/syncer.go index 78cf887608a..53007147fe7 100644 --- a/engine/access/ingestion/collections/syncer.go +++ b/engine/access/ingestion/collections/syncer.go @@ -28,20 +28,15 @@ const ( // if missing collections have been received during the initial collection catchup process. DefaultCollectionCatchupDBPollInterval = 1 * time.Second - // DefaultMissingCollsRequestInterval is the interval at which the syncer checks missing collections + // DefaultMissingCollectionRequestInterval is the interval at which the syncer checks missing collections // and re-requests them from the network if needed. - DefaultMissingCollsRequestInterval = 1 * time.Minute + DefaultMissingCollectionRequestInterval = 1 * time.Minute - // DefaultMissingCollsForBlockThreshold is the threshold number of blocks with missing collections - // beyond which collections should be re-requested. This prevents spamming the collection nodes - // with requests for recent data. - DefaultMissingCollsForBlockThreshold = 100 - - // DefaultMissingCollsForAgeThreshold is the block height threshold below which collections + // DefaultMissingCollectionRequestThreshold is the block height threshold below which collections // should be re-requested, regardless of the number of blocks for which collection are missing. // This is to ensure that if a collection is missing for a long time (in terms of block height) // it is eventually re-requested. - DefaultMissingCollsForAgeThreshold = 100 + DefaultMissingCollectionRequestThreshold = 100 ) // The Syncer is responsible for syncing collections for finalized blocks from the network. It has @@ -74,11 +69,10 @@ type Syncer struct { execDataSyncer *ExecutionDataSyncer // may be nil // these are held as members to allow configuring their values during testing. - collectionCatchupTimeout time.Duration - collectionCatchupDBPollInterval time.Duration - missingCollsForBlockThreshold int - missingCollsForAgeThreshold uint64 - missingCollsRequestInterval time.Duration + collectionCatchupTimeout time.Duration + collectionCatchupDBPollInterval time.Duration + missingCollectionRequestThreshold uint64 + missingCollectionRequestInterval time.Duration } // NewSyncer creates a new Syncer responsible for requesting, tracking, and indexing missing collections. 
@@ -100,53 +94,61 @@ func NewSyncer( indexer: collectionIndexer, execDataSyncer: execDataSyncer, - collectionCatchupTimeout: DefaultCollectionCatchupTimeout, - collectionCatchupDBPollInterval: DefaultCollectionCatchupDBPollInterval, - missingCollsForBlockThreshold: DefaultMissingCollsForBlockThreshold, - missingCollsForAgeThreshold: DefaultMissingCollsForAgeThreshold, - missingCollsRequestInterval: DefaultMissingCollsRequestInterval, + collectionCatchupTimeout: DefaultCollectionCatchupTimeout, + collectionCatchupDBPollInterval: DefaultCollectionCatchupDBPollInterval, + missingCollectionRequestThreshold: DefaultMissingCollectionRequestThreshold, + missingCollectionRequestInterval: DefaultMissingCollectionRequestInterval, } } // WorkerLoop is a [component.ComponentWorker] that continuously monitors for missing collections, and // requests them from the network if needed. It also performs an initial collection catchup on startup. func (s *Syncer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - requestCtx, cancel := context.WithTimeout(ctx, s.collectionCatchupTimeout) - defer cancel() - - // attempt to download all known missing collections on start-up. - err := s.requestMissingCollectionsBlocking(requestCtx) - if err != nil { - if ctx.Err() != nil { - s.log.Error().Err(err).Msg("engine shutdown while downloading missing collections") + // Block marking the component ready until either the first run of the missing collections catchup + // completes, or `collectionCatchupTimeout` expires. This improves the user experience by preventing + // the Access API from starting while the initial catchup completes. The intention is to avoid + // returning NotFound errors immediately after startup for data that should be available, but has + // not yet been indexed. + // TODO (peter): consider removing this. I'm not convinced that this provides much value since in + // the common case (node restart), the block finalization would also pause so the node should not + // become farther behind in terms of collections. + readyChan := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): return + case <-readyChan: + case <-time.After(s.collectionCatchupTimeout): } + ready() + }() - if !errors.Is(err, context.DeadlineExceeded) { - ctx.Throw(fmt.Errorf("error downloading missing collections: %w", err)) + requestCollectionsTicker := time.NewTicker(s.missingCollectionRequestInterval) + defer requestCollectionsTicker.Stop() + + initialCatchupComplete := false + for { + err := s.requestMissingCollections(ctx, !initialCatchupComplete) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + ctx.Throw(fmt.Errorf("failed to request missing collections: %w", err)) return } - // timed out during catchup. continue with normal startup. - // missing collections will be requested periodically. - s.log.Error().Err(err).Msg("timed out syncing collections during startup") - } - ready() - - requestCollectionsTicker := time.NewTicker(s.missingCollsRequestInterval) - defer requestCollectionsTicker.Stop() + // after the first successful run, mark the catchup as complete. This will cause the worker + // to call the ready function if it was not already called. 
+ if !initialCatchupComplete { + initialCatchupComplete = true + close(readyChan) + } - for { select { + case <-requestCollectionsTicker.C: case <-ctx.Done(): return - - case <-requestCollectionsTicker.C: - err := s.requestMissingCollections(ctx) - if err != nil { - ctx.Throw(fmt.Errorf("failed to request missing collections: %w", err)) - return - } } } } @@ -197,168 +199,102 @@ func (s *Syncer) requestCollections(collections []*flow.CollectionGuarantee) err return nil } -// requestMissingCollections checks if missing collections should be requested based on configured -// block or age thresholds and triggers requests if needed. +// requestMissingCollections requests all missing collections using local execution data if available, +// otherwise requests them from the network. Only sends requests to the network if we are more than +// `missingCollectionRequestThreshold` blocks behind the latest finalized block. // -// No error returns are expected during normal operation. -func (s *Syncer) requestMissingCollections(ctx context.Context) error { +// Expected error returns during normal operation: +// - [context.Canceled]: if the context is canceled before all collections are requested +func (s *Syncer) requestMissingCollections(ctx context.Context, forceSendRequests bool) error { lastFullBlockHeight := s.lastFullBlockHeight.Value() lastFinalizedBlock, err := s.state.Final().Head() if err != nil { return fmt.Errorf("failed to get finalized block: %w", err) } - // if the node is syncing execution data, use the already downloaded data to index any available - // collections we are still missing. - lastSyncedHeight := lastFullBlockHeight - if s.execDataSyncer != nil { - lastSyncedHeight, err = s.execDataSyncer.IndexFromStartHeight(ctx, lastFullBlockHeight) - if err != nil { - return fmt.Errorf("failed to index collections from execution data: %w", err) - } - // At this point, all collections within blocks up to and including `lastSyncedHeight` were - // submitted for indexing. However, indexing is completed asynchronously, so `lastSyncedHeight` - // was set to the last block for which we have execution data to avoid re-requesting already - // submitted collections. - } - - // request all other missing collections from Collection nodes. - collections, incompleteBlocksCount, err := s.findMissingCollections(lastSyncedHeight, lastFinalizedBlock.Height) - if err != nil { - return fmt.Errorf("failed to find missing collections: %w", err) + if lastFullBlockHeight >= lastFinalizedBlock.Height { + return nil } // only send requests if we are sufficiently behind the latest finalized block to avoid spamming // collection nodes with requests. - blocksThresholdReached := incompleteBlocksCount >= s.missingCollsForBlockThreshold - ageThresholdReached := lastFinalizedBlock.Height-lastFullBlockHeight > s.missingCollsForAgeThreshold - if len(collections) > 0 && (blocksThresholdReached || ageThresholdReached) { - // warn log since this should generally not happen - s.log.Warn(). - Uint64("finalized_height", lastFinalizedBlock.Height). - Uint64("last_full_blk_height", lastFullBlockHeight). - Int("missing_collection_blk_count", incompleteBlocksCount). - Int("missing_collection_count", len(collections)). - Msg("re-requesting missing collections") - - err = s.requestCollections(collections) - if err != nil { - return fmt.Errorf("failed to request collections: %w", err) - } - // since this is a re-request, do not use force. new finalized block requests will force - // dispatch. 
On the happy path, this will happen at least once per second. + shouldSendRequestsToNetwork := forceSendRequests + if lastFinalizedBlock.Height-lastFullBlockHeight >= s.missingCollectionRequestThreshold { + shouldSendRequestsToNetwork = true } - return nil -} + progress := util.LogProgress(s.log, util.DefaultLogProgressConfig("requesting missing collections", lastFinalizedBlock.Height-lastFullBlockHeight)) -// findMissingCollections scans block heights from last known full block up to the latest finalized -// block and returns all missing collection along with the count of incomplete blocks. -// -// No error returns are expected during normal operation. -func (s *Syncer) findMissingCollections(lastFullBlockHeight, finalizedBlockHeight uint64) ([]*flow.CollectionGuarantee, int, error) { - var missingCollections []*flow.CollectionGuarantee - var incompleteBlocksCount int + requestedBlocks := 0 + requestedCollections := 0 + for height := lastFullBlockHeight + 1; height <= lastFinalizedBlock.Height; height++ { + if ctx.Err() != nil { + return fmt.Errorf("missing collection catchup interrupted: %w", ctx.Err()) + } - for height := lastFullBlockHeight + 1; height <= finalizedBlockHeight; height++ { - collections, err := s.indexer.MissingCollectionsAtHeight(height) + collections, requested, err := s.requestForHeight(ctx, height, shouldSendRequestsToNetwork) if err != nil { - return nil, 0, err + return fmt.Errorf("failed to request collections for height %d: %w", height, err) } - - if len(collections) > 0 { - missingCollections = append(missingCollections, collections...) - incompleteBlocksCount += 1 + if requested { + requestedBlocks++ + requestedCollections += len(collections) } + + progress(1) } - return missingCollections, incompleteBlocksCount, nil + if requestedBlocks > 0 { + s.log.Warn(). + Uint64("finalized_height", lastFinalizedBlock.Height). + Uint64("last_full_block_height", lastFullBlockHeight). + Int("requested_block_count", requestedBlocks). + Int("requested_collection_count", requestedCollections). + Msg("re-requesting missing collections") + } + + return nil } -// requestMissingCollectionsBlocking requests and waits for all missing collections to be downloaded, -// blocking until either completion or context timeout. +// requestForHeight requests all missing collections for the given height. +// If collections are available from execution data, they are indexed first. All other collections +// are requested from the network if `requestFromNetwork` is true. +// Returns true if the collections were missing and requested. +// Returns the list of collections that were requested from the network. // // No error returns are expected during normal operation. 
-func (s *Syncer) requestMissingCollectionsBlocking(ctx context.Context) error { - lastFullBlockHeight := s.lastFullBlockHeight.Value() - lastFinalizedBlock, err := s.state.Final().Head() +func (s *Syncer) requestForHeight(ctx context.Context, height uint64, requestFromNetwork bool) ([]*flow.CollectionGuarantee, bool, error) { + collections, err := s.indexer.MissingCollectionsAtHeight(height) if err != nil { - return fmt.Errorf("failed to get finalized block: %w", err) + return nil, false, fmt.Errorf("failed to find missing collections at height %d: %w", height, err) + } + if len(collections) == 0 { + return nil, false, nil } - progress := util.LogProgress(s.log, util.DefaultLogProgressConfig("requesting missing collections", lastFinalizedBlock.Height-lastFullBlockHeight)) - - pendingCollections := make(map[flow.Identifier]struct{}) - for height := lastFullBlockHeight + 1; height <= lastFinalizedBlock.Height; height++ { - if ctx.Err() != nil { - return fmt.Errorf("missing collection catchup interrupted: %w", ctx.Err()) - } - - collections, err := s.indexer.MissingCollectionsAtHeight(height) + // always index available collections from execution data. + if s.execDataSyncer != nil { + submitted, err := s.execDataSyncer.IndexForHeight(ctx, height) if err != nil { - return fmt.Errorf("failed to find missing collections at height %d: %w", height, err) + return nil, false, fmt.Errorf("failed to index collections from execution data: %w", err) } - - if len(collections) > 0 { - var submitted bool - if s.execDataSyncer != nil { - submitted, err = s.execDataSyncer.IndexForHeight(ctx, height) - if err != nil { - return fmt.Errorf("failed to index collections from execution data: %w", err) - } - } - - // if the data wasn't available from execution data, request it from Collection nodes. - if !submitted { - err = s.requestCollections(collections) - if err != nil { - return fmt.Errorf("failed to request collections: %w", err) - } - for _, collection := range collections { - pendingCollections[collection.CollectionID] = struct{}{} - } - } + if submitted { + return nil, true, nil } - - progress(1) } - if len(pendingCollections) == 0 { - s.log.Info().Msg("no missing collections to download") - return nil + // only request collections from the network if asked to do so. + if !requestFromNetwork { + return nil, false, nil } - // trigger immediate dispatch of any pending collection requests. - s.requester.Force() - - collectionStoragePollTicker := time.NewTicker(s.collectionCatchupDBPollInterval) - defer collectionStoragePollTicker.Stop() - - // we want to wait for all collections to be downloaded so we poll local storage periodically to - // make sure each collection was successfully stored and indexed. - for len(pendingCollections) > 0 { - select { - case <-ctx.Done(): - return fmt.Errorf("failed to complete collection retrieval: %w", ctx.Err()) - - case <-collectionStoragePollTicker.C: - s.log.Debug(). - Int("total_missing_collections", len(pendingCollections)). - Msg("retrieving missing collections...") - - for collectionID := range pendingCollections { - downloaded, err := s.indexer.IsCollectionInStorage(collectionID) - if err != nil { - return err - } - - if downloaded { - delete(pendingCollections, collectionID) - } - } - } + err = s.requestCollections(collections) + if err != nil { + return nil, false, fmt.Errorf("failed to request collections: %w", err) } + // requests made during catchup do not use Force() so they can be batched together for efficiency.
+ // In practice, Force() is called once for each finalized block, so requests are dispatched at + // least as often as the block rate. - s.log.Info().Msg("collection catchup done") - return nil + return collections, true, nil } diff --git a/engine/access/ingestion/collections/syncer_execution_data.go b/engine/access/ingestion/collections/syncer_execution_data.go index 1be0ac55fda..c8a2086f8e9 100644 --- a/engine/access/ingestion/collections/syncer_execution_data.go +++ b/engine/access/ingestion/collections/syncer_execution_data.go @@ -7,30 +7,36 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/storage" + "github.com/rs/zerolog" ) // ExecutionDataSyncer submits collections from execution data to the collections indexer. It is // designed to be used within the collection syncer to optimize indexing when collection data is // already available on the node. type ExecutionDataSyncer struct { + log zerolog.Logger executionDataCache execution_data.ExecutionDataCache indexer CollectionIndexer } func NewExecutionDataSyncer( + log zerolog.Logger, executionDataCache execution_data.ExecutionDataCache, indexer CollectionIndexer, ) *ExecutionDataSyncer { return &ExecutionDataSyncer{ + log: log.With().Str("component", "execution-data-syncer").Logger(), executionDataCache: executionDataCache, indexer: indexer, } } -// IndexForHeight indexes the collections for a given height using locally available execution data. +// IndexForHeight indexes the collections for a given finalized block height using locally available +// execution data. // Returns false and no error if execution data for the block is not available. // -// No error returns are expected during normal operation. +// Expected error returns during normal operation: +// - [context.Canceled]: if the context is canceled before the collections are indexed. func (s *ExecutionDataSyncer) IndexForHeight(ctx context.Context, height uint64) (bool, error) { executionData, err := s.executionDataCache.ByHeight(ctx, height) if err != nil { @@ -40,31 +46,19 @@ func (s *ExecutionDataSyncer) IndexForHeight(ctx context.Context, height uint64) return false, fmt.Errorf("failed to get execution data for height %d: %w", height, err) } - // index all collections except for the system chunk. - for _, chunkData := range executionData.ChunkExecutionDatas[:len(executionData.ChunkExecutionDatas)-1] { - s.indexer.OnCollectionReceived(chunkData.Collection) - } - - return true, nil -} - -// IndexFromStartHeight indexes the collections for all blocks with available execution data starting -// from the last full block height. Returns the last indexed height. -// -// No error returns are expected during normal operation. -func (s *ExecutionDataSyncer) IndexFromStartHeight(ctx context.Context, lastFullBlockHeight uint64) (uint64, error) { - lastIndexedHeight := lastFullBlockHeight - height := lastFullBlockHeight + 1 - for { - submitted, err := s.IndexForHeight(ctx, height) + // index all standard (non-system) collections. + standardCollections := executionData.StandardCollections() + if len(standardCollections) > 0 { + err = s.indexer.IndexCollections(standardCollections) if err != nil { - return 0, err + return false, fmt.Errorf("failed to index collections from execution data for height %d: %w", height, err) } - if !submitted { - return lastIndexedHeight, nil - } - - lastIndexedHeight = height - height++ } + + s.log.Debug(). + Uint64("height", height). + Int("collection_count", len(standardCollections)). 
+ Msg("indexed collections from execution data") + + return true, nil } diff --git a/engine/access/ingestion/collections/syncer_test.go b/engine/access/ingestion/collections/syncer_test.go index be744da5df0..4ee0759a681 100644 --- a/engine/access/ingestion/collections/syncer_test.go +++ b/engine/access/ingestion/collections/syncer_test.go @@ -3,6 +3,7 @@ package collections import ( "context" "errors" + "fmt" "testing" "testing/synctest" "time" @@ -22,6 +23,7 @@ import ( modulemock "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/module/signature" protocolmock "github.com/onflow/flow-go/state/protocol/mock" + "github.com/onflow/flow-go/storage" storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/fixtures" @@ -60,6 +62,7 @@ func (s *SyncerSuite) SetupTest() { func (s *SyncerSuite) createSyncer() *Syncer { execDataSyncer := NewExecutionDataSyncer( + unittest.Logger(), s.executionDataCache, s.indexer, ) @@ -188,19 +191,6 @@ func (s *SyncerSuite) TestWorkerLoop_InitialCatchup_CollectionNodesOnly() { } } - // called once for the batch - s.requester.On("Force").Once() - - for _, guarantees := range guaranteesByHeight { - for _, guarantee := range guarantees { - // simulate the collection being missing by randomly returning false. All collections should - // eventually be found in storage. - s.indexer.On("IsCollectionInStorage", guarantee.CollectionID).Return(func(collectionID flow.Identifier) (bool, error) { - return g.Random().Bool(), nil - }) - } - } - syncer := NewSyncer( unittest.Logger(), s.requester, @@ -276,10 +266,7 @@ func (s *SyncerSuite) TestWorkerLoop_InitialCatchup_SplitExecutionDataAndCollect s.indexer.On("MissingCollectionsAtHeight", height).Return(guarantees, nil).Once() s.executionDataCache.On("ByHeight", mock.Anything, height).Return(execData, nil).Once() - - for _, chunkData := range execData.ChunkExecutionDatas[:len(execData.ChunkExecutionDatas)-1] { - s.indexer.On("OnCollectionReceived", chunkData.Collection).Once() - } + s.indexer.On("IndexCollections", execData.StandardCollections()).Return(nil).Once() } // the syncer should request collections from collection nodes for all remaining heights. @@ -297,19 +284,6 @@ func (s *SyncerSuite) TestWorkerLoop_InitialCatchup_SplitExecutionDataAndCollect } } - // called once for the batch - s.requester.On("Force").Once() - - for _, guarantees := range guaranteesByHeight { - for _, guarantee := range guarantees { - // simulate the collection being missing by randomly returning false. All collections should - // eventually be found in storage. 
- s.indexer.On("IsCollectionInStorage", guarantee.CollectionID).Return(func(collectionID flow.Identifier) (bool, error) { - return g.Random().Bool(), nil - }) - } - } - syncer := s.createSyncer() synctest.Test(s.T(), func(t *testing.T) { @@ -373,9 +347,7 @@ func (s *SyncerSuite) TestWorkerLoop_InitialCatchup_AllAvailableFromExecutionDat s.indexer.On("MissingCollectionsAtHeight", height).Return(guarantees, nil).Once() s.executionDataCache.On("ByHeight", mock.Anything, height).Return(execData, nil).Once() - for _, chunkData := range execData.ChunkExecutionDatas[:len(execData.ChunkExecutionDatas)-1] { - s.indexer.On("OnCollectionReceived", chunkData.Collection).Once() - } + s.indexer.On("IndexCollections", execData.StandardCollections()).Return(nil).Once() } syncer := s.createSyncer() @@ -406,14 +378,14 @@ func (s *SyncerSuite) TestWorkerLoop_InitialCatchup_Timesout() { g := fixtures.NewGeneratorSuite() synctest.Test(s.T(), func(t *testing.T) { - finalizedHeight := s.lastFullBlockHeight.Value() + 10 + finalizedHeight := s.lastFullBlockHeight.Value() + 1 finalizedHeader := g.Headers().Fixture(fixtures.Header.WithHeight(finalizedHeight)) finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() s.state.On("Final").Return(finalSnapshot).Once() - // block the first call to MissingCollectionsAtHeight that is called during startup until after - // the timeout. This simulates the catchup logic taking too long. + // block the first call to MissingCollectionsAtHeight during startup until after the timeout. + // This simulates the catchup logic taking too long. unblockStartup := make(chan struct{}) s.indexer. On("MissingCollectionsAtHeight", s.lastFullBlockHeight.Value()+1). @@ -470,7 +442,7 @@ func (s *SyncerSuite) TestWorkerLoop_RequestsMissingCollections() { } s.Run("no missing collections", func() { - s.runWorkerLoopMissingCollections(g, nil, func(syncer *Syncer) { + s.runWorkerLoopMissingCollections(g, nil, nil, func(syncer *Syncer) { finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() s.state.On("Final").Return(finalSnapshot).Once() @@ -482,7 +454,7 @@ func (s *SyncerSuite) TestWorkerLoop_RequestsMissingCollections() { }) s.Run("missing collections - request skipped below thresholds", func() { - s.runWorkerLoopMissingCollections(g, nil, func(syncer *Syncer) { + s.runWorkerLoopMissingCollections(g, nil, nil, func(syncer *Syncer) { finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() s.state.On("Final").Return(finalSnapshot).Once() @@ -493,9 +465,9 @@ func (s *SyncerSuite) TestWorkerLoop_RequestsMissingCollections() { }) }) - s.Run("missing collections - request sent when count exceeds missingCollsForBlockThreshold", func() { - s.runWorkerLoopMissingCollections(g, nil, func(syncer *Syncer) { - syncer.missingCollsForBlockThreshold = 9 + s.Run("missing collections - request sent when missing blocks exceeds missingCollectionRequestThreshold", func() { + s.runWorkerLoopMissingCollections(g, nil, nil, func(syncer *Syncer) { + syncer.missingCollectionRequestThreshold = 9 finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() @@ -503,6 +475,7 @@ func (s *SyncerSuite) TestWorkerLoop_RequestsMissingCollections() { for height := lastFullBlockHeight + 1; height <= finalizedHeight; height++ { s.indexer.On("MissingCollectionsAtHeight", height).Return(allGuarantees[height], nil).Once() + for _, 
guarantee := range allGuarantees[height] { s.requester.On("EntityByID", guarantee.CollectionID, mock.Anything).Once() s.mockGuarantorsForCollection(guarantee, guarantors.ToSkeleton()) @@ -511,52 +484,94 @@ func (s *SyncerSuite) TestWorkerLoop_RequestsMissingCollections() { }) }) - s.Run("missing collections - request sent when age exceeds missingCollsForAgeThreshold", func() { - s.runWorkerLoopMissingCollections(g, nil, func(syncer *Syncer) { - syncer.missingCollsForAgeThreshold = 9 + s.Run("missing collections - processed from execution data", func() { + execDataSyncer := NewExecutionDataSyncer( + unittest.Logger(), + s.executionDataCache, + s.indexer, + ) + s.runWorkerLoopMissingCollections(g, execDataSyncer, nil, func(syncer *Syncer) { finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() s.state.On("Final").Return(finalSnapshot).Once() - for height := s.lastFullBlockHeight.Value() + 1; height <= finalizedHeight; height++ { + for height := lastFullBlockHeight + 1; height <= finalizedHeight; height++ { s.indexer.On("MissingCollectionsAtHeight", height).Return(allGuarantees[height], nil).Once() - } - for height := s.lastFullBlockHeight.Value() + 1; height <= finalizedHeight; height++ { - for _, guarantee := range allGuarantees[height] { - s.requester.On("EntityByID", guarantee.CollectionID, mock.Anything).Once() - s.mockGuarantorsForCollection(guarantee, guarantors.ToSkeleton()) - } + execData, _ := executionDataFixture(g) + s.executionDataCache.On("ByHeight", mock.Anything, height).Return(execData, nil).Once() + s.indexer.On("IndexCollections", execData.StandardCollections()).Return(nil).Once() } }) }) - s.Run("missing collections - processed from execution data", func() { + s.Run("handles context cancelation gracefully", func() { execDataSyncer := NewExecutionDataSyncer( + unittest.Logger(), s.executionDataCache, s.indexer, ) - s.runWorkerLoopMissingCollections(g, execDataSyncer, func(syncer *Syncer) { + s.runWorkerLoopMissingCollections(g, execDataSyncer, nil, func(syncer *Syncer) { finalSnapshot := protocolmock.NewSnapshot(s.T()) finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() s.state.On("Final").Return(finalSnapshot).Once() - for height := lastFullBlockHeight + 1; height <= finalizedHeight; height++ { - execData, _ := executionDataFixture(g) - s.executionDataCache.On("ByHeight", mock.Anything, height).Return(execData, nil).Once() - for _, chunkData := range execData.ChunkExecutionDatas[:len(execData.ChunkExecutionDatas)-1] { - s.indexer.On("OnCollectionReceived", chunkData.Collection).Once() - } - } - // syncer continues until it receives a not found error. 
- s.executionDataCache.On("ByHeight", mock.Anything, finalizedHeight+1).Return(nil, execution_data.NewBlobNotFoundError(cid.Cid{})).Once() + height := lastFullBlockHeight + 1 + s.indexer.On("MissingCollectionsAtHeight", height).Return(allGuarantees[height], nil).Once() + s.executionDataCache.On("ByHeight", mock.Anything, height).Return(nil, context.Canceled).Once() + + // should exit without error + }) + }) + + s.Run("throws exception if finalized block is not found", func() { + exception := fmt.Errorf("finalized block not found: %w", storage.ErrNotFound) + s.runWorkerLoopMissingCollections(g, nil, exception, func(syncer *Syncer) { + finalSnapshot := protocolmock.NewSnapshot(s.T()) + finalSnapshot.On("Head").Return(nil, exception).Once() + s.state.On("Final").Return(finalSnapshot).Once() + + // should throw exception + }) + }) + + s.Run("throws exception if MissingCollectionsAtHeight returns an error", func() { + exception := fmt.Errorf("missing collections at height: %w", storage.ErrNotFound) + s.runWorkerLoopMissingCollections(g, nil, exception, func(syncer *Syncer) { + finalSnapshot := protocolmock.NewSnapshot(s.T()) + finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() + s.state.On("Final").Return(finalSnapshot).Once() + + height := s.lastFullBlockHeight.Value() + 1 + s.indexer.On("MissingCollectionsAtHeight", height).Return(nil, exception).Once() + + // should throw exception + }) + }) + + s.Run("throws exception if requestCollections returns an error", func() { + exception := fmt.Errorf("request collections: %w", storage.ErrNotFound) + s.runWorkerLoopMissingCollections(g, nil, exception, func(syncer *Syncer) { + syncer.missingCollectionRequestThreshold = 9 // make sure the request is sent + + finalSnapshot := protocolmock.NewSnapshot(s.T()) + finalSnapshot.On("Head").Return(finalizedHeader, nil).Once() + s.state.On("Final").Return(finalSnapshot).Once() + + height := s.lastFullBlockHeight.Value() + 1 + guarantee := allGuarantees[height][0] + s.indexer.On("MissingCollectionsAtHeight", height).Return(allGuarantees[height], nil).Once() + + s.mockGuarantorsForCollectionReturnsError(guarantee, exception) + + // should throw exception }) }) } -func (s *SyncerSuite) runWorkerLoopMissingCollections(g *fixtures.GeneratorSuite, execDataSyncer *ExecutionDataSyncer, onReady func(*Syncer)) { +func (s *SyncerSuite) runWorkerLoopMissingCollections(g *fixtures.GeneratorSuite, execDataSyncer *ExecutionDataSyncer, expectedError error, onReady func(*Syncer)) { synctest.Test(s.T(), func(t *testing.T) { // last full block is latest finalized block, so initial catchup is skipped finalizedHeight := s.lastFullBlockHeight.Value() @@ -575,14 +590,23 @@ func (s *SyncerSuite) runWorkerLoopMissingCollections(g *fixtures.GeneratorSuite execDataSyncer, ) - ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + + var signalerCtx irrecoverable.SignalerContext + if expectedError == nil { + signalerCtx = irrecoverable.NewMockSignalerContext(s.T(), ctx) + } else { + signalerCtx = irrecoverable.NewMockSignalerContextWithCallback(s.T(), ctx, func(err error) { + s.Require().ErrorIs(err, expectedError) + }) + } done := make(chan struct{}) ready := make(chan struct{}) go func() { defer close(done) - syncer.WorkerLoop(ctx, func() { + syncer.WorkerLoop(signalerCtx, func() { onReady(syncer) close(ready) }) @@ -590,7 +614,7 @@ func (s *SyncerSuite) runWorkerLoopMissingCollections(g *fixtures.GeneratorSuite <-ready - 
time.Sleep(syncer.missingCollsRequestInterval + 1) + time.Sleep(syncer.missingCollectionRequestInterval + 1) synctest.Wait() cancel() diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index 808b90b0690..d3e0e62fce1 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -183,6 +183,7 @@ func (s *Suite) initEngineAndSyncer() (*Engine, *collections.Syncer, *collection indexer, err := collections.NewIndexer( s.log, + s.db, s.collectionExecutedMetric, s.proto.state, s.blocks, diff --git a/engine/access/ingestion2/engine_test.go b/engine/access/ingestion2/engine_test.go index 8067ac3c4cd..7e22207b5ac 100644 --- a/engine/access/ingestion2/engine_test.go +++ b/engine/access/ingestion2/engine_test.go @@ -211,6 +211,7 @@ func (s *Suite) initEngineAndSyncer(ctx irrecoverable.SignalerContext) (*Engine, indexer, err := collections.NewIndexer( s.log, + s.db, s.collectionExecutedMetric, s.proto.state, s.blocks, diff --git a/module/executiondatasync/execution_data/execution_data.go b/module/executiondatasync/execution_data/execution_data.go index 46f6e741738..d0645329d3d 100644 --- a/module/executiondatasync/execution_data/execution_data.go +++ b/module/executiondatasync/execution_data/execution_data.go @@ -68,6 +68,31 @@ type BlockExecutionData struct { ChunkExecutionDatas []*ChunkExecutionData } +// StandardChunks returns all standard (non-system) chunks for the block. +func (bd *BlockExecutionData) StandardChunks() []*ChunkExecutionData { + return bd.ChunkExecutionDatas[:len(bd.ChunkExecutionDatas)-1] +} + +// StandardCollections returns all standard (non-system) collections for the block. +func (bd *BlockExecutionData) StandardCollections() []*flow.Collection { + standardChunks := bd.StandardChunks() + collections := make([]*flow.Collection, len(standardChunks)) + for i, chunk := range standardChunks { + collections[i] = chunk.Collection + } + return collections +} + +// SystemChunk returns the system chunk for the block. +func (bd *BlockExecutionData) SystemChunk() *ChunkExecutionData { + return bd.ChunkExecutionDatas[len(bd.ChunkExecutionDatas)-1] +} + +// SystemCollection returns the system collection for the block. +func (bd *BlockExecutionData) SystemCollection() *flow.Collection { + return bd.SystemChunk().Collection +} + // ConvertTransactionResults converts a list of flow.TransactionResults into a list of // flow.LightTransactionResults to be included in a ChunkExecutionData. func ConvertTransactionResults(results flow.TransactionResults) []flow.LightTransactionResult { diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go index b0e0d2fdbaa..b2d92e65e69 100644 --- a/module/state_synchronization/indexer/indexer_core.go +++ b/module/state_synchronization/indexer/indexer_core.go @@ -228,18 +228,17 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti // than the latest indexed block. Calling the collection handler with a collection that // has already been indexed is a noop. - // index all collections except the system chunk. if there is only a single chunk, it is the - // system chunk and can be skipped. 
- indexedCount := 0 - if len(data.ChunkExecutionDatas) > 1 { - for _, chunk := range data.ChunkExecutionDatas[0 : len(data.ChunkExecutionDatas)-1] { - c.collectionIndexer.OnCollectionReceived(chunk.Collection) - indexedCount++ + // index all standard (non-system) collections + standardCollections := data.StandardCollections() + if len(standardCollections) > 0 { + err := c.collectionIndexer.IndexCollections(standardCollections) + if err != nil { + return fmt.Errorf("could not index collections: %w", err) } } lg.Debug(). - Int("collection_count", indexedCount). + Int("collection_count", len(standardCollections)). Dur("duration_ms", time.Since(start)). Msg("indexed collections")
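A minimal usage sketch (not part of the diff) of how the new batch-indexing path fits together: a caller that already holds a CollectionIndexer and a block's BlockExecutionData extracts the non-system collections with StandardCollections and submits them in a single IndexCollections call. The wrapper function, package name, and wiring below are illustrative assumptions; only the CollectionIndexer interface and the BlockExecutionData helpers come from this change.

package example

import (
	"github.com/onflow/flow-go/engine/access/ingestion/collections"
	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
)

// indexBlockCollections is a hypothetical helper showing the batched path.
// It submits all standard (non-system) collections of a block in one call;
// IndexCollections skips collections already present in storage, so calling
// it repeatedly for the same block is a no-op.
func indexBlockCollections(indexer collections.CollectionIndexer, data *execution_data.BlockExecutionData) error {
	standard := data.StandardCollections() // excludes the system chunk's collection
	if len(standard) == 0 {
		return nil
	}
	return indexer.IndexCollections(standard)
}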