Skip to content

Commit f16fbcb

Browse files
committed
clean-up
1 parent 98740b3 commit f16fbcb

File tree

1 file changed

+10
-15
lines changed

1 file changed

+10
-15
lines changed

ethreceipts/ethreceipts.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,6 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
316316

317317
sub := l.Subscribe(query)
318318

319-
// Use an internal context to coordinate cancellation and cleanup
320-
internalCtx, internalCancel := context.WithCancel(ctx)
321-
322-
// Use a WaitGroup to ensure the goroutine cleans up before the function returns
323319
var wg sync.WaitGroup
324320

325321
exhausted := make(chan struct{})
@@ -356,7 +352,6 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
356352
wg.Add(1)
357353
go func() {
358354
defer wg.Done()
359-
defer internalCancel()
360355
defer sub.Unsubscribe()
361356
defer close(mined)
362357
defer close(finalized)
@@ -370,7 +365,7 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
370365

371366
for {
372367
select {
373-
case <-internalCtx.Done():
368+
case <-ctx.Done():
374369
return
375370

376371
case <-sub.Done():
@@ -429,11 +424,9 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
429424
// Wait for the first mined receipt or an exit signal
430425
select {
431426
case <-ctx.Done():
432-
internalCancel()
433427
wg.Wait() // Ensure cleanup
434428
return nil, nil, ctx.Err()
435429
case <-sub.Done():
436-
internalCancel()
437430
wg.Wait() // Ensure cleanup
438431
return nil, nil, ErrSubscriptionClosed
439432
case <-exhausted:
@@ -443,7 +436,6 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
443436
case receipt, ok := <-mined:
444437
if !ok {
445438
// Mined channel closed without sending, implies goroutine exited early.
446-
internalCancel()
447439
wg.Wait() // Ensure cleanup
448440
// Check if exhaustion occurred
449441
select {
@@ -464,17 +456,19 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
464456
// this transaction hash.
465457
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
466458

459+
// TODO: this might block forever if all workers are busy, should we have a
460+
// timeout?
467461
l.fetchSem <- struct{}{}
468462

469-
resultCh := make(chan *types.Receipt)
470-
errCh := make(chan error)
463+
// channels to receive result or error: it could be difficult to coordinate
464+
// closing them manually because we're also selecting on ctx.Done(), so we
465+
// use buffered channels of size 1 and let them be garbage collected after
466+
// this function returns.
467+
resultCh := make(chan *types.Receipt, 1)
468+
errCh := make(chan error, 1)
471469

472470
go func() {
473-
474471
defer func() {
475-
close(resultCh)
476-
close(errCh)
477-
478472
<-l.fetchSem
479473
}()
480474

@@ -953,6 +947,7 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
953947
}
954948

955949
l.latestBlockNumTime.Store(time.Now().Unix())
950+
956951
latestBlockNum := l.fetchLatestBlockNum()
957952
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
958953
l.latestBlockNumCache.Store(latestBlockNum.Uint64())

0 commit comments

Comments
 (0)