feat(vmsync): dynamic state sync with coordinator, pivot cadence, and engine-driven target updates #1379
base: master
Conversation
… engine-driven target updates

- Add Coordinator to orchestrate dynamic state sync, enforce pivot cadence, and manage queue execution.
- Introduce engine hook OnEngineAccept to enqueue accepted blocks and advance the sync target.
- Implement pivot policy (every N blocks) and idempotence (skip behind/equal, allow same-height reorgs).

resolves #1259
Signed-off-by: Tsvetan Dimitrov ([email protected])
plugin/evm/vmsync/client.go
Outdated
	return c.finishSync()
},
ApplyBlock: func(b EthBlockWrapper) error {
	return c.Chain.BlockChain().InsertBlock(b.GetEthBlock())
This isn't sufficient. You need to do a very large portion of the operations in wrappedBlock.Verify() and wrappedBlock.Accept().
plugin/evm/vmsync/coordinator.go
Outdated
| co.finish(err) | ||
| return | ||
| } | ||
| // All syncers finished successfully: finalize VM and execute the queued batch |
Have they finished? There might still be outstanding UpdateSyncTarget stuff (my solution was Finalize)
That's true, I just forgot about the Finalize method. I'll add it.
	"github.com/ava-labs/coreth/plugin/evm/message"
)

var _ message.Syncable = (*syncTarget)(nil)
I don't think this is the right interface to satisfy. We should probably have the atomic root as well, but also, Accept is completely meaningless. This is intended to be something passed back to the engine to invoke the state sync at the beginning (this calls acceptSyncSummary currently)
I knew that this wasn't sufficient, but I used it as a placeholder until we finalize a better target struct.
	log.Warn("could not enqueue block for post-finalization execution", "hash", ethb.Hash(), "height", ethb.NumberU64())
}

syncTarget := newSyncTarget(ethb.Hash(), ethb.Root(), ethb.NumberU64())
It's weird we have to make two separate calls to the coordinator here
Since the pivot interval is managed by the client anyway, can you just put this UpdateSyncTarget stuff in the enqueueBlockOperation function?
When UpdateSyncTarget is called, remove all queued blocks with height <= new target height since they will never be executed. This prevents processing blocks that the sync has already advanced past.

- Add RemoveBlocksBelowHeight method to blockQueue to filter stale blocks.
- Call RemoveBlocksBelowHeight in UpdateSyncTarget after pivot check.
- Support accept/reject/verify operations in block queue.
- Add OnEngineReject and OnEngineVerify handlers to sync client.
- Propagate context through ApplyQueuedBatch for proper cancellation.
- Remove unnecessary defer vm.versiondb.Abort() from Accept.
- Prevent recursion during batch execution via state check.
- Make dequeueBatch private to reduce API surface.

resolves #1259
Signed-off-by: Tsvetan Dimitrov ([email protected])
}

