Skip to content

Commit 4034c67

Browse files
yierxcatror
andauthored
eth/filters: fix a breaking change and return rpctransaction (#26757)
* eth/filters: fix a breaking change and return rpctransaction * eth/filters: fix test cases --------- Co-authored-by: Catror <[email protected]>
1 parent fe01a2f commit 4034c67

File tree

2 files changed

+90
-15
lines changed

2 files changed

+90
-15
lines changed

eth/filters/api.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type filter struct {
3939
typ Type
4040
deadline *time.Timer // filter is inactive when deadline triggers
4141
hashes []common.Hash
42+
fullTx bool
4243
txs []*types.Transaction
4344
crit FilterCriteria
4445
logs []*types.Log
@@ -103,14 +104,14 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
103104
//
104105
// It is part of the filter package because this filter can be used through the
105106
// `eth_getFilterChanges` polling method that is also used for log filters.
106-
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
107+
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
107108
var (
108109
pendingTxs = make(chan []*types.Transaction)
109110
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
110111
)
111112

112113
api.filtersMu.Lock()
113-
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
114+
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
114115
api.filtersMu.Unlock()
115116

116117
go func() {
@@ -412,6 +413,9 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
412413
api.filtersMu.Lock()
413414
defer api.filtersMu.Unlock()
414415

416+
chainConfig := api.sys.backend.ChainConfig()
417+
latest := api.sys.backend.CurrentHeader()
418+
415419
if f, found := api.filters[id]; found {
416420
if !f.deadline.Stop() {
417421
// timer expired but filter is not yet removed in timeout loop
@@ -426,9 +430,21 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
426430
f.hashes = nil
427431
return returnHashes(hashes), nil
428432
case PendingTransactionsSubscription:
429-
txs := f.txs
430-
f.txs = nil
431-
return txs, nil
433+
if f.fullTx {
434+
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
435+
for _, tx := range f.txs {
436+
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
437+
}
438+
f.txs = nil
439+
return txs, nil
440+
} else {
441+
hashes := make([]common.Hash, 0, len(f.txs))
442+
for _, tx := range f.txs {
443+
hashes = append(hashes, tx.Hash())
444+
}
445+
f.txs = nil
446+
return hashes, nil
447+
}
432448
case LogsSubscription, MinedAndPendingLogsSubscription:
433449
logs := f.logs
434450
f.logs = nil

eth/filters/filter_system_test.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/ethereum/go-ethereum/crypto"
3838
"github.com/ethereum/go-ethereum/ethdb"
3939
"github.com/ethereum/go-ethereum/event"
40+
"github.com/ethereum/go-ethereum/internal/ethapi"
4041
"github.com/ethereum/go-ethereum/params"
4142
"github.com/ethereum/go-ethereum/rpc"
4243
)
@@ -52,11 +53,12 @@ type testBackend struct {
5253
}
5354

5455
func (b *testBackend) ChainConfig() *params.ChainConfig {
55-
panic("implement me")
56+
return params.TestChainConfig
5657
}
5758

5859
func (b *testBackend) CurrentHeader() *types.Header {
59-
panic("implement me")
60+
hdr, _ := b.HeaderByNumber(context.TODO(), rpc.LatestBlockNumber)
61+
return hdr
6062
}
6163

6264
func (b *testBackend) ChainDb() ethdb.Database {
@@ -256,10 +258,10 @@ func TestPendingTxFilter(t *testing.T) {
256258
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
257259
}
258260

259-
txs []*types.Transaction
261+
hashes []common.Hash
260262
)
261263

262-
fid0 := api.NewPendingTransactionFilter()
264+
fid0 := api.NewPendingTransactionFilter(nil)
263265

264266
time.Sleep(1 * time.Second)
265267
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
@@ -271,7 +273,64 @@ func TestPendingTxFilter(t *testing.T) {
271273
t.Fatalf("Unable to retrieve logs: %v", err)
272274
}
273275

274-
tx := results.([]*types.Transaction)
276+
h := results.([]common.Hash)
277+
hashes = append(hashes, h...)
278+
if len(hashes) >= len(transactions) {
279+
break
280+
}
281+
// check timeout
282+
if time.Now().After(timeout) {
283+
break
284+
}
285+
286+
time.Sleep(100 * time.Millisecond)
287+
}
288+
289+
if len(hashes) != len(transactions) {
290+
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
291+
return
292+
}
293+
for i := range hashes {
294+
if hashes[i] != transactions[i].Hash() {
295+
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
296+
}
297+
}
298+
}
299+
300+
// TestPendingTxFilterFullTx tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
301+
func TestPendingTxFilterFullTx(t *testing.T) {
302+
t.Parallel()
303+
304+
var (
305+
db = rawdb.NewMemoryDatabase()
306+
backend, sys = newTestFilterSystem(t, db, Config{})
307+
api = NewFilterAPI(sys, false)
308+
309+
transactions = []*types.Transaction{
310+
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
311+
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
312+
types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
313+
types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
314+
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
315+
}
316+
317+
txs []*ethapi.RPCTransaction
318+
)
319+
320+
fullTx := true
321+
fid0 := api.NewPendingTransactionFilter(&fullTx)
322+
323+
time.Sleep(1 * time.Second)
324+
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
325+
326+
timeout := time.Now().Add(1 * time.Second)
327+
for {
328+
results, err := api.GetFilterChanges(fid0)
329+
if err != nil {
330+
t.Fatalf("Unable to retrieve logs: %v", err)
331+
}
332+
333+
tx := results.([]*ethapi.RPCTransaction)
275334
txs = append(txs, tx...)
276335
if len(txs) >= len(transactions) {
277336
break
@@ -289,8 +348,8 @@ func TestPendingTxFilter(t *testing.T) {
289348
return
290349
}
291350
for i := range txs {
292-
if txs[i].Hash() != transactions[i].Hash() {
293-
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
351+
if txs[i].Hash != transactions[i].Hash() {
352+
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash)
294353
}
295354
}
296355
}
@@ -854,15 +913,15 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
854913
// timeout either in 100ms or 200ms
855914
fids := make([]rpc.ID, 20)
856915
for i := 0; i < len(fids); i++ {
857-
fid := api.NewPendingTransactionFilter()
916+
fid := api.NewPendingTransactionFilter(nil)
858917
fids[i] = fid
859918
// Wait for at least one tx to arrive in filter
860919
for {
861-
txs, err := api.GetFilterChanges(fid)
920+
hashes, err := api.GetFilterChanges(fid)
862921
if err != nil {
863922
t.Fatalf("Filter should exist: %v\n", err)
864923
}
865-
if len(txs.([]*types.Transaction)) > 0 {
924+
if len(hashes.([]common.Hash)) > 0 {
866925
break
867926
}
868927
runtime.Gosched()

0 commit comments

Comments
 (0)