-
Notifications
You must be signed in to change notification settings - Fork 27
fix: prevent concurrent gRPC Send() on blockchain subscription streams #613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,10 +71,12 @@ type Job struct { | |
| // and the subtree processor, including an error channel for asynchronous result reporting. | ||
|
|
||
| type NewSubtreeRequest struct { | ||
| Subtree *subtreepkg.Subtree // The subtree to process | ||
| ParentTxMap TxInpointsMap // Map of parent transactions | ||
| SkipNotification bool // Whether to skip notification to the network | ||
| ErrChan chan error // Channel for error reporting | ||
| Subtree *subtreepkg.Subtree // The subtree to process | ||
| ParentTxMap TxInpointsMap // Map of parent transactions | ||
| DeletedTxs *txmap.SyncedMap[chainhash.Hash, subtreepkg.TxInpoints] // Backup map for deleted transactions | ||
| SkipNotification bool // Whether to skip notification to the network | ||
| ErrChan chan error // Channel for error reporting | ||
| OnStorageComplete func() // Called when storage completes to trigger cleanup | ||
| } | ||
|
|
||
| // moveBlockRequest represents a request to move a block in the chain. | ||
|
|
@@ -252,6 +254,10 @@ type SubtreeProcessor struct { | |
| // currentTxMap tracks transactions currently held in the subtree processor | ||
| currentTxMap TxInpointsMap | ||
|
|
||
| // deletedTxs stores transaction parent info for recently deleted transactions | ||
| // This provides a backup/fallback for Server when transactions are deleted during async storage | ||
| deletedTxs *txmap.SyncedMap[chainhash.Hash, subtreepkg.TxInpoints] | ||
|
|
||
| // removeMap tracks transactions marked for removal | ||
| removeMap txmap.TxMap | ||
|
|
||
|
|
@@ -428,6 +434,7 @@ func NewSubtreeProcessor(_ context.Context, logger ulogger.Logger, tSettings *se | |
| chainedSubtreeCount: atomic.Int32{}, | ||
| queue: queue, | ||
| currentTxMap: NewSplitTxInpointsMap(splitMapBuckets), | ||
| deletedTxs: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](), | ||
| removeMap: txmap.NewSplitSwissMap(256, 16), | ||
| blockchainClient: blockchainClient, | ||
| subtreeStore: subtreeStore, | ||
|
|
@@ -553,7 +560,11 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { | |
| send := NewSubtreeRequest{ | ||
| Subtree: incompleteSubtree, | ||
| ParentTxMap: stp.currentTxMap, | ||
| DeletedTxs: stp.deletedTxs, | ||
| ErrChan: make(chan error), | ||
| OnStorageComplete: func() { | ||
| stp.cleanupDeletedTxs(incompleteSubtree) | ||
| }, | ||
| } | ||
|
|
||
| // Send announcement, respecting context cancellation | ||
|
|
@@ -751,7 +762,11 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { | |
| send := NewSubtreeRequest{ | ||
| Subtree: incompleteSubtree, | ||
| ParentTxMap: stp.currentTxMap, | ||
| DeletedTxs: stp.deletedTxs, | ||
| ErrChan: make(chan error), | ||
| OnStorageComplete: func() { | ||
| stp.cleanupDeletedTxs(incompleteSubtree) | ||
| }, | ||
| } | ||
|
|
||
| // Send announcement, but respect context cancellation | ||
|
|
@@ -977,6 +992,27 @@ func (stp *SubtreeProcessor) createIncompleteSubtreeCopy() (*subtreepkg.Subtree, | |
| return incompleteSubtree, nil | ||
| } | ||
|
|
||
| // cleanupDeletedTxs removes the given subtree's transactions from the | ||
| // deletedTxs backup map. Called after subtree storage completes, once the | ||
| // backup entries are no longer needed for Server fallback lookups. | ||
| // | ||
| // This function is called via the OnStorageComplete callback to safely remove | ||
| // transactions that were marked for deletion while the subtree was being stored. | ||
| // | ||
|
Comment on lines
+995
to
+1001
|
||
| // Parameters: | ||
| // - subtree: The subtree whose soft-deleted transactions should be cleaned up | ||
| func (stp *SubtreeProcessor) cleanupDeletedTxs(subtree *subtreepkg.Subtree) { | ||
| if stp.deletedTxs == nil { | ||
| return | ||
| } | ||
| for _, node := range subtree.Nodes { | ||
| if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { | ||
| // Remove from deletedTxs backup map (transaction data no longer needed after storage) | ||
| stp.deletedTxs.Delete(node.Hash) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // GetCurrentRunningState returns the current operational state of the processor. | ||
| // | ||
| // Returns: | ||
|
|
@@ -1758,8 +1794,12 @@ func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err | |
| stp.newSubtreeChan <- NewSubtreeRequest{ | ||
| Subtree: oldSubtree, | ||
| ParentTxMap: stp.currentTxMap, | ||
| DeletedTxs: stp.deletedTxs, | ||
| SkipNotification: skipNotification, | ||
| ErrChan: errCh, | ||
| OnStorageComplete: func() { | ||
| stp.cleanupDeletedTxs(oldSubtree) | ||
| }, | ||
| } | ||
|
|
||
| // wait for the writing of the subtree to complete in a separate goroutine | ||
|
|
@@ -1979,7 +2019,10 @@ func (stp *SubtreeProcessor) removeTxFromSubtrees(ctx context.Context, hash chai | |
| } | ||
|
|
||
| if foundIndex >= 0 { | ||
| // remove tx from the currentTxMap | ||
| // Save to deleted backup map before removing (for Server fallback during async storage) | ||
| if txInpoints, found := stp.currentTxMap.Get(hash); found { | ||
| stp.deletedTxs.Set(hash, *txInpoints) | ||
| } | ||
| stp.currentTxMap.Delete(hash) | ||
|
Comment on lines
+2022
to
2026
|
||
|
|
||
| // we found the transaction in a subtree | ||
|
|
@@ -2048,7 +2091,10 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [ | |
| } | ||
|
|
||
| if foundIndex >= 0 { | ||
| // remove tx from the currentTxMap | ||
| // Save to deleted backup map before removing (for Server fallback during async storage) | ||
| if txInpoints, found := stp.currentTxMap.Get(hash); found { | ||
| stp.deletedTxs.Set(hash, *txInpoints) | ||
| } | ||
| stp.currentTxMap.Delete(hash) | ||
|
|
||
| // we found the transaction in a subtree | ||
|
|
@@ -2150,13 +2196,22 @@ func (stp *SubtreeProcessor) reChainSubtrees(fromIndex int) error { | |
| return errors.NewProcessingError("error getting txInpoints from currentTxMap for %s", node.Hash.String()) | ||
| } | ||
|
|
||
| // Remove from currentTxMap so addNode won't skip it as a duplicate | ||
| // Save to deletedTxs backup before removing (protects the brief window during delete-and-re-add) | ||
| stp.deletedTxs.Set(node.Hash, *parents) | ||
|
|
||
| // Delete from currentTxMap so addNode won't skip it as a duplicate | ||
| stp.currentTxMap.Delete(node.Hash) | ||
|
|
||
| // Immediately re-add the node | ||
| // Re-add the node (adds back to currentTxMap) | ||
| if err = stp.addNode(node, parents, true); err != nil { | ||
| // Restore to currentTxMap to avoid inconsistent state | ||
| stp.currentTxMap.Set(node.Hash, parents) | ||
| stp.deletedTxs.Delete(node.Hash) | ||
| return errors.NewProcessingError("error adding node to subtree", err) | ||
| } | ||
|
|
||
| // Clear from deletedTxs (transaction successfully re-added, no longer deleted) | ||
| stp.deletedTxs.Delete(node.Hash) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -2658,7 +2713,15 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* | |
| // this will also store it by the Server in the subtree store | ||
| for _, subtree := range stp.chainedSubtrees { | ||
| errCh := make(chan error) | ||
| stp.newSubtreeChan <- NewSubtreeRequest{Subtree: subtree, ParentTxMap: stp.currentTxMap, ErrChan: errCh} | ||
| stp.newSubtreeChan <- NewSubtreeRequest{ | ||
| Subtree: subtree, | ||
| ParentTxMap: stp.currentTxMap, | ||
| DeletedTxs: stp.deletedTxs, | ||
| ErrChan: errCh, | ||
| OnStorageComplete: func() { | ||
| stp.cleanupDeletedTxs(subtree) | ||
| }, | ||
| } | ||
|
|
||
| if err = <-errCh; err != nil { | ||
| return errors.NewProcessingError("[reorgBlocks] error sending subtree to newSubtreeChan", err) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnStorageComplete is only invoked by the coordinator goroutine after subtree/meta workers complete. If storeSubtreeData returns early (e.g., subtree already exists / setup error before goroutines start), the callback never runs, which can leave deletedTxs entries around longer than intended. Consider ensuring OnStorageComplete is invoked (or explicitly skipped) on all return paths where the subtree is already stored or storage is aborted, so cleanup semantics are predictable.