// Skip notification if we're already processing the queue to avoid recursion.
if c.coordinator.CurrentState() == StateExecutingBatch {
We still need to enqueue it. Say there's 5 blocks left to process, we can't process this block as normal, since its state probably depends on theirs.
Nice. Will you clean up this function a little bit?
plugin/evm/vmsync/client.go
Outdated
ethb := b.GetEthBlock()
// Best-effort enqueue, ignored unless Running.
if ok := c.coordinator.AddBlockOperation(b, OpAccept); !ok {
	// TODO(powerslider): should we return an error here?
I think yes
plugin/evm/vmsync/client.go
Outdated
}

// Skip notification if we're already processing the queue to avoid recursion.
if c.coordinator.CurrentState() == StateExecutingBatch {
There's a lot of duplicated logic in these calls, can we push them down into the coordinator?
Yeah I know, I'll clean it up later.
plugin/evm/vmsync/coordinator.go
Outdated
}
co.state.Store(int32(StateExecutingBatch))
if co.queue != nil {
	if err := co.queue.ProcessQueue(ctx); err != nil {
I think this is racy. We need to atomically switch from reprocessing the batches, enqueuing more as needed, to normal VM operations
plugin/evm/wrapped_block.go
Outdated
func (b *wrappedBlock) Accept(ctx context.Context) error {
	// Notify sync client that engine accepted a block.
	if client := b.vm.SyncerClient(); client != nil {
		if err := client.OnEngineAccept(b); err != nil {
I don't think this will work. I know you made the comment about recursively finding a sync client, but you might just need to pass in a callback. Like:
if err := client.OnEngineAccept(b, (*wrappedBlock).accept); err != nil {
	}
} else {
	// If there is no extension, we still need to apply the changes to the versionDB
	if err := vdbBatch.Write(); err != nil {
This is out of scope, but good catch. Maybe this should just go in a separate PR now
- Add context parameter to finishSync() and propagate through stateSyncStatic/Dynamic.
- Add context parameter to FinalizeVM callback in Coordinator.
- Add context parameter to ProcessQueuedBlockOperations (renamed from ApplyQueuedBatch).
- Add context parameter to executeBlockOperationBatch (moved from blockQueue).
- Propagate context through ProcessQueue operations.
- Add cancellation checks before expensive operations in finishSync() using a declarative operation list pattern with the runWithCancellationCheck helper.
- Add cancellation checks in ProcessQueuedBlockOperations before state transitions.
- Add cancellation checks in the executeBlockOperationBatch loop using a select pattern.
- Improve error messages to include operation index and type for better debugging.

Refactoring:
- Move block operation processing logic from blockQueue to Coordinator (executeBlockOperationBatch) for better separation of concerns.
- Simplify blockQueue to be a pure data structure (enqueue, dequeueBatch, removeBelowHeight).
- Rename pivot.go to pivot_policy.go for clarity.
- Remove cancel function from Coordinator struct, pass as parameter to finish().

Pivot Policy:
- Add defaultPivotInterval constant (10000 blocks) in pivot_policy.go.
- Apply default pivot interval when WithPivotInterval is not explicitly called.
- Update newPivotPolicy to use default when interval is 0.

This change enables graceful shutdown of state sync operations and ensures that cancellation signals propagate correctly through all layers of the dynamic state sync orchestration.

resolves #1259
Signed-off-by: Tsvetan Dimitrov ([email protected])
plugin/evm/vmsync/pivot_policy.go
Outdated
interval uint64
// nextHeight is the next height threshold at or beyond which we
// should forward an update. A value of 0 means uninitialized.
nextHeight uint64 // accessed atomically
Can you use an atomic.Uint64 for clarity?
I'll see if I can simplify this.
- Refactor block operation handling and error management to improve maintainability, reduce code duplication, and enhance type safety.
Enable blocks to be enqueued during StateExecutingBatch for processing in the next batch, while preventing recursion by skipping sync target updates during batch execution.

Block Enqueuing During Batch Execution:
- Update AddBlockOperation to allow enqueuing during both StateRunning and StateExecutingBatch states.
- Remove early return check in enqueueBlockOperation that prevented enqueuing during batch execution.
- Blocks enqueued during batch execution are automatically processed in the next batch (via dequeueBatch snapshot behavior).

Prevent Recursion:
- Skip UpdateSyncTarget in OnEngineAccept when state is StateExecutingBatch.
- Blocks are still enqueued during batch execution, but sync target updates are deferred to prevent recursion.
- Add documentation explaining the behavior.

Code Simplification:
- Simplify finishSync cancellation checks from per-operation checks to a single check at the beginning.
- Operations are not cancellable mid-execution, so a single check is sufficient and more efficient.

This change ensures blocks arriving during batch execution can be queued for the next batch (solving dependency issues) while maintaining fast consensus-critical paths and preventing recursion.
plugin/evm/vmsync/client.go
Outdated
// During batch execution, only enqueuing occurs to prevent recursion.
func (c *client) OnEngineAccept(b EthBlockWrapper) error {
	if !c.DynamicStateSyncEnabled || c.coordinator == nil {
		ethb := c.enqueueBlockOperation(b, OpAccept)
Is this ethBlock?
// Only update sync target on accept operations.
// Only update sync target on accept operations when not in batch execution.
syncTarget := newSyncTarget(ethb.Hash(), ethb.Root(), ethb.NumberU64())
We might need the atomic root? Not totally sure, still haven't attempted dynamic state sync for the atomic trie
func runWithCancellationCheck(ctx context.Context, operationName string, op func() error) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("finishSync cancelled before %s: %w", operationName, err)
ethb := b.GetEthBlock()
We need to enqueue the wrappedBlock, right? At least so we can do the atomic ops, unless it's just a typecast away
…o sync flow

- Add Finalize method to the Syncer interface and implement it across all syncer types. Integrate Finalize calls into both static and dynamic state sync flows to allow syncers to clean up their state before VM finalization.
- Dynamic sync: Syncers complete -> Finalize syncers -> Finalize VM -> Execute batch
- Static sync: Syncers complete -> Finalize syncers -> Finalize VM
…mic state sync

Fix critical issues in dynamic state sync that could cause blocks to be processed twice and introduce race conditions. Improve error handling and state consistency throughout the sync flow.

Double Execution Prevention:
- Change OnEngineAccept/Reject/Verify to return (bool, error) indicating whether the block was enqueued for deferred processing.
- Update wrapped_block.Accept/Reject/Verify to skip immediate execution when the block is enqueued during dynamic sync, which prevents blocks from being processed both immediately and from the queue.

Race Condition Fixes:
- Add state re-check in UpdateSyncTarget before modifying the queue to handle concurrent state transitions.
- Prevent UpdateSyncTarget from being called during batch execution to avoid a race with removeBelowHeight.

State Consistency Improvements:
- Set StateAborted on all error paths in ProcessQueuedBlockOperations before returning to ensure consistent state.
- Add context checks at critical points (before FinalizeVM, before batch execution) to catch cancellations early.
- Ensure state transitions are atomic with error handling.

Error Handling Enhancements:
- Improve OnEngineAccept error handling when UpdateSyncTarget fails.
- Return a clear error message indicating the block was enqueued but the sync target update failed.
- Reorder operations to check batch execution state before enqueuing.
// was queued, false if the block is nil.
func (q *blockQueue) enqueue(b EthBlockWrapper, op BlockOperationType) bool {
	if b == nil {
		return false
Is this reasonable and/or possible?
// dequeueBatch returns the current buffered operations and clears the buffer. New
// arrivals after the snapshot are not included and remain buffered for later.
func (q *blockQueue) dequeueBatch() []blockOperation {
I think with this model you'll still have a racy finish
isResume := client.resumableSummary != nil &&
	proposedSummary.GetBlockHash() == client.resumableSummary.GetBlockHash()
func (c *client) acceptSyncSummary(proposedSummary message.Syncable) (block.StateSyncMode, error) {
	// If dynamic sync is already running, treat new summaries as target updates.
This is only called once by the engine. If it's called multiple times, that indicates a bug. You don't need to handle this case at all
c.Chain.BloomIndexer().AddCheckpoint(parentHeight/params.BloomBitsBlocks, parentHash)

if err := client.Chain.BlockChain().ResetToStateSyncedBlock(block); err != nil {
if err := ctx.Err(); err != nil {
Why check here?
co.state.Store(int32(StateFinalizing))

if co.callbacks.FinalizeVM != nil {
	if err := ctx.Err(); err != nil {
Your context checks are weird, I'm not sure I get the point
// removeBelowHeight removes all queued blocks with height <= targetHeight.
// This is called after UpdateSyncTarget to remove blocks that will never be executed
// because the sync target has advanced past them.
func (q *blockQueue) removeBelowHeight(targetHeight uint64) {
nit: the name makes me think height < targetHeight. Maybe just be open to changing the name in the future
// executeBlockOperationBatch executes queued block operations in FIFO order.
// Partial completion is acceptable as operations are idempotent.
func (co *Coordinator) executeBlockOperationBatch(ctx context.Context) error {
	operations := co.queue.dequeueBatch()
What about the next batch? Blocks that are enqueued while this happens
// Reject implements the snowman.Block interface
// If [b] contains an atomic transaction, attempt to re-issue it
func (b *wrappedBlock) Reject(context.Context) error {
func (b *wrappedBlock) Reject(_ context.Context) error {
You change this a lot. I think we prefer no name rather than _
func (b *wrappedBlock) Reject(_ context.Context) error {
func (b *wrappedBlock) Reject(context.Context) error {
Why this should be merged
See ava-labs/avalanchego#4582.
How this works
How this was tested
Still not explicitly tested.
Need to be documented?
no
Need to update RELEASES.md?
no
resolves ava-labs/avalanchego#4582
Signed-off-by: Tsvetan Dimitrov ([email protected])