Skip to content

Commit f34dbda

Browse files
committed
fix race conditions
1 parent a2d4a0d commit f34dbda

File tree

3 files changed

+62
-34
lines changed

3 files changed

+62
-34
lines changed

ethreceipts/ethreceipts.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
316316

317317
sub := l.Subscribe(query)
318318

319+
// Use an internal context to coordinate cancellation and cleanup
320+
internalCtx, internalCancel := context.WithCancel(ctx)
321+
319322
// Use a WaitGroup to ensure the goroutine cleans up before the function returns
320323
var wg sync.WaitGroup
321324

@@ -353,6 +356,7 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
353356
wg.Add(1)
354357
go func() {
355358
defer wg.Done()
359+
defer internalCancel()
356360
defer sub.Unsubscribe()
357361
defer close(mined)
358362
defer close(finalized)
@@ -366,7 +370,7 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
366370

367371
for {
368372
select {
369-
case <-ctx.Done():
373+
case <-internalCtx.Done():
370374
return
371375

372376
case <-sub.Done():
@@ -425,9 +429,11 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
425429
// Wait for the first mined receipt or an exit signal
426430
select {
427431
case <-ctx.Done():
432+
internalCancel()
428433
wg.Wait() // Ensure cleanup
429434
return nil, nil, ctx.Err()
430435
case <-sub.Done():
436+
internalCancel()
431437
wg.Wait() // Ensure cleanup
432438
return nil, nil, ErrSubscriptionClosed
433439
case <-exhausted:
@@ -437,6 +443,7 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
437443
case receipt, ok := <-mined:
438444
if !ok {
439445
// Mined channel closed without sending, implies goroutine exited early.
446+
internalCancel()
440447
wg.Wait() // Ensure cleanup
441448
// Check if exhaustion occurred
442449
select {
@@ -667,9 +674,9 @@ func (l *ReceiptsListener) listener() error {
667674
if reorg {
668675
for _, list := range filters {
669676
for _, filterer := range list {
670-
if f, _ := filterer.(*filter); f != nil {
671-
f.startBlockNum = latestBlockNum
672-
f.lastMatchBlockNum = 0
677+
if f, ok := filterer.(*filter); ok {
678+
f.setStartBlockNum(latestBlockNum)
679+
f.setLastMatchBlockNum(0)
673680
}
674681
}
675682
}
@@ -686,12 +693,12 @@ func (l *ReceiptsListener) listener() error {
686693
for y, matched := range list {
687694
filterer := filters[x][y]
688695
if matched || filterer.StartBlockNum() == 0 {
689-
if f, _ := filterer.(*filter); f != nil {
690-
if f.startBlockNum == 0 {
691-
f.startBlockNum = latestBlockNum
696+
if f, ok := filterer.(*filter); ok {
697+
if f.StartBlockNum() == 0 {
698+
f.setStartBlockNum(latestBlockNum)
692699
}
693700
if matched {
694-
f.lastMatchBlockNum = latestBlockNum
701+
f.setLastMatchBlockNum(latestBlockNum)
695702
}
696703
}
697704
} else {
@@ -713,10 +720,8 @@ func (l *ReceiptsListener) listener() error {
713720
subscriber := subscribers[x]
714721
subscriber.RemoveFilter(filterer)
715722

716-
select {
717-
case <-f.Exhausted():
718-
default:
719-
close(f.exhausted)
723+
if f, ok := filterer.(*filter); ok {
724+
f.closeExhausted()
720725
}
721726
}
722727
}
@@ -883,7 +888,7 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
883888
}
884889

885890
if f, ok := item.filterer.(*filter); ok {
886-
f.lastMatchBlockNum = r.BlockNumber.Uint64()
891+
f.setLastMatchBlockNum(r.BlockNumber.Uint64())
887892
}
888893

889894
receipt := Receipt{

ethreceipts/filterer.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ethreceipts
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/0xsequence/ethkit"
78
"github.com/0xsequence/ethkit/go-ethereum/core/types"
@@ -158,14 +159,16 @@ type filter struct {
158159
options FilterOptions
159160
cond FilterCond
160161

162+
mu sync.RWMutex
161163
// startBlockNum is the first block number observed once filter is active
162164
startBlockNum uint64
163165

164166
// lastMatchBlockNum is the block number where a last match occured
165167
lastMatchBlockNum uint64
166168

167169
// exhausted signals if the filter hit MaxWait
168-
exhausted chan struct{}
170+
exhausted chan struct{}
171+
exhaustedOnce sync.Once
169172
}
170173

171174
var (
@@ -254,13 +257,35 @@ func (f *filter) Match(ctx context.Context, receipt Receipt) (bool, error) {
254257
}
255258

256259
func (f *filter) StartBlockNum() uint64 {
260+
f.mu.RLock()
261+
defer f.mu.RUnlock()
257262
return f.startBlockNum
258263
}
259264

260265
func (f *filter) LastMatchBlockNum() uint64 {
266+
f.mu.RLock()
267+
defer f.mu.RUnlock()
261268
return f.lastMatchBlockNum
262269
}
263270

271+
func (f *filter) setStartBlockNum(num uint64) {
272+
f.mu.Lock()
273+
defer f.mu.Unlock()
274+
f.startBlockNum = num
275+
}
276+
277+
func (f *filter) setLastMatchBlockNum(num uint64) {
278+
f.mu.Lock()
279+
defer f.mu.Unlock()
280+
f.lastMatchBlockNum = num
281+
}
282+
283+
func (f *filter) closeExhausted() {
284+
f.exhaustedOnce.Do(func() {
285+
close(f.exhausted)
286+
})
287+
}
288+
264289
func (f *filter) Exhausted() <-chan struct{} {
265290
return f.exhausted
266291
}

ethreceipts/subscription.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (s *subscriber) addPendingReceipt(receipt Receipt, filterer Filterer) {
319319
func (s *subscriber) retryPendingReceipts(ctx context.Context) {
320320
s.retryMu.Lock()
321321

322-
// Collect receipts that are due for retry
322+
// Create a snapshot of receipts that are due for retry
323323
var toRetry []pendingReceipt
324324
now := time.Now()
325325

@@ -336,7 +336,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
336336

337337
s.listener.log.Info(fmt.Sprintf("ethreceipts: retrying %d pending receipts", len(toRetry)))
338338

339-
// Process retries concurrently with bounded parallelism
339+
// Collect receipts that are due for retry
340340
sem := make(chan struct{}, maxConcurrentReceiptRetries)
341341
var wg sync.WaitGroup
342342

@@ -359,46 +359,44 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
359359
s.retryMu.Lock()
360360
defer s.retryMu.Unlock()
361361

362-
if err != nil {
362+
currentPending, exists := s.pendingReceipts[txHash]
363+
if !exists {
364+
s.listener.log.Warn("Pending receipt no longer exists in map, skipping", "txHash", txHash.String())
365+
return
366+
}
363367

368+
if err != nil {
364369
if errors.Is(err, ethereum.NotFound) {
365370
// Transaction genuinely doesn't exist - remove from queue
366371
delete(s.pendingReceipts, txHash)
367-
s.listener.log.Debug("Receipt not found after retry, removing from queue",
368-
"txHash", txHash.String())
372+
s.listener.log.Debug("Receipt not found after retry, removing from queue", "txHash", txHash.String())
369373
return
370374
}
371375

372-
// Provider error - update retry state
373-
p.attempts++
374-
375-
// Check if max attempts reached
376-
if p.attempts >= maxReceiptRetryAttempts {
377-
// This will definitely remove the pending receipt, if it arrives
378-
// later subscribers won't get notified.
376+
// Provider error - update retry state from the current state in the map
377+
currentPending.attempts++
378+
if currentPending.attempts >= maxReceiptRetryAttempts {
379379
delete(s.pendingReceipts, txHash)
380380
s.listener.log.Error("Failed to fetch receipt after max retries",
381381
"txHash", txHash.String(),
382-
"attempts", p.attempts,
382+
"attempts", currentPending.attempts,
383383
"error", err)
384-
385384
// TODO: perhaps we should close the subscription here as we failed
386385
// to deliver a receipt after many attempts?
387386
return
388387
}
389388

390389
// Exponential backoff for next retry
391-
backoff := time.Duration(1<<uint(p.attempts)) * time.Second
390+
backoff := time.Duration(1<<uint(currentPending.attempts)) * time.Second
392391
if backoff > maxWaitBetweenRetries {
393392
backoff = maxWaitBetweenRetries
394393
}
395-
396-
p.nextRetryAt = time.Now().Add(backoff)
397-
s.pendingReceipts[txHash] = p
394+
currentPending.nextRetryAt = time.Now().Add(backoff)
395+
s.pendingReceipts[txHash] = currentPending
398396

399397
s.listener.log.Debug("Receipt fetch failed, will retry",
400398
"txHash", txHash.String(),
401-
"attempt", p.attempts,
399+
"attempt", currentPending.attempts,
402400
"nextRetryIn", backoff,
403401
)
404402
return
@@ -433,7 +431,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) {
433431

434432
s.listener.log.Info("Successfully fetched receipt after retry",
435433
"txHash", txHash.String(),
436-
"attempts", p.attempts)
434+
"attempts", currentPending.attempts)
437435
}(pending)
438436
}
439437

0 commit comments

Comments
 (0)