Skip to content

Commit 1120855

Browse files
10gics1na
andauthored
eth/filters: add transactionReceipts subscription (#32697)
- Introduce a new subscription kind `transactionReceipts` to allow clients to receive transaction receipts over WebSocket as soon as they are available. - Accept optional `transactionHashes` filter to subscribe to receipts for specific transactions; an empty or omitted filter subscribes to all receipts. - Preserve the same receipt format as returned by `eth_getTransactionReceipt`. - Avoid additional HTTP polling, reducing RPC load and latency. --------- Co-authored-by: Sina Mahmoodi <[email protected]>
1 parent a1b8e48 commit 1120855

File tree

7 files changed

+341
-9
lines changed

7 files changed

+341
-9
lines changed

core/blockchain.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1690,7 +1690,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16901690
// Set new head.
16911691
bc.writeHeadBlock(block)
16921692

1693-
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
1693+
bc.chainFeed.Send(ChainEvent{
1694+
Header: block.Header(),
1695+
Receipts: receipts,
1696+
Transactions: block.Transactions(),
1697+
})
1698+
16941699
if len(logs) > 0 {
16951700
bc.logsFeed.Send(logs)
16961701
}
@@ -2342,6 +2347,13 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
23422347
// collectLogs collects the logs that were generated or removed during the
23432348
// processing of a block. These logs are later announced as deleted or reborn.
23442349
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
2350+
_, logs := bc.collectReceiptsAndLogs(b, removed)
2351+
return logs
2352+
}
2353+
2354+
// collectReceiptsAndLogs retrieves receipts from the database and returns both receipts and logs.
2355+
// This avoids duplicate database reads when both are needed.
2356+
func (bc *BlockChain) collectReceiptsAndLogs(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) {
23452357
var blobGasPrice *big.Int
23462358
if b.ExcessBlobGas() != nil {
23472359
blobGasPrice = eip4844.CalcBlobFee(bc.chainConfig, b.Header())
@@ -2359,7 +2371,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
23592371
logs = append(logs, log)
23602372
}
23612373
}
2362-
return logs
2374+
return receipts, logs
23632375
}
23642376

23652377
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
@@ -2588,8 +2600,14 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25882600
bc.writeHeadBlock(head)
25892601

25902602
// Emit events
2591-
logs := bc.collectLogs(head, false)
2592-
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
2603+
receipts, logs := bc.collectReceiptsAndLogs(head, false)
2604+
2605+
bc.chainFeed.Send(ChainEvent{
2606+
Header: head.Header(),
2607+
Receipts: receipts,
2608+
Transactions: head.Transactions(),
2609+
})
2610+
25932611
if len(logs) > 0 {
25942612
bc.logsFeed.Send(logs)
25952613
}

core/events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
2727
type RemovedLogsEvent struct{ Logs []*types.Log }
2828

2929
type ChainEvent struct {
30-
Header *types.Header
30+
Header *types.Header
31+
Receipts []*types.Receipt
32+
Transactions []*types.Transaction
3133
}
3234

3335
type ChainHeadEvent struct {

eth/filters/api.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,16 @@ var (
4343
errPendingLogsUnsupported = errors.New("pending logs are not supported")
4444
errExceedMaxTopics = errors.New("exceed max topics")
4545
errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
46+
errExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")
4647
)
4748

4849
const (
4950
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
5051
maxTopics = 4
5152
// The maximum number of allowed topics within a topic criteria
5253
maxSubTopics = 1000
54+
// The maximum number of transaction hash criteria allowed in a single subscription
55+
maxTxHashes = 200
5356
)
5457

5558
// filter is a helper struct that holds meta information over the filter type
@@ -296,6 +299,71 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
296299
return rpcSub, nil
297300
}
298301

