Skip to content

Commit acaf943

Browse files
JukLee0iragzliudan
authored andcommitted
eth/filters: fix potential deadlock in filter timeout loop (ethereum#22178)
1 parent 83782e5 commit acaf943

File tree

4 files changed

+102
-22
lines changed

4 files changed

+102
-22
lines changed

eth/backend.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"runtime"
2525
"sync"
2626
"sync/atomic"
27+
"time"
2728

2829
"github.com/XinFinOrg/XDPoSChain/XDCx"
2930
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
@@ -405,7 +406,7 @@ func (s *Ethereum) APIs() []rpc.API {
405406
}, {
406407
Namespace: "eth",
407408
Version: "1.0",
408-
Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
409+
Service: filters.NewPublicFilterAPI(s.ApiBackend, false, 5*time.Minute),
409410
Public: true,
410411
}, {
411412
Namespace: "admin",

eth/filters/api.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ var (
4141
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
4242
const maxTopics = 4
4343

44-
var (
45-
deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
46-
)
47-
4844
// filter is a helper struct that holds meta information over the filter type
4945
// and associated subscription in the event system.
5046
type filter struct {
@@ -66,38 +62,49 @@ type PublicFilterAPI struct {
6662
events *EventSystem
6763
filtersMu sync.Mutex
6864
filters map[rpc.ID]*filter
65+
timeout time.Duration
6966
}
7067

7168
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
72-
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
69+
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *PublicFilterAPI {
7370
api := &PublicFilterAPI{
7471
backend: backend,
7572
chainDb: backend.ChainDb(),
7673
events: NewEventSystem(backend, lightMode),
7774
filters: make(map[rpc.ID]*filter),
75+
timeout: timeout,
7876
}
79-
go api.timeoutLoop()
77+
go api.timeoutLoop(timeout)
8078

8179
return api
8280
}
8381

8482
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
8583
// Tt is started when the api is created.
86-
func (api *PublicFilterAPI) timeoutLoop() {
87-
ticker := time.NewTicker(5 * time.Minute)
84+
func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
85+
var toUninstall []*Subscription
86+
ticker := time.NewTicker(timeout)
8887
for {
8988
<-ticker.C
9089
api.filtersMu.Lock()
9190
for id, f := range api.filters {
9291
select {
9392
case <-f.deadline.C:
94-
f.s.Unsubscribe()
93+
toUninstall = append(toUninstall, f.s)
9594
delete(api.filters, id)
9695
default:
9796
continue
9897
}
9998
}
10099
api.filtersMu.Unlock()
100+
101+
// Unsubscribes are processed outside the lock to avoid the following scenario:
102+
// event loop attempts broadcasting events to still active filters while
103+
// Unsubscribe is waiting for it to process the uninstall request.
104+
for _, s := range toUninstall {
105+
s.Unsubscribe()
106+
}
107+
toUninstall = nil
101108
}
102109
}
103110

@@ -115,7 +122,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
115122
)
116123

117124
api.filtersMu.Lock()
118-
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
125+
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
119126
api.filtersMu.Unlock()
120127

121128
go func() {
@@ -185,7 +192,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
185192
)
186193

187194
api.filtersMu.Lock()
188-
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
195+
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
189196
api.filtersMu.Unlock()
190197

191198
go func() {
@@ -302,7 +309,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
302309
}
303310

304311
api.filtersMu.Lock()
305-
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
312+
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
306313
api.filtersMu.Unlock()
307314

308315
go func() {
@@ -431,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
431438
// receive timer value and reset timer
432439
<-f.deadline.C
433440
}
434-
f.deadline.Reset(deadline)
441+
f.deadline.Reset(api.timeout)
435442

436443
switch f.typ {
437444
case PendingTransactionsSubscription, BlocksSubscription:

eth/filters/filter_system_test.go

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"math/big"
2323
"math/rand"
2424
"reflect"
25+
"runtime"
2526
"testing"
2627
"time"
2728

@@ -38,6 +39,10 @@ import (
3839
"github.com/XinFinOrg/XDPoSChain/rpc"
3940
)
4041

42+
var (
43+
deadline = 5 * time.Minute
44+
)
45+
4146
type testBackend struct {
4247
mux *event.TypeMux
4348
db ethdb.Database
@@ -149,7 +154,7 @@ func TestBlockSubscription(t *testing.T) {
149154
var (
150155
db = rawdb.NewMemoryDatabase()
151156
backend = &testBackend{db: db}
152-
api = NewPublicFilterAPI(backend, false)
157+
api = NewPublicFilterAPI(backend, false, deadline)
153158
genesis = new(core.Genesis).MustCommit(db)
154159
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
155160
chainEvents = []core.ChainEvent{}
@@ -201,7 +206,7 @@ func TestPendingTxFilter(t *testing.T) {
201206
var (
202207
db = rawdb.NewMemoryDatabase()
203208
backend = &testBackend{db: db}
204-
api = NewPublicFilterAPI(backend, false)
209+
api = NewPublicFilterAPI(backend, false, deadline)
205210

206211
transactions = []*types.Transaction{
207212
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@@ -256,7 +261,7 @@ func TestLogFilterCreation(t *testing.T) {
256261
var (
257262
db = rawdb.NewMemoryDatabase()
258263
backend = &testBackend{db: db}
259-
api = NewPublicFilterAPI(backend, false)
264+
api = NewPublicFilterAPI(backend, false, deadline)
260265

261266
testCases = []struct {
262267
crit FilterCriteria
@@ -300,7 +305,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
300305
var (
301306
db = rawdb.NewMemoryDatabase()
302307
backend = &testBackend{db: db}
303-
api = NewPublicFilterAPI(backend, false)
308+
api = NewPublicFilterAPI(backend, false, deadline)
304309
)
305310

306311
// different situations where log filter creation should fail.
@@ -322,7 +327,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
322327
var (
323328
db = rawdb.NewMemoryDatabase()
324329
backend = &testBackend{db: db}
325-
api = NewPublicFilterAPI(backend, false)
330+
api = NewPublicFilterAPI(backend, false, deadline)
326331
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
327332
)
328333

@@ -347,7 +352,7 @@ func TestLogFilter(t *testing.T) {
347352
var (
348353
db = rawdb.NewMemoryDatabase()
349354
backend = &testBackend{db: db}
350-
api = NewPublicFilterAPI(backend, false)
355+
api = NewPublicFilterAPI(backend, false, deadline)
351356

352357
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
353358
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -461,7 +466,7 @@ func TestPendingLogsSubscription(t *testing.T) {
461466
var (
462467
db = rawdb.NewMemoryDatabase()
463468
backend = &testBackend{db: db}
464-
api = NewPublicFilterAPI(backend, false)
469+
api = NewPublicFilterAPI(backend, false, deadline)
465470

466471
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
467472
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -587,6 +592,73 @@ func TestPendingLogsSubscription(t *testing.T) {
587592
}
588593
}
589594

595+
// TestPendingTxFilterDeadlock tests if the event loop hangs when pending
596+
// txes arrive at the same time that one of multiple filters is timing out.
597+
// Please refer to #22131 for more details.
598+
func TestPendingTxFilterDeadlock(t *testing.T) {
599+
t.Parallel()
600+
timeout := 100 * time.Millisecond
601+
602+
var (
603+
db = rawdb.NewMemoryDatabase()
604+
backend = &testBackend{db: db}
605+
api = NewFilterAPI(backend, false, timeout)
606+
done = make(chan struct{})
607+
)
608+
609+
go func() {
610+
// Bombard feed with txes until signal was received to stop
611+
i := uint64(0)
612+
for {
613+
select {
614+
case <-done:
615+
return
616+
default:
617+
}
618+
619+
tx := types.NewTransaction(i, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
620+
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
621+
i++
622+
}
623+
}()
624+
625+
// Create a bunch of filters that will
626+
// timeout either in 100ms or 200ms
627+
fids := make([]rpc.ID, 20)
628+
for i := 0; i < len(fids); i++ {
629+
fid := api.NewPendingTransactionFilter()
630+
fids[i] = fid
631+
// Wait for at least one tx to arrive in filter
632+
for {
633+
hashes, err := api.GetFilterChanges(fid)
634+
if err != nil {
635+
t.Fatalf("Filter should exist: %v\n", err)
636+
}
637+
if len(hashes.([]common.Hash)) > 0 {
638+
break
639+
}
640+
runtime.Gosched()
641+
}
642+
}
643+
644+
// Wait until filters have timed out
645+
time.Sleep(3 * timeout)
646+
647+
// If tx loop doesn't consume `done` after a second
648+
// it's hanging.
649+
select {
650+
case done <- struct{}{}:
651+
// Check that all filters have been uninstalled
652+
for _, fid := range fids {
653+
if _, err := api.GetFilterChanges(fid); err == nil {
654+
t.Errorf("Filter %s should have been uninstalled\n", fid)
655+
}
656+
}
657+
case <-time.After(1 * time.Second):
658+
t.Error("Tx sending loop hangs")
659+
}
660+
}
661+
590662
func flattenLogs(pl [][]*types.Log) []*types.Log {
591663
var logs []*types.Log
592664
for _, l := range pl {

les/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (s *LightEthereum) APIs() []rpc.API {
190190
}, {
191191
Namespace: "eth",
192192
Version: "1.0",
193-
Service: filters.NewPublicFilterAPI(s.ApiBackend, true),
193+
Service: filters.NewPublicFilterAPI(s.ApiBackend, true, 5*time.Minute),
194194
Public: true,
195195
}, {
196196
Namespace: "net",

0 commit comments

Comments
 (0)