Skip to content

Commit 5b28b8e

Browse files
committed
improve pending receipt handling
1 parent f16fbcb commit 5b28b8e

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

ethreceipts/subscription.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ const (
2121

2222
// After this many attempts, we give up retrying a receipt fetch.
2323
maxReceiptRetryAttempts = 20
24+
25+
// Maximum number of pending receipts to track for retries.
26+
maxPendingReceipts = 5000
2427
)
2528

2629
var (
@@ -56,7 +59,7 @@ type subscriber struct {
5659
finalizer *finalizer
5760
mu sync.Mutex
5861

59-
pendingReceipts map[common.Hash]pendingReceipt
62+
pendingReceipts map[common.Hash]*pendingReceipt
6063
retryMu sync.Mutex
6164
}
6265

@@ -150,7 +153,6 @@ func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, rec
150153
filterer Filterer
151154
}
152155
var toFetch []matchedReceipt
153-
var mu sync.Mutex
154156

155157
// First pass: find all matches
156158
for _, receipt := range receipts {
@@ -238,9 +240,7 @@ func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, rec
238240
}
239241

240242
// Broadcast to subscribers (needs mutex as multiple goroutines may send)
241-
mu.Lock()
242243
s.ch.Send(item.receipt)
243-
mu.Unlock()
244244

245245
return nil
246246
})
@@ -298,15 +298,24 @@ func (s *subscriber) addPendingReceipt(receipt Receipt, filterer Filterer) {
298298

299299
if s.pendingReceipts == nil {
300300
// lazy init
301-
s.pendingReceipts = make(map[common.Hash]pendingReceipt)
301+
s.pendingReceipts = make(map[common.Hash]*pendingReceipt)
302+
}
303+
304+
if len(s.pendingReceipts) >= maxPendingReceipts {
305+
s.listener.log.Error(
306+
"Pending receipts queue is full, dropping new receipt",
307+
"txHash", txHash.String(),
308+
"queueSize", len(s.pendingReceipts),
309+
)
310+
return
302311
}
303312

304313
if _, exists := s.pendingReceipts[txHash]; exists {
305314
// already pending, skip
306315
return
307316
}
308317

309-
s.pendingReceipts[receipt.TransactionHash()] = pendingReceipt{
318+
s.pendingReceipts[txHash] = &pendingReceipt{
310319
receipt: receipt,
311320
filterer: filterer,
312321
attempts: 1,
@@ -320,11 +329,15 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
320329
s.retryMu.Lock()
321330

322331
// Create a snapshot of receipts that are due for retry
323-
var toRetry []pendingReceipt
332+
var toRetry []*pendingReceipt
324333
now := time.Now()
325334

326335
for _, pending := range s.pendingReceipts {
327336
if now.After(pending.nextRetryAt) {
337+
// Claim this item for retry by pushing the nextRetryAt into the future,
338+
// this prevents other concurrent retryPendingReceipts calls from picking
339+
// it up.
340+
pending.nextRetryAt = time.Now().Add(10 * time.Minute)
328341
toRetry = append(toRetry, pending)
329342
}
330343
}
@@ -342,13 +355,19 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
342355

343356
for _, pending := range toRetry {
344357
wg.Add(1)
345-
go func(p pendingReceipt) {
358+
go func(p *pendingReceipt) {
346359
defer wg.Done()
347360

348361
select {
349362
case sem <- struct{}{}:
350363
defer func() { <-sem }()
351364
case <-ctx.Done():
365+
// If context is cancelled, release the claim so the item can be retried later.
366+
s.retryMu.Lock()
367+
if current, ok := s.pendingReceipts[p.receipt.TransactionHash()]; ok && current == p {
368+
current.nextRetryAt = time.Now() // Reschedule immediately.
369+
}
370+
s.retryMu.Unlock()
352371
return
353372
}
354373

@@ -359,9 +378,10 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
359378
s.retryMu.Lock()
360379
defer s.retryMu.Unlock()
361380

381+
// Check if the item still exists and is the same one we claimed.
362382
currentPending, exists := s.pendingReceipts[txHash]
363-
if !exists {
364-
s.listener.log.Warn("Pending receipt no longer exists in map, skipping", "txHash", txHash.String())
383+
if !exists || currentPending != p {
384+
s.listener.log.Debug("Pending receipt is stale or already processed, skipping retry", "txHash", txHash.String())
365385
return
366386
}
367387

@@ -373,7 +393,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
373393
return
374394
}
375395

376-
// Provider error - update retry state from the current state in the map
396+
// Provider error - update retry state directly on the pointer.
377397
currentPending.attempts++
378398
if currentPending.attempts >= maxReceiptRetryAttempts {
379399
delete(s.pendingReceipts, txHash)
@@ -394,7 +414,6 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
394414
backoff = maxWaitBetweenRetries
395415
}
396416
currentPending.nextRetryAt = time.Now().Add(backoff)
397-
s.pendingReceipts[txHash] = currentPending
398417

399418
s.listener.log.Debug(
400419
"Receipt fetch failed, will retry",

0 commit comments

Comments
 (0)