302+
// TransactionReceiptsFilter defines criteria for transaction receipts subscription.
303+
// If TransactionHashes is nil or empty, receipts for all transactions included in new blocks will be delivered.
304+
// Otherwise, only receipts for the specified transactions will be delivered.
305+
type TransactionReceiptsFilter struct {
306+
TransactionHashes []common.Hash `json:"transactionHashes,omitempty"`
307+
}
308+
309+
// TransactionReceipts creates a subscription that fires transaction receipts when transactions are included in blocks.
310+
func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *TransactionReceiptsFilter) (*rpc.Subscription, error) {
311+
notifier, supported := rpc.NotifierFromContext(ctx)
312+
if !supported {
313+
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
314+
}
315+
316+
// Validate transaction hashes limit
317+
if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
318+
return nil, errExceedMaxTxHashes
319+
}
320+
321+
var (
322+
rpcSub = notifier.CreateSubscription()
323+
matchedReceipts = make(chan []*ReceiptWithTx)
324+
txHashes []common.Hash
325+
)
326+
327+
if filter != nil {
328+
txHashes = filter.TransactionHashes
329+
}
330+
331+
receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts)
332+
333+
go func() {
334+
defer receiptsSub.Unsubscribe()
335+
336+
signer := types.LatestSigner(api.sys.backend.ChainConfig())
337+
338+
for {
339+
select {
340+
case receiptsWithTxs := <-matchedReceipts:
341+
if len(receiptsWithTxs) > 0 {
342+
// Convert to the same format as eth_getTransactionReceipt
343+
marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs))
344+
for i, receiptWithTx := range receiptsWithTxs {
345+
marshaledReceipts[i] = ethapi.MarshalReceipt(
346+
receiptWithTx.Receipt,
347+
receiptWithTx.Receipt.BlockHash,
348+
receiptWithTx.Receipt.BlockNumber.Uint64(),
349+
signer,
350+
receiptWithTx.Transaction,
351+
int(receiptWithTx.Receipt.TransactionIndex),
352+
)
353+
}
354+
355+
// Send a batch of tx receipts in one notification
356+
notifier.Notify(rpcSub.ID, marshaledReceipts)
357+
}
358+
case <-rpcSub.Err():
359+
return
360+
}
361+
}
362+
}()
363+
364+
return rpcSub, nil
365+
}
366+
299367
// FilterCriteria represents a request to create a new filter.
300368
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
301369
type FilterCriteria ethereum.FilterQuery

eth/filters/filter.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/ethereum/go-ethereum/common"
28+
"github.com/ethereum/go-ethereum/core"
2829
"github.com/ethereum/go-ethereum/core/filtermaps"
2930
"github.com/ethereum/go-ethereum/core/history"
3031
"github.com/ethereum/go-ethereum/core/types"
@@ -551,3 +552,70 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
551552
}
552553
return true
553554
}
555+
556+
// ReceiptWithTx contains a receipt and its corresponding transaction
557+
type ReceiptWithTx struct {
558+
Receipt *types.Receipt
559+
Transaction *types.Transaction
560+
}
561+
562+
// filterReceipts returns the receipts matching the given criteria
563+
// In addition to returning receipts, it also returns the corresponding transactions.
564+
// This is because receipts only contain low-level data, while user-facing data
565+
// may require additional information from the Transaction.
566+
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
567+
var ret []*ReceiptWithTx
568+
569+
receipts := ev.Receipts
570+
txs := ev.Transactions
571+
572+
if len(receipts) != len(txs) {
573+
log.Warn("Receipts and transactions length mismatch", "receipts", len(receipts), "transactions", len(txs))
574+
return ret
575+
}
576+
577+
if len(txHashes) == 0 {
578+
// No filter, send all receipts with their transactions.
579+
ret = make([]*ReceiptWithTx, len(receipts))
580+
for i, receipt := range receipts {
581+
ret[i] = &ReceiptWithTx{
582+
Receipt: receipt,
583+
Transaction: txs[i],
584+
}
585+
}
586+
} else if len(txHashes) == 1 {
587+
// Filter by single transaction hash.
588+
// This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization.
589+
for i, receipt := range receipts {
590+
if receipt.TxHash == txHashes[0] {
591+
ret = append(ret, &ReceiptWithTx{
592+
Receipt: receipt,
593+
Transaction: txs[i],
594+
})
595+
break
596+
}
597+
}
598+
} else {
599+
// Filter by multiple transaction hashes.
600+
txHashMap := make(map[common.Hash]bool, len(txHashes))
601+
for _, hash := range txHashes {
602+
txHashMap[hash] = true
603+
}
604+
605+
for i, receipt := range receipts {
606+
if txHashMap[receipt.TxHash] {
607+
ret = append(ret, &ReceiptWithTx{
608+
Receipt: receipt,
609+
Transaction: txs[i],
610+
})
611+
612+
// Early exit if all receipts are found
613+
if len(ret) == len(txHashes) {
614+
break
615+
}
616+
}
617+
}
618+
}
619+
620+
return ret
621+
}

eth/filters/filter_system.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ const (
158158
PendingTransactionsSubscription
159159
// BlocksSubscription queries hashes for blocks that are imported
160160
BlocksSubscription
161+
// TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks
162+
TransactionReceiptsSubscription
161163
// LastIndexSubscription keeps track of the last index
162164
LastIndexSubscription
163165
)
@@ -182,6 +184,8 @@ type subscription struct {
182184
logs chan []*types.Log
183185
txs chan []*types.Transaction
184186
headers chan *types.Header
187+
receipts chan []*ReceiptWithTx
188+
txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering
185189
installed chan struct{} // closed when the filter is installed
186190
err chan error // closed when the filter is uninstalled
187191
}
@@ -268,6 +272,7 @@ func (sub *Subscription) Unsubscribe() {
268272
case <-sub.f.logs:
269273
case <-sub.f.txs:
270274
case <-sub.f.headers:
275+
case <-sub.f.receipts:
271276
}
272277
}
273278

@@ -353,6 +358,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
353358
logs: logs,
354359
txs: make(chan []*types.Transaction),
355360
headers: make(chan *types.Header),
361+
receipts: make(chan []*ReceiptWithTx),
356362
installed: make(chan struct{}),
357363
err: make(chan error),
358364
}
@@ -369,6 +375,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
369375
logs: make(chan []*types.Log),
370376
txs: make(chan []*types.Transaction),
371377
headers: headers,
378+
receipts: make(chan []*ReceiptWithTx),
372379
installed: make(chan struct{}),
373380
err: make(chan error),
374381
}
@@ -385,6 +392,26 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc
385392
logs: make(chan []*types.Log),
386393
txs: txs,
387394
headers: make(chan *types.Header),
395+
receipts: make(chan []*ReceiptWithTx),
396+
installed: make(chan struct{}),
397+
err: make(chan error),
398+
}
399+
return es.subscribe(sub)
400+
}
401+
402+
// SubscribeTransactionReceipts creates a subscription that writes transaction receipts for
403+
// transactions when they are included in blocks. If txHashes is provided, only receipts
404+
// for those specific transaction hashes will be delivered.
405+
func (es *EventSystem) SubscribeTransactionReceipts(txHashes []common.Hash, receipts chan []*ReceiptWithTx) *Subscription {
406+
sub := &subscription{
407+
id: rpc.NewID(),
408+
typ: TransactionReceiptsSubscription,
409+
created: time.Now(),
410+
logs: make(chan []*types.Log),
411+
txs: make(chan []*types.Transaction),
412+
headers: make(chan *types.Header),
413+
receipts: receipts,
414+
txHashes: txHashes,
388415
installed: make(chan struct{}),
389416
err: make(chan error),
390417
}
@@ -415,6 +442,14 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
415442
for _, f := range filters[BlocksSubscription] {
416443
f.headers <- ev.Header
417444
}
445+
446+
// Handle transaction receipts subscriptions when a new block is added
447+
for _, f := range filters[TransactionReceiptsSubscription] {
448+
matchedReceipts := filterReceipts(f.txHashes, ev)
449+
if len(matchedReceipts) > 0 {
450+
f.receipts <- matchedReceipts
451+
}
452+
}
418453
}
419454

420455
// eventLoop (un)installs filters and processes mux events.

0 commit comments

Comments
 (0)