Skip to content

Commit 148d074

Browse files
committed
websocket: optimize receipts retrieval
1 parent 118155b commit 148d074

File tree

6 files changed

+101
-97
lines changed

6 files changed

+101
-97
lines changed

core/blockchain.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1690,7 +1690,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16901690
// Set new head.
16911691
bc.writeHeadBlock(block)
16921692

1693-
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
1693+
bc.chainFeed.Send(ChainEvent{
1694+
Header: block.Header(),
1695+
Receipts: receipts,
1696+
Transactions: block.Transactions(),
1697+
})
1698+
16941699
if len(logs) > 0 {
16951700
bc.logsFeed.Send(logs)
16961701
}
@@ -2589,7 +2594,13 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25892594

25902595
// Emit events
25912596
logs := bc.collectLogs(head, false)
2592-
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
2597+
2598+
bc.chainFeed.Send(ChainEvent{
2599+
Header: head.Header(),
2600+
Receipts: bc.GetReceiptsByHash(head.Hash()),
2601+
Transactions: head.Transactions(),
2602+
})
2603+
25932604
if len(logs) > 0 {
25942605
bc.logsFeed.Send(logs)
25952606
}

core/events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
2727
type RemovedLogsEvent struct{ Logs []*types.Log }
2828

2929
type ChainEvent struct {
30-
Header *types.Header
30+
Header *types.Header
31+
Receipts []*types.Receipt
32+
Transactions []*types.Transaction
3133
}
3234

3335
type ChainHeadEvent struct {

eth/filters/api.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -318,16 +318,16 @@ func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *Transacti
318318
}
319319

320320
var (
321-
rpcSub = notifier.CreateSubscription()
322-
filteredReceipts = make(chan []*ReceiptWithTx)
323-
txHashes []common.Hash
321+
rpcSub = notifier.CreateSubscription()
322+
matchedReceipts = make(chan []*ReceiptWithTx)
323+
txHashes []common.Hash
324324
)
325325

326326
if filter != nil {
327327
txHashes = filter.TransactionHashes
328328
}
329329

330-
receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, filteredReceipts)
330+
receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts)
331331

332332
go func() {
333333
defer receiptsSub.Unsubscribe()
@@ -336,11 +336,11 @@ func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *Transacti
336336

337337
for {
338338
select {
339-
case receiptsWithTx := <-filteredReceipts:
340-
if len(receiptsWithTx) > 0 {
339+
case receiptsWithTxs := <-matchedReceipts:
340+
if len(receiptsWithTxs) > 0 {
341341
// Convert to the same format as eth_getTransactionReceipt
342-
marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTx))
343-
for i, receiptWithTx := range receiptsWithTx {
342+
marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs))
343+
for i, receiptWithTx := range receiptsWithTxs {
344344
marshaledReceipts[i] = ethapi.MarshalReceipt(
345345
receiptWithTx.Receipt,
346346
receiptWithTx.Receipt.BlockHash,

eth/filters/filter.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/ethereum/go-ethereum/common"
28+
"github.com/ethereum/go-ethereum/core"
2829
"github.com/ethereum/go-ethereum/core/filtermaps"
2930
"github.com/ethereum/go-ethereum/core/history"
3031
"github.com/ethereum/go-ethereum/core/types"
@@ -551,3 +552,65 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
551552
}
552553
return true
553554
}
555+
556+
// ReceiptWithTx contains a receipt and its corresponding transaction
557+
type ReceiptWithTx struct {
558+
Receipt *types.Receipt
559+
Transaction *types.Transaction
560+
}
561+
562+
// filterReceipts returns the receipts matching the given criteria
563+
// In addition to returning receipts, it also returns the corresponding transactions.
564+
// This is because receipts only contain low-level data, while user-facing data
565+
// may require additional information from the Transaction.
566+
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
567+
var ret []*ReceiptWithTx
568+
569+
receipts := ev.Receipts
570+
txs := ev.Transactions
571+
572+
if len(txHashes) == 0 {
573+
// No filter, send all receipts with their transactions.
574+
ret = make([]*ReceiptWithTx, len(receipts))
575+
for i, receipt := range receipts {
576+
ret[i] = &ReceiptWithTx{
577+
Receipt: receipt,
578+
Transaction: txs[i],
579+
}
580+
}
581+
} else if len(txHashes) == 1 {
582+
// Filter by single transaction hash.
583+
// This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization.
584+
for i, receipt := range receipts {
585+
if receipt.TxHash == (txHashes)[0] {
586+
ret = append(ret, &ReceiptWithTx{
587+
Receipt: receipt,
588+
Transaction: txs[i],
589+
})
590+
break
591+
}
592+
}
593+
} else {
594+
// Filter by multiple transaction hashes.
595+
txHashMap := make(map[common.Hash]bool, len(txHashes))
596+
for _, hash := range txHashes {
597+
txHashMap[hash] = true
598+
}
599+
600+
for i, receipt := range receipts {
601+
if txHashMap[receipt.TxHash] {
602+
ret = append(ret, &ReceiptWithTx{
603+
Receipt: receipt,
604+
Transaction: txs[i],
605+
})
606+
607+
// Early exit if all receipts are found
608+
if len(ret) == len(txHashes) {
609+
break
610+
}
611+
}
612+
}
613+
}
614+
615+
return ret
616+
}

eth/filters/filter_system.go

Lines changed: 3 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -444,92 +444,10 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
444444
}
445445

446446
// Handle transaction receipts subscriptions when a new block is added
447-
es.handleReceiptsEvent(filters, ev)
448-
}
449-
450-
// ReceiptWithTx contains a receipt and its corresponding transaction for websocket subscription
451-
type ReceiptWithTx struct {
452-
Receipt *types.Receipt
453-
Transaction *types.Transaction
454-
}
455-
456-
func (es *EventSystem) handleReceiptsEvent(filters filterIndex, ev core.ChainEvent) {
457-
// If there are no transaction receipt subscriptions, skip processing
458-
if len(filters[TransactionReceiptsSubscription]) == 0 {
459-
return
460-
}
461-
462-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
463-
defer cancel()
464-
465-
// Get receipts for this block
466-
receipts, err := es.backend.GetReceipts(ctx, ev.Header.Hash())
467-
if err != nil {
468-
log.Warn("Failed to get receipts for block", "hash", ev.Header.Hash(), "err", err)
469-
return
470-
}
471-
472-
// Get body to retrieve transactions
473-
body, err := es.backend.GetBody(ctx, ev.Header.Hash(), rpc.BlockNumber(ev.Header.Number.Int64()))
474-
if err != nil {
475-
log.Warn("Failed to get block for receipts", "hash", ev.Header.Hash(), "err", err)
476-
return
477-
}
478-
479-
txs := body.Transactions
480-
if len(txs) != len(receipts) {
481-
log.Warn("Transaction count mismatch", "txs", len(txs), "receipts", len(receipts))
482-
return
483-
}
484-
485447
for _, f := range filters[TransactionReceiptsSubscription] {
486-
var filteredReceipts []*ReceiptWithTx
487-
488-
if len(f.txHashes) == 0 {
489-
// No filter, send all receipts with their transactions.
490-
filteredReceipts = make([]*ReceiptWithTx, len(receipts))
491-
for i, receipt := range receipts {
492-
filteredReceipts[i] = &ReceiptWithTx{
493-
Receipt: receipt,
494-
Transaction: txs[i],
495-
}
496-
}
497-
} else if len(f.txHashes) == 1 {
498-
// Filter by single transaction hash.
499-
// This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization.
500-
for i, receipt := range receipts {
501-
if receipt.TxHash == f.txHashes[0] {
502-
filteredReceipts = append(filteredReceipts, &ReceiptWithTx{
503-
Receipt: receipt,
504-
Transaction: txs[i],
505-
})
506-
break
507-
}
508-
}
509-
} else {
510-
// Filter by multiple transaction hashes.
511-
txHashMap := make(map[common.Hash]bool, len(f.txHashes))
512-
for _, hash := range f.txHashes {
513-
txHashMap[hash] = true
514-
}
515-
516-
for i, receipt := range receipts {
517-
if txHashMap[receipt.TxHash] {
518-
filteredReceipts = append(filteredReceipts, &ReceiptWithTx{
519-
Receipt: receipt,
520-
Transaction: txs[i],
521-
})
522-
523-
// Early exit if all receipts are found
524-
if len(filteredReceipts) == len(f.txHashes) {
525-
break
526-
}
527-
}
528-
}
529-
}
530-
531-
if len(filteredReceipts) > 0 {
532-
f.receipts <- filteredReceipts
448+
matchedReceipts := filterReceipts(f.txHashes, ev)
449+
if len(matchedReceipts) > 0 {
450+
f.receipts <- matchedReceipts
533451
}
534452
}
535453
}

eth/filters/filter_system_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,17 @@ func TestTransactionReceiptsSubscription(t *testing.T) {
829829
}
830830

831831
// Prepare test data
832-
chainEvent := core.ChainEvent{Header: chain[0].Header()}
832+
receipts := blockchain.GetReceiptsByHash(chain[0].Hash())
833+
if receipts == nil {
834+
t.Fatalf("failed to get receipts")
835+
}
836+
837+
chainEvent := core.ChainEvent{
838+
Header: chain[0].Header(),
839+
Receipts: receipts,
840+
Transactions: chain[0].Transactions(),
841+
}
842+
833843
txHashes := make([]common.Hash, txNum)
834844
for i := 0; i < txNum; i++ {
835845
txHashes[i] = chain[0].Transactions()[i].Hash()

0 commit comments

Comments
 (0)