From 118155b5656be86b9e8c6069c169d7969dbd2f88 Mon Sep 17 00:00:00 2001 From: 10gic <2391796+10gic@users.noreply.github.com> Date: Sun, 21 Sep 2025 22:22:27 +0800 Subject: [PATCH 1/6] websocket: add `transactionReceipts` for receipts notification --- eth/filters/api.go | 68 ++++++++++++++++ eth/filters/filter_system.go | 117 ++++++++++++++++++++++++++ eth/filters/filter_system_test.go | 131 ++++++++++++++++++++++++++++++ internal/ethapi/api.go | 8 +- 4 files changed, 320 insertions(+), 4 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index d678c403894..1aa8ff862a0 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -43,6 +43,7 @@ var ( errPendingLogsUnsupported = errors.New("pending logs are not supported") errExceedMaxTopics = errors.New("exceed max topics") errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position") + errExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription") ) const ( @@ -50,6 +51,8 @@ const ( maxTopics = 4 // The maximum number of allowed topics within a topic criteria maxSubTopics = 1000 + // The maximum number of transaction hash criteria allowed in a single subscription + maxTxHashes = 200 ) // filter is a helper struct that holds meta information over the filter type @@ -295,6 +298,71 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc return rpcSub, nil } +// TransactionReceiptsFilter defines criteria for transaction receipts subscription. +// If TransactionHashes is nil or empty, receipts for all transactions included in new blocks will be delivered. +// Otherwise, only receipts for the specified transactions will be delivered. +type TransactionReceiptsFilter struct { + TransactionHashes []common.Hash `json:"transactionHashes,omitempty"` +} + +// TransactionReceipts creates a subscription that fires transaction receipts when transactions are included in blocks. +func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *TransactionReceiptsFilter) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + // Validate transaction hashes limit + if filter != nil && len(filter.TransactionHashes) > maxTxHashes { + return nil, errExceedMaxTxHashes + } + + var ( + rpcSub = notifier.CreateSubscription() + filteredReceipts = make(chan []*ReceiptWithTx) + txHashes []common.Hash + ) + + if filter != nil { + txHashes = filter.TransactionHashes + } + + receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, filteredReceipts) + + go func() { + defer receiptsSub.Unsubscribe() + + signer := types.LatestSigner(api.sys.backend.ChainConfig()) + + for { + select { + case receiptsWithTx := <-filteredReceipts: + if len(receiptsWithTx) > 0 { + // Convert to the same format as eth_getTransactionReceipt + marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTx)) + for i, receiptWithTx := range receiptsWithTx { + marshaledReceipts[i] = ethapi.MarshalReceipt( + receiptWithTx.Receipt, + receiptWithTx.Receipt.BlockHash, + receiptWithTx.Receipt.BlockNumber.Uint64(), + signer, + receiptWithTx.Transaction, + int(receiptWithTx.Receipt.TransactionIndex), + ) + } + + // Send a batch of tx receipts in one notification + notifier.Notify(rpcSub.ID, marshaledReceipts) + } + case <-rpcSub.Err(): + return + } + } + }() + + return rpcSub, nil +} + // FilterCriteria represents a request to create a new filter. // Same as ethereum.FilterQuery but with UnmarshalJSON() method. type FilterCriteria ethereum.FilterQuery diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ecf1c870c13..35207516348 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -158,6 +158,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks + TransactionReceiptsSubscription // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -182,6 +184,8 @@ type subscription struct { logs chan []*types.Log txs chan []*types.Transaction headers chan *types.Header + receipts chan []*ReceiptWithTx + txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled } @@ -268,6 +272,7 @@ func (sub *Subscription) Unsubscribe() { case <-sub.f.logs: case <-sub.f.txs: case <-sub.f.headers: + case <-sub.f.receipts: } } @@ -353,6 +358,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logs: logs, txs: make(chan []*types.Transaction), headers: make(chan *types.Header), + receipts: make(chan []*ReceiptWithTx), installed: make(chan struct{}), err: make(chan error), } @@ -369,6 +375,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti logs: make(chan []*types.Log), txs: make(chan []*types.Transaction), headers: headers, + receipts: make(chan []*ReceiptWithTx), installed: make(chan struct{}), err: make(chan error), } @@ -385,6 +392,26 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc logs: make(chan []*types.Log), txs: txs, headers: make(chan *types.Header), + receipts: make(chan []*ReceiptWithTx), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeTransactionReceipts creates a subscription that writes transaction receipts for +// transactions when they are included in blocks. If txHashes is provided, only receipts +// for those specific transaction hashes will be delivered. +func (es *EventSystem) SubscribeTransactionReceipts(txHashes []common.Hash, receipts chan []*ReceiptWithTx) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: TransactionReceiptsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + txs: make(chan []*types.Transaction), + headers: make(chan *types.Header), + receipts: receipts, + txHashes: txHashes, installed: make(chan struct{}), err: make(chan error), } @@ -415,6 +442,96 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) for _, f := range filters[BlocksSubscription] { f.headers <- ev.Header } + + // Handle transaction receipts subscriptions when a new block is added + es.handleReceiptsEvent(filters, ev) +} + +// ReceiptWithTx contains a receipt and its corresponding transaction for websocket subscription +type ReceiptWithTx struct { + Receipt *types.Receipt + Transaction *types.Transaction +} + +func (es *EventSystem) handleReceiptsEvent(filters filterIndex, ev core.ChainEvent) { + // If there are no transaction receipt subscriptions, skip processing + if len(filters[TransactionReceiptsSubscription]) == 0 { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Get receipts for this block + receipts, err := es.backend.GetReceipts(ctx, ev.Header.Hash()) + if err != nil { + log.Warn("Failed to get receipts for block", "hash", ev.Header.Hash(), "err", err) + return + } + + // Get body to retrieve transactions + body, err := es.backend.GetBody(ctx, ev.Header.Hash(), rpc.BlockNumber(ev.Header.Number.Int64())) + if err != nil { + log.Warn("Failed to get block for receipts", "hash", ev.Header.Hash(), "err", err) + return + } + + txs := body.Transactions + if len(txs) != len(receipts) { + log.Warn("Transaction count mismatch", "txs", len(txs), "receipts", len(receipts)) + return + } + + for _, f := range filters[TransactionReceiptsSubscription] { + var filteredReceipts []*ReceiptWithTx + + if len(f.txHashes) == 0 { + // No filter, send all receipts with their transactions. + filteredReceipts = make([]*ReceiptWithTx, len(receipts)) + for i, receipt := range receipts { + filteredReceipts[i] = &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + } + } + } else if len(f.txHashes) == 1 { + // Filter by single transaction hash. + // This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization. + for i, receipt := range receipts { + if receipt.TxHash == f.txHashes[0] { + filteredReceipts = append(filteredReceipts, &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + }) + break + } + } + } else { + // Filter by multiple transaction hashes. + txHashMap := make(map[common.Hash]bool, len(f.txHashes)) + for _, hash := range f.txHashes { + txHashMap[hash] = true + } + + for i, receipt := range receipts { + if txHashMap[receipt.TxHash] { + filteredReceipts = append(filteredReceipts, &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + }) + + // Early exit if all receipts are found + if len(filteredReceipts) == len(f.txHashes) { + break + } + } + } + } + + if len(filteredReceipts) > 0 { + f.receipts <- filteredReceipts + } + } } // eventLoop (un)installs filters and processes mux events. diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 0048e74995f..459cde8f794 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -781,3 +782,133 @@ func TestPendingTxFilterDeadlock(t *testing.T) { } } } + +// TestTransactionReceiptsSubscription tests the transaction receipts subscription functionality +func TestTransactionReceiptsSubscription(t *testing.T) { + t.Parallel() + + const txNum = 5 + + // Setup test environment + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + signer = types.NewLondonSigner(big.NewInt(1)) + genesis = &core.Genesis{ + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000000000000000)}}, // 1 ETH + Config: params.TestChainConfig, + BaseFee: big.NewInt(params.InitialBaseFee), + } + _, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 1, func(i int, gen *core.BlockGen) { + // Add transactions to the block + for j := 0; j < txNum; j++ { + toAddr := common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268") + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: uint64(j), + GasPrice: gen.BaseFee(), + Gas: 21000, + To: &toAddr, + Value: big.NewInt(1000), + Data: nil, + }), signer, key1) + gen.AddTx(tx) + } + }) + ) + + // Insert the blocks into the chain + blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + if n, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("block %d: failed to insert into chain: %v", n, err) + } + + // Prepare test data + chainEvent := core.ChainEvent{Header: chain[0].Header()} + txHashes := make([]common.Hash, txNum) + for i := 0; i < txNum; i++ { + txHashes[i] = chain[0].Transactions()[i].Hash() + } + + testCases := []struct { + name string + filterTxHashes []common.Hash + expectedReceiptTxHashes []common.Hash + expectError bool + }{ + { + name: "no filter - should return all receipts", + filterTxHashes: nil, + expectedReceiptTxHashes: txHashes, + expectError: false, + }, + { + name: "single tx hash filter", + filterTxHashes: []common.Hash{txHashes[0]}, + expectedReceiptTxHashes: []common.Hash{txHashes[0]}, + expectError: false, + }, + { + name: "multiple tx hashes filter", + filterTxHashes: []common.Hash{txHashes[0], txHashes[1], txHashes[2]}, + expectedReceiptTxHashes: []common.Hash{txHashes[0], txHashes[1], txHashes[2]}, + expectError: false, + }, + } + + // Run test cases + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + receiptsChan := make(chan []*ReceiptWithTx) + sub := api.events.SubscribeTransactionReceipts(tc.filterTxHashes, receiptsChan) + + // Send chain event + backend.chainFeed.Send(chainEvent) + + // Wait for receipts + timeout := time.After(1 * time.Second) + var receivedReceipts []*types.Receipt + for { + select { + case receiptsWithTx := <-receiptsChan: + for _, receiptWithTx := range receiptsWithTx { + receivedReceipts = append(receivedReceipts, receiptWithTx.Receipt) + } + case <-timeout: + t.Fatalf("timeout waiting for receipts") + } + if len(receivedReceipts) >= len(tc.expectedReceiptTxHashes) { + break + } + } + + // Verify receipt count + if len(receivedReceipts) != len(tc.expectedReceiptTxHashes) { + t.Errorf("Expected %d receipts, got %d", len(tc.expectedReceiptTxHashes), len(receivedReceipts)) + } + + // Verify specific transaction hashes are present + if tc.expectedReceiptTxHashes != nil { + receivedHashes := make(map[common.Hash]bool) + for _, receipt := range receivedReceipts { + receivedHashes[receipt.TxHash] = true + } + + for _, expectedHash := range tc.expectedReceiptTxHashes { + if !receivedHashes[expectedHash] { + t.Errorf("Expected receipt for tx %x not found", expectedHash) + } + } + } + + // Cleanup + sub.Unsubscribe() + <-sub.Err() + }) + } +} diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index ebb8ece7301..83f8fa06c20 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -627,7 +627,7 @@ func (api *BlockChainAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rp result := make([]map[string]interface{}, len(receipts)) for i, receipt := range receipts { - result[i] = marshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i) + result[i] = MarshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i) } return result, nil } @@ -1472,11 +1472,11 @@ func (api *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash commo return nil, err } // Derive the sender. - return marshalReceipt(receipt, blockHash, blockNumber, api.signer, tx, int(index)), nil + return MarshalReceipt(receipt, blockHash, blockNumber, api.signer, tx, int(index)), nil } -// marshalReceipt marshals a transaction receipt into a JSON object. -func marshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} { +// MarshalReceipt marshals a transaction receipt into a JSON object. +func MarshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} { from, _ := types.Sender(signer, tx) fields := map[string]interface{}{ From 148d074a36f57a41279d02d7c43c68b4baedc8e9 Mon Sep 17 00:00:00 2001 From: 10gic <2391796+10gic@users.noreply.github.com> Date: Thu, 25 Sep 2025 02:13:21 +0800 Subject: [PATCH 2/6] websocket: optimize receipts retrieval --- core/blockchain.go | 15 +++++- core/events.go | 4 +- eth/filters/api.go | 16 +++--- eth/filters/filter.go | 63 ++++++++++++++++++++++ eth/filters/filter_system.go | 88 ++----------------------------- eth/filters/filter_system_test.go | 12 ++++- 6 files changed, 101 insertions(+), 97 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 30f3da3004a..dd4ea3fe912 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1690,7 +1690,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types // Set new head. bc.writeHeadBlock(block) - bc.chainFeed.Send(ChainEvent{Header: block.Header()}) + bc.chainFeed.Send(ChainEvent{ + Header: block.Header(), + Receipts: receipts, + Transactions: block.Transactions(), + }) + if len(logs) > 0 { bc.logsFeed.Send(logs) } @@ -2589,7 +2594,13 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { // Emit events logs := bc.collectLogs(head, false) - bc.chainFeed.Send(ChainEvent{Header: head.Header()}) + + bc.chainFeed.Send(ChainEvent{ + Header: head.Header(), + Receipts: bc.GetReceiptsByHash(head.Hash()), + Transactions: head.Transactions(), + }) + if len(logs) > 0 { bc.logsFeed.Send(logs) } diff --git a/core/events.go b/core/events.go index 5ad2cb1f7b3..ef0de324262 100644 --- a/core/events.go +++ b/core/events.go @@ -27,7 +27,9 @@ type NewTxsEvent struct{ Txs []*types.Transaction } type RemovedLogsEvent struct{ Logs []*types.Log } type ChainEvent struct { - Header *types.Header + Header *types.Header + Receipts []*types.Receipt + Transactions []*types.Transaction } type ChainHeadEvent struct { diff --git a/eth/filters/api.go b/eth/filters/api.go index 1aa8ff862a0..e95c341a746 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -318,16 +318,16 @@ func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *Transacti } var ( - rpcSub = notifier.CreateSubscription() - filteredReceipts = make(chan []*ReceiptWithTx) - txHashes []common.Hash + rpcSub = notifier.CreateSubscription() + matchedReceipts = make(chan []*ReceiptWithTx) + txHashes []common.Hash ) if filter != nil { txHashes = filter.TransactionHashes } - receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, filteredReceipts) + receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts) go func() { defer receiptsSub.Unsubscribe() @@ -336,11 +336,11 @@ func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *Transacti for { select { - case receiptsWithTx := <-filteredReceipts: - if len(receiptsWithTx) > 0 { + case receiptsWithTxs := <-matchedReceipts: + if len(receiptsWithTxs) > 0 { // Convert to the same format as eth_getTransactionReceipt - marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTx)) - for i, receiptWithTx := range receiptsWithTx { + marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs)) + for i, receiptWithTx := range receiptsWithTxs { marshaledReceipts[i] = ethapi.MarshalReceipt( receiptWithTx.Receipt, receiptWithTx.Receipt.BlockHash, diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 1a9918d0ee7..0b2f403355b 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" @@ -551,3 +552,65 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } + +// ReceiptWithTx contains a receipt and its corresponding transaction +type ReceiptWithTx struct { + Receipt *types.Receipt + Transaction *types.Transaction +} + +// filterReceipts returns the receipts matching the given criteria +// In addition to returning receipts, it also returns the corresponding transactions. +// This is because receipts only contain low-level data, while user-facing data +// may require additional information from the Transaction. +func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx { + var ret []*ReceiptWithTx + + receipts := ev.Receipts + txs := ev.Transactions + + if len(txHashes) == 0 { + // No filter, send all receipts with their transactions. + ret = make([]*ReceiptWithTx, len(receipts)) + for i, receipt := range receipts { + ret[i] = &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + } + } + } else if len(txHashes) == 1 { + // Filter by single transaction hash. + // This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization. + for i, receipt := range receipts { + if receipt.TxHash == (txHashes)[0] { + ret = append(ret, &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + }) + break + } + } + } else { + // Filter by multiple transaction hashes. + txHashMap := make(map[common.Hash]bool, len(txHashes)) + for _, hash := range txHashes { + txHashMap[hash] = true + } + + for i, receipt := range receipts { + if txHashMap[receipt.TxHash] { + ret = append(ret, &ReceiptWithTx{ + Receipt: receipt, + Transaction: txs[i], + }) + + // Early exit if all receipts are found + if len(ret) == len(txHashes) { + break + } + } + } + } + + return ret +} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 35207516348..02783fa5ec5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -444,92 +444,10 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) } // Handle transaction receipts subscriptions when a new block is added - es.handleReceiptsEvent(filters, ev) -} - -// ReceiptWithTx contains a receipt and its corresponding transaction for websocket subscription -type ReceiptWithTx struct { - Receipt *types.Receipt - Transaction *types.Transaction -} - -func (es *EventSystem) handleReceiptsEvent(filters filterIndex, ev core.ChainEvent) { - // If there are no transaction receipt subscriptions, skip processing - if len(filters[TransactionReceiptsSubscription]) == 0 { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Get receipts for this block - receipts, err := es.backend.GetReceipts(ctx, ev.Header.Hash()) - if err != nil { - log.Warn("Failed to get receipts for block", "hash", ev.Header.Hash(), "err", err) - return - } - - // Get body to retrieve transactions - body, err := es.backend.GetBody(ctx, ev.Header.Hash(), rpc.BlockNumber(ev.Header.Number.Int64())) - if err != nil { - log.Warn("Failed to get block for receipts", "hash", ev.Header.Hash(), "err", err) - return - } - - txs := body.Transactions - if len(txs) != len(receipts) { - log.Warn("Transaction count mismatch", "txs", len(txs), "receipts", len(receipts)) - return - } - for _, f := range filters[TransactionReceiptsSubscription] { - var filteredReceipts []*ReceiptWithTx - - if len(f.txHashes) == 0 { - // No filter, send all receipts with their transactions. - filteredReceipts = make([]*ReceiptWithTx, len(receipts)) - for i, receipt := range receipts { - filteredReceipts[i] = &ReceiptWithTx{ - Receipt: receipt, - Transaction: txs[i], - } - } - } else if len(f.txHashes) == 1 { - // Filter by single transaction hash. - // This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization. - for i, receipt := range receipts { - if receipt.TxHash == f.txHashes[0] { - filteredReceipts = append(filteredReceipts, &ReceiptWithTx{ - Receipt: receipt, - Transaction: txs[i], - }) - break - } - } - } else { - // Filter by multiple transaction hashes. - txHashMap := make(map[common.Hash]bool, len(f.txHashes)) - for _, hash := range f.txHashes { - txHashMap[hash] = true - } - - for i, receipt := range receipts { - if txHashMap[receipt.TxHash] { - filteredReceipts = append(filteredReceipts, &ReceiptWithTx{ - Receipt: receipt, - Transaction: txs[i], - }) - - // Early exit if all receipts are found - if len(filteredReceipts) == len(f.txHashes) { - break - } - } - } - } - - if len(filteredReceipts) > 0 { - f.receipts <- filteredReceipts + matchedReceipts := filterReceipts(f.txHashes, ev) + if len(matchedReceipts) > 0 { + f.receipts <- matchedReceipts } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 459cde8f794..e5a1a2b25f1 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -829,7 +829,17 @@ func TestTransactionReceiptsSubscription(t *testing.T) { } // Prepare test data - chainEvent := core.ChainEvent{Header: chain[0].Header()} + receipts := blockchain.GetReceiptsByHash(chain[0].Hash()) + if receipts == nil { + t.Fatalf("failed to get receipts") + } + + chainEvent := core.ChainEvent{ + Header: chain[0].Header(), + Receipts: receipts, + Transactions: chain[0].Transactions(), + } + txHashes := make([]common.Hash, txNum) for i := 0; i < txNum; i++ { txHashes[i] = chain[0].Transactions()[i].Hash() From 9e3c25a676c5e5acb5b854bed3c1975662070bea Mon Sep 17 00:00:00 2001 From: 10gic <2391796+10gic@users.noreply.github.com> Date: Thu, 25 Sep 2025 10:50:40 +0800 Subject: [PATCH 3/6] chore: add sanity check for receipts and transaction length --- eth/filters/filter.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0b2f403355b..02399bc8018 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -569,6 +569,11 @@ func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx receipts := ev.Receipts txs := ev.Transactions + if len(receipts) != len(txs) { + log.Warn("Receipts and transactions length mismatch", "receipts", len(receipts), "transactions", len(txs)) + return ret + } + if len(txHashes) == 0 { // No filter, send all receipts with their transactions. ret = make([]*ReceiptWithTx, len(receipts)) @@ -582,7 +587,7 @@ func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx // Filter by single transaction hash. // This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization. for i, receipt := range receipts { - if receipt.TxHash == (txHashes)[0] { + if receipt.TxHash == txHashes[0] { ret = append(ret, &ReceiptWithTx{ Receipt: receipt, Transaction: txs[i], From af46f3f544f881b4c3f8717f93b02b575f682d69 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Mon, 29 Sep 2025 20:55:38 +0200 Subject: [PATCH 4/6] cache receipts --- core/blockchain.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index dd4ea3fe912..0f1f5d6eeac 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2347,6 +2347,14 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co // collectLogs collects the logs that were generated or removed during the // processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { + receipts, logs := bc.collectLogsAndReceipts(b, removed) + _ = receipts // receipts are not used here, but retrieved for efficiency in other callers + return logs +} + +// collectLogsAndReceipts retrieves receipts from the database and returns both receipts and logs. +// This avoids duplicate database reads when both are needed. +func (bc *BlockChain) collectLogsAndReceipts(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) { var blobGasPrice *big.Int if b.ExcessBlobGas() != nil { blobGasPrice = eip4844.CalcBlobFee(bc.chainConfig, b.Header()) @@ -2364,7 +2372,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { logs = append(logs, log) } } - return logs + return receipts, logs } // reorg takes two blocks, an old chain and a new chain and will reconstruct the @@ -2593,11 +2601,11 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { bc.writeHeadBlock(head) // Emit events - logs := bc.collectLogs(head, false) + receipts, logs := bc.collectLogsAndReceipts(head, false) bc.chainFeed.Send(ChainEvent{ Header: head.Header(), - Receipts: bc.GetReceiptsByHash(head.Hash()), + Receipts: receipts, Transactions: head.Transactions(), }) From 4b926f6645dd570236ba4d8f365be0fdf4992ea1 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Mon, 29 Sep 2025 20:56:47 +0200 Subject: [PATCH 5/6] clean --- core/blockchain.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 0f1f5d6eeac..c74fe56efd6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2347,8 +2347,7 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co // collectLogs collects the logs that were generated or removed during the // processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { - receipts, logs := bc.collectLogsAndReceipts(b, removed) - _ = receipts // receipts are not used here, but retrieved for efficiency in other callers + _, logs := bc.collectLogsAndReceipts(b, removed) return logs } From 9168002c47b99c0f0b3ccd8e14ea88428259b88a Mon Sep 17 00:00:00 2001 From: 10gic Date: Tue, 30 Sep 2025 14:11:39 +0800 Subject: [PATCH 6/6] chore: rename collectLogsAndReceipts to collectReceiptsAndLogs --- core/blockchain.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c74fe56efd6..c74168d82d1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2347,13 +2347,13 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co // collectLogs collects the logs that were generated or removed during the // processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { - _, logs := bc.collectLogsAndReceipts(b, removed) + _, logs := bc.collectReceiptsAndLogs(b, removed) return logs } -// collectLogsAndReceipts retrieves receipts from the database and returns both receipts and logs. +// collectReceiptsAndLogs retrieves receipts from the database and returns both receipts and logs. // This avoids duplicate database reads when both are needed. -func (bc *BlockChain) collectLogsAndReceipts(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) { +func (bc *BlockChain) collectReceiptsAndLogs(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) { var blobGasPrice *big.Int if b.ExcessBlobGas() != nil { blobGasPrice = eip4844.CalcBlobFee(bc.chainConfig, b.Header()) @@ -2600,7 +2600,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { bc.writeHeadBlock(head) // Emit events - receipts, logs := bc.collectLogsAndReceipts(head, false) + receipts, logs := bc.collectReceiptsAndLogs(head, false) bc.chainFeed.Send(ChainEvent{ Header: head.Header(),