[Access] POC Collection Syncing #8154
base: master
Changes from 87 commits
New file (package `hotstuff`):

```go
package hotstuff

import "github.com/onflow/flow-go/consensus/hotstuff/model"

type Distributor interface {
	AddOnBlockFinalizedConsumer(consumer func(block *model.Block))
	AddOnBlockIncorporatedConsumer(consumer func(block *model.Block))
}
```
New file (package `collection_sync`):

```go
package collection_sync
```
Contributor: what do you think about structuring this like …
Member (Author): I'm open to reorganizing things if needed or if it makes more sense in the optimistic syncing PR. For now, I'm keeping the structure relatively flat while grouping related logic together. Receipts, blocks, and collections each have different syncing approaches. Block syncing is handled entirely by the follower engine across all node types, so I'm not sure it even needs to be part of ingestion. Receipt ingestion is currently disabled because block syncing already ingests receipts. So at the moment, only collection syncing is enabled.
The file continues:

```go
import (
	"context"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
)

// MissingCollectionQueue tracks missing collections per height and invokes job callbacks when complete.
type MissingCollectionQueue interface {
	EnqueueMissingCollections(blockHeight uint64, ids []flow.Identifier, callback func()) error
	OnIndexedForBlock(blockHeight uint64) (func(), bool)

	// OnReceivedCollection updates the internal state on receipt of a collection and, if a block
	// just became complete, returns (collections, height, true).
	// Otherwise, it returns (nil, 0, false).
	OnReceivedCollection(collection *flow.Collection) ([]*flow.Collection, uint64, bool)

	// IsHeightQueued returns true if the given height is still being tracked (has not been indexed yet).
	IsHeightQueued(height uint64) bool

	// Size returns the number of missing collections currently in the queue.
	Size() uint
}

// CollectionRequester requests collections by their IDs.
type CollectionRequester interface {
	RequestCollections(ids []flow.Identifier) error
}

// BlockCollectionIndexer stores and indexes collections for a given block height.
type BlockCollectionIndexer interface {
	// IndexCollectionsForBlock stores and indexes collections for a given block height.
	// No error is expected during normal operation.
	IndexCollectionsForBlock(blockHeight uint64, cols []*flow.Collection) error

	// GetMissingCollections retrieves the block and returns the collection guarantees whose collections
	// are missing from storage.
	// Only guarantees whose collections are not already in storage are returned.
	GetMissingCollections(block *flow.Block) ([]*flow.CollectionGuarantee, error)
}

// BlockProcessor implements the job lifecycle for a single block height.
type BlockProcessor interface {
	FetchCollections(ctx irrecoverable.SignalerContext, block *flow.Block, done func()) error
	// MissingCollectionQueueSize returns the number of missing collections currently in the queue.
	MissingCollectionQueueSize() uint
}

// Fetcher is a component that consumes finalized block jobs and processes them
// to index collections. It uses a job consumer with windowed throttling to prevent node overload.
type Fetcher interface {
	component.Component
	ProgressReader
	OnFinalizedBlock()
	Size() uint
}

// ExecutionDataProvider provides the latest height for which the execution data indexer has collections.
// This can be nil if execution data indexing is disabled.
type ExecutionDataProvider interface {
	HighestIndexedHeight() uint64
	GetExecutionDataByHeight(ctx context.Context, height uint64) ([]*flow.Collection, error)
}

// ExecutionDataProcessor processes execution data when new execution data is available.
type ExecutionDataProcessor interface {
	OnNewExectuionData()
}

// ProgressReader provides the current progress of collection fetching/indexing.
type ProgressReader interface {
	ProcessedHeight() uint64
}
```
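To make the intended interplay between these interfaces more concrete, here is a minimal sketch (not part of the PR) of what a handler for incoming collections might look like. It assumes it lives in `package collection_sync` with the imports above plus `fmt`; the `handleCollection` name and its error messages are hypothetical.

```go
// handleCollection is a hypothetical handler wired to the collection requester's
// response path. It forwards each received collection to the queue and, when a
// block height becomes complete, indexes its collections and runs the completion
// callback registered via EnqueueMissingCollections.
func handleCollection(
	queue MissingCollectionQueue,
	indexer BlockCollectionIndexer,
	col *flow.Collection,
) error {
	collections, height, complete := queue.OnReceivedCollection(col)
	if !complete {
		// the block at this height is still missing collections; nothing to do yet
		return nil
	}

	// all collections for the height are now available: store and index them
	if err := indexer.IndexCollectionsForBlock(height, collections); err != nil {
		return fmt.Errorf("could not index collections for height %d: %w", height, err)
	}

	// notify the job consumer that this height is done
	if callback, ok := queue.OnIndexedForBlock(height); ok {
		callback()
	}
	return nil
}
```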
New file (package `execution_data_index`); the rendered diff is cut off at the inline review comment below:

```go
package execution_data_index

import (
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/engine/access/collection_sync"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/counters"
	"github.com/onflow/flow-go/module/irrecoverable"
)

type ExecutionDataProcessor struct {
	component.Component
	log                     zerolog.Logger
	newExecutionDataIndexed chan struct{}
	provider                collection_sync.ExecutionDataProvider
	indexer                 collection_sync.BlockCollectionIndexer
	metrics                 module.CollectionSyncMetrics
	// state
	processedHeight *counters.PersistentStrictMonotonicCounter
}

var _ collection_sync.ExecutionDataProcessor = (*ExecutionDataProcessor)(nil)
var _ collection_sync.ProgressReader = (*ExecutionDataProcessor)(nil)
var _ component.Component = (*ExecutionDataProcessor)(nil)

func NewExecutionDataProcessor(
	log zerolog.Logger,
	provider collection_sync.ExecutionDataProvider,
	indexer collection_sync.BlockCollectionIndexer,
	processedHeight *counters.PersistentStrictMonotonicCounter,
	metrics module.CollectionSyncMetrics,
) *ExecutionDataProcessor {
	edp := &ExecutionDataProcessor{
		log:                     log.With().Str("component", "coll_sync_ed_processor").Logger(),
		newExecutionDataIndexed: make(chan struct{}, 1),

		provider:        provider,
		indexer:         indexer,
		metrics:         metrics,
		processedHeight: processedHeight,
	}

	// Initialize the channel so that even if no new execution data comes in,
	// the worker loop can still be triggered to process any existing data.
	edp.newExecutionDataIndexed <- struct{}{}

	// Build component manager with worker loop
	cm := component.NewComponentManagerBuilder().
		AddWorker(edp.workerLoop).
		Build()

	edp.Component = cm

	return edp
}

func (edp *ExecutionDataProcessor) OnNewExectuionData() {
	select {
	case edp.newExecutionDataIndexed <- struct{}{}:
	default:
		// if the channel is full, no need to block, just return.
		// once the worker loop processes the buffered signal, it will
		// process the next height all the way to the highest available height.
	}
}

func (edp *ExecutionDataProcessor) workerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	// Use a single-threaded loop to index execution data height by height,
	// since indexing collections is blocking anyway, and reading the execution data
	// is quick because we cache it for 100 heights.
	for {
		select {
		case <-ctx.Done():
			return
		case <-edp.newExecutionDataIndexed:
			highestAvailableHeight := edp.provider.HighestIndexedHeight()
			lowestMissing := edp.processedHeight.Value() + 1

			for height := lowestMissing; height <= highestAvailableHeight; height++ {
				collections, err := edp.provider.GetExecutionDataByHeight(ctx, height)
```
A reviewer left a suggested change on the `GetExecutionDataByHeight` call (marked outdated):

```go
// TODO: This logic supports ingesting execution data from sealed blocks. Once support is
// added for syncing execution data for unsealed results, this logic will need to be updated
// to account for execution forks.
collections, err := edp.provider.GetExecutionDataByHeight(ctx, height)
```
A follow-up suggestion on the same call:

```go
// TODO: to support processing data from unsealed execution results, we should check here that the collections
// returned match the guarantees in the block.
```
Author: Right, I added the comments to engine/access/collection_sync/execution_data_index/provider.go#GetExecutionDataByHeight instead.
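For illustration, the check the reviewer describes could look roughly like the following sketch. It is not code from the PR; it assumes the caller already has the block's collection guarantees (e.g. from the block payload), that collections are returned in guarantee order, and that the `flow` and `fmt` imports from the files above are available. The helper name is hypothetical.

```go
// verifyCollectionsMatchGuarantees illustrates the reviewer's suggestion: before
// indexing collections obtained from execution data, confirm they correspond to
// the block's collection guarantees.
func verifyCollectionsMatchGuarantees(
	guarantees []*flow.CollectionGuarantee,
	collections []*flow.Collection,
) error {
	if len(guarantees) != len(collections) {
		return fmt.Errorf("expected %d collections, got %d", len(guarantees), len(collections))
	}
	for i, guarantee := range guarantees {
		if collections[i].ID() != guarantee.CollectionID {
			return fmt.Errorf("collection %d (%v) does not match guarantee %v",
				i, collections[i].ID(), guarantee.CollectionID)
		}
	}
	return nil
}
```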
New file (package `execution_data_index`, the provider referenced in the reply above):

```go
package execution_data_index

import (
	"context"

	"github.com/onflow/flow-go/engine/access/collection_sync"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
	"github.com/onflow/flow-go/module/state_synchronization"
)

var _ collection_sync.ExecutionDataProvider = (*executionDataProvider)(nil)

// executionDataProvider implements ExecutionDataProvider by querying ExecutionDataCache.
type executionDataProvider struct {
	cache                      execution_data.ExecutionDataCache
	highestExectuionDataHeight state_synchronization.ExecutionDataIndexedHeight
}

// NewExecutionDataProvider creates a new ExecutionDataProvider that reads from the given ExecutionDataCache.
// The headers storage is used to determine the search range for finding available heights.
func NewExecutionDataProvider(
	cache execution_data.ExecutionDataCache,
	highestExectuionDataHeight state_synchronization.ExecutionDataIndexedHeight,
) *executionDataProvider {
	return &executionDataProvider{
		cache:                      cache,
		highestExectuionDataHeight: highestExectuionDataHeight,
	}
}

// HighestIndexedHeight returns the highest block height for which execution data is available.
func (p *executionDataProvider) HighestIndexedHeight() uint64 {
	return p.highestExectuionDataHeight.HighestConsecutiveHeight()
}

// GetExecutionDataByHeight returns the execution data for the given block height.
func (p *executionDataProvider) GetExecutionDataByHeight(ctx context.Context, height uint64) ([]*flow.Collection, error) {
	blockExecutionData, err := p.cache.ByHeight(ctx, height)
	if err != nil {
		return nil, err
	}

	return blockExecutionData.StandardCollections(), nil
}
```
The collection indexer is replaced by the collection_sync fetcher (fetching from the LN) or the block processor (syncing from the EN via execution data syncing).
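For orientation, here is a hypothetical sketch of how the execution-data path from this PR might be wired together. The package name `wiring`, the function `buildExecutionDataPath`, and the assumption that all dependencies are built by the node builder are mine; the constructors and types come from the diffs above.

```go
// Package wiring is a hypothetical sketch of connecting the provider and
// processor from this PR; it is not code from the PR itself.
package wiring

import (
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/engine/access/collection_sync"
	"github.com/onflow/flow-go/engine/access/collection_sync/execution_data_index"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/counters"
	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
	"github.com/onflow/flow-go/module/state_synchronization"
)

// buildExecutionDataPath wires the execution-data path: the provider reads
// collections from the execution data cache, and the processor indexes them as
// new heights become available. All parameters are assumed to be constructed
// elsewhere (e.g. by the access node builder).
func buildExecutionDataPath(
	log zerolog.Logger,
	cache execution_data.ExecutionDataCache,
	indexedHeight state_synchronization.ExecutionDataIndexedHeight,
	indexer collection_sync.BlockCollectionIndexer,
	processedHeight *counters.PersistentStrictMonotonicCounter,
	metrics module.CollectionSyncMetrics,
) *execution_data_index.ExecutionDataProcessor {
	provider := execution_data_index.NewExecutionDataProvider(cache, indexedHeight)
	processor := execution_data_index.NewExecutionDataProcessor(log, provider, indexer, processedHeight, metrics)

	// The execution data indexer would call processor.OnNewExectuionData()
	// whenever a new height is indexed, waking the worker loop.
	return processor
}
```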