fix: prevent concurrent gRPC Send() on blockchain subscription streams #613

freemans13 wants to merge 2 commits into bsv-blockchain:main
Conversation
Co-authored-by: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
🤖 Claude Code Review Status: Complete

Summary: This PR correctly fixes two critical concurrency bugs. Both fixes are well-implemented with comprehensive test coverage. The synchronous notification delivery may introduce performance considerations for high subscriber counts, but correctness takes precedence.
```go
// cleanupDeletedTxs performs actual deletion from currentTxMap for transactions
// that were previously soft-deleted. Called after subtree storage completes.
// Only deletes if the transaction is still marked as deleted (not re-added).
```
✅ Resolved: Comment updated at line 1010 to accurately describe behavior: "Remove from deletedTxs backup map (transaction data no longer needed after storage)"
The rpcCallCache uses ttlcache with a 10s TTL, but without DisableTouchOnHit. By default, every Get() resets the TTL timer. When coinbase polls getinfo or getbestblockhash every 5s, the cache entry is touched before it expires, keeping stale data alive forever. Adding WithDisableTouchOnHit ensures entries expire exactly 10s after creation regardless of how often they are read. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
This PR addresses a critical reliability issue in the blockchain subscription system where concurrent gRPC Send() calls on the same stream could corrupt the stream and effectively stall downstream components that depend on block notifications.
Changes:
- Serialize blockchain subscription notifications (and the initial notification) to avoid concurrent gRPC `Send()` on a single server stream.
- Add tests demonstrating the concurrent-send race and the expected serial-send behavior.
- Introduce a `DeletedTxs` fallback + `OnStorageComplete` callback for subtree meta creation to tolerate parent-map mutations during async storage.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| services/blockchain/Server.go | Sends subscription notifications synchronously; sends initial notification before adding subscriber; defers removal of dead subscribers. |
| services/blockchain/subscription_race_test.go | Adds tests simulating concurrent vs serial Send() patterns. |
| services/blockassembly/subtreeprocessor/SubtreeProcessor.go | Adds deletedTxs tracking + request fields to support BlockAssembly fallback and cleanup callback. |
| services/blockassembly/Server.go | Falls back to DeletedTxs when ParentTxMap is missing entries; invokes OnStorageComplete after storage completes. |
| services/blockassembly/server_test.go | Updates tests for new request fields and adds coverage for DeletedTxs fallback behavior. |
| services/blockassembly/subtreeprocessor/subtree_size_benchmark_test.go | Updates benchmark request literal to include new fields. |
| services/rpc/handlers.go | Disables “touch on hit” for the RPC TTL cache (behavior change, unrelated to the subscription fix). |
```go
// Queue dead subscribers for removal
for _, s := range dead {
	b.deadSubscriptions <- s
```
Potential deadlock: startSubscriptions now sends into b.deadSubscriptions from the same goroutine that drains it. If more than cap(deadSubscriptions) subscribers fail during one notification, the loop at the end of this case will block before returning to the select, and dead subscribers will never be drained. Consider removing dead subscribers directly (e.g., delete from b.subscribers under a write lock and close done) instead of enqueueing to b.deadSubscriptions here, or otherwise ensure these sends cannot block (e.g., unbounded queue/non-blocking send).
Suggested change:

```diff
-// Queue dead subscribers for removal
-for _, s := range dead {
-	b.deadSubscriptions <- s
+// Remove dead subscribers directly to avoid potential deadlock on
+// b.deadSubscriptions when many subscribers fail in one notification.
+if len(dead) > 0 {
+	b.subscribersMu.Lock()
+	for _, s := range dead {
+		delete(b.subscribers, s)
+		safeClose(s.done)
+	}
+	remaining := len(b.subscribers)
+	b.subscribersMu.Unlock()
+	b.logger.Infof("[Blockchain][startSubscriptions] Removed %d dead subscription(s) (Total=%d).", len(dead), remaining)
```
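The other alternative named in the comment, a non-blocking send, can be sketched as follows. This is a standalone illustration, not the PR's code; the `subscriber` type, `enqueueNonBlocking` helper, and channel capacity are placeholders:

```go
package main

import "fmt"

// subscriber is a minimal placeholder for the blockchain server's subscriber type.
type subscriber struct{ source string }

// enqueueNonBlocking queues dead subscribers without ever blocking the
// manager goroutine: if the channel is full, the subscriber is counted as
// dropped and left for the next failed Send to surface again.
func enqueueNonBlocking(ch chan subscriber, dead []subscriber) (queued, dropped int) {
	for _, s := range dead {
		select {
		case ch <- s:
			queued++
		default:
			dropped++
		}
	}
	return queued, dropped
}

func main() {
	// More failures than channel capacity: a plain send here would deadlock
	// the goroutine that also drains this channel.
	deadSubscriptions := make(chan subscriber, 1)
	q, d := enqueueNonBlocking(deadSubscriptions, []subscriber{{"a"}, {"b"}, {"c"}})
	fmt.Println("queued:", q, "dropped:", d) // queued: 1 dropped: 2
}
```

Dropping on overflow is safe here only because a dead subscriber's next `Send()` will fail again and re-surface it; direct removal under the write lock, as in the suggestion above, avoids that retry entirely.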
```diff
 b.subscribersMu.RLock()
+// Collect dead subscribers to remove after releasing the read lock
+var dead []subscriber
 for sub := range b.subscribers {
-	b.logger.Debugf("[Blockchain][startSubscriptions] Sending notification to %s in background: %s", sub.source, notification.Stringify())
-
-	go func(s subscriber) {
-		b.logger.Debugf("[Blockchain][startSubscriptions] Sending notification to %s: %s", s.source, notification.Stringify())
-
-		if err := s.subscription.Send(notification); err != nil {
-			b.deadSubscriptions <- s
-		}
-	}(sub)
+	b.logger.Debugf("[Blockchain][startSubscriptions] Sending notification to %s: %s", sub.source, notification.Stringify())
+
+	// Send synchronously — NOT in a goroutine. Concurrent Send() calls
+	// on the same gRPC ServerStream corrupt the stream, causing the
+	// subscriber to be silently dropped and never receive notifications.
+	if err := sub.subscription.Send(notification); err != nil {
+		dead = append(dead, sub)
+	}
 }
 b.subscribersMu.RUnlock()
```
b.subscribersMu.RLock is held while calling sub.subscription.Send(notification). Since Send may block on network backpressure, this can stall writers (new/dead subscription handling) and prolong the read-lock hold time across potentially many subscribers. A safer pattern is to snapshot the current subscribers under the RLock (copy the keys to a slice), release the lock, then iterate and Send without holding the lock.
```go
// Send initial notification BEFORE adding to the subscribers map.
// This prevents concurrent Send() between sendInitialNotification
// and the notification delivery loop above.
b.sendInitialNotification(s)
```
sendInitialNotification can enqueue to b.deadSubscriptions on Send() error. Because this is now called synchronously from the subscription manager goroutine, a full deadSubscriptions buffer would block here and prevent the manager from draining the channel. Since the subscriber hasn't been added to the map yet, consider having sendInitialNotification return an error (or bool) and let this case close s.done / skip adding the subscriber without sending to deadSubscriptions.
```go
// Trigger cleanup of soft-deleted transactions
if subtreeRequest.OnStorageComplete != nil {
	subtreeRequest.OnStorageComplete()
}
```
OnStorageComplete is only invoked by the coordinator goroutine after subtree/meta workers complete. If storeSubtreeData returns early (e.g., subtree already exists / setup error before goroutines start), the callback never runs, which can leave deletedTxs entries around longer than intended. Consider ensuring OnStorageComplete is invoked (or explicitly skipped) on all return paths where the subtree is already stored or storage is aborted, so cleanup semantics are predictable.
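One way to guarantee the callback runs on every return path is to defer it at function entry. The sketch below is a minimal illustration under assumed names (`request`, `storeSubtreeData`, the `alreadyStored` flag), not the repository's actual signatures:

```go
package main

import "fmt"

// request is a stand-in for the subtree storage request carrying the callback.
type request struct{ OnStorageComplete func() }

// storeSubtreeData defers the callback immediately, so it fires on every
// return path, including early returns before any worker goroutines start.
func storeSubtreeData(req request, alreadyStored bool) error {
	if req.OnStorageComplete != nil {
		defer req.OnStorageComplete()
	}
	if alreadyStored {
		return nil // early return: cleanup still runs via defer
	}
	// ... launch subtree/meta workers and wait for completion ...
	return nil
}

func main() {
	calls := 0
	req := request{OnStorageComplete: func() { calls++ }}
	_ = storeSubtreeData(req, true)  // early-return path
	_ = storeSubtreeData(req, false) // normal path
	fmt.Println("callback invocations:", calls) // callback invocations: 2
}
```

If the real coordinator completes asynchronously, the defer would instead wrap whatever function observes completion, but the principle of a single unconditional cleanup point is the same.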
```go
for i := 0; i < 100; i++ {
	wg.Add(2)
	// Goroutine 1: simulates sendInitialNotification (old line 698: go b.sendInitialNotification(s))
	go func() {
		defer wg.Done()
		_ = mock.Send(notification)
	}()
	// Goroutine 2: simulates regular notification send (old line 679: go func(s subscriber) { s.subscription.Send(...) })
	go func() {
		defer wg.Done()
		_ = mock.Send(notification)
	}()
}
wg.Wait()

if !mock.raceDetected.Load() {
	t.Skip("Race condition not triggered in this run (timing dependent)")
```
This test is timing-dependent and can silently skip, which makes CI flaky and can mask regressions (it might pass without ever demonstrating the race). Consider making the overlap deterministic (e.g., use a start barrier so both goroutines enter Send() simultaneously) and fail the test if concurrent Send isn't detected under the forced conditions.
Suggested change:

```diff
+startCh := make(chan struct{})
 for i := 0; i < 100; i++ {
 	wg.Add(2)
 	// Goroutine 1: simulates sendInitialNotification (old line 698: go b.sendInitialNotification(s))
 	go func() {
 		defer wg.Done()
+		<-startCh
 		_ = mock.Send(notification)
 	}()
 	// Goroutine 2: simulates regular notification send (old line 679: go func(s subscriber) { s.subscription.Send(...) })
 	go func() {
 		defer wg.Done()
+		<-startCh
 		_ = mock.Send(notification)
 	}()
 }
+// Release all goroutines at once to force overlapping Send() calls.
+close(startCh)
 wg.Wait()
 if !mock.raceDetected.Load() {
-	t.Skip("Race condition not triggered in this run (timing dependent)")
+	t.Fatalf("expected concurrent Send() calls to be detected under forced conditions, but none were")
```
```go
// cleanupDeletedTxs performs actual deletion from currentTxMap for transactions
// that were previously soft-deleted. Called after subtree storage completes.
// Only deletes if the transaction is still marked as deleted (not re-added).
//
// This function is called via the OnStorageComplete callback to safely remove
// transactions that were marked for deletion while the subtree was being stored.
//
```
The comment for cleanupDeletedTxs says it "performs actual deletion from currentTxMap", but the function only deletes entries from stp.deletedTxs. Please update the comment to match the implementation (or adjust the implementation if currentTxMap cleanup is intended).
```go
// Save to deleted backup map before removing (for Server fallback during async storage)
if txInpoints, found := stp.currentTxMap.Get(hash); found {
	stp.deletedTxs.Set(hash, *txInpoints)
}
stp.currentTxMap.Delete(hash)
```
deletedTxs entries added here are only removed when cleanupDeletedTxs is invoked for a stored subtree containing the tx hash. If a tx is removed after its subtree was already stored (e.g., removing from chainedSubtrees during rechain), the entry can remain in deletedTxs indefinitely, causing unbounded growth over time. Consider adding a TTL/size limit for deletedTxs, or ensuring entries are removed once it's known no in-flight storage will reference them.
```go
c := ttlcache.New[string, any](
	ttlcache.WithTTL[string, any](10*time.Second),
	ttlcache.WithDisableTouchOnHit[string, any](),
)
```
This changes rpcCallCache semantics by disabling touch-on-hit, which is unrelated to the PR’s stated goal (gRPC subscription Send concurrency). If this behavior change is intentional, it likely deserves a brief note in the PR description; otherwise consider moving it to a separate PR to keep this fix scoped and reduce risk.


Summary
- `Send()` calls on the same gRPC `ServerStream` were issued from multiple goroutines
- Concurrent `Send()` corrupts the stream silently, causing subsequent sends to fail
- `Recv()` never gets an error, so it blocks forever without reconnecting

Fix

Test plan
- `TestSubscriptionConcurrentSendRace`: proves the old goroutine-based approach triggers concurrent `Send()` on the same stream (200/200 races detected)
- `TestSubscriptionSerialSend`: proves the fixed synchronous approach has zero concurrent access (101/101 serial)
- Run `generate`, verify RPC height updates without bouncing pods

🤖 Generated with Claude Code