Skip to content

Commit 6c508fc

Browse files
committed
eth/filters: fix potential deadlock in filter timeout loop (ethereum#22178)
1 parent 04628e3 commit 6c508fc

File tree

4 files changed

+95
-15
lines changed

4 files changed

+95
-15
lines changed

eth/backend.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"runtime"
2525
"sync"
2626
"sync/atomic"
27+
"time"
2728

2829
"github.com/XinFinOrg/XDPoSChain/XDCx"
2930
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
@@ -400,7 +401,7 @@ func (s *Ethereum) APIs() []rpc.API {
400401
}, {
401402
Namespace: "eth",
402403
Version: "1.0",
403-
Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
404+
Service: filters.NewPublicFilterAPI(s.ApiBackend, false, 5*time.Minute),
404405
Public: true,
405406
}, {
406407
Namespace: "admin",

eth/filters/api.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ var (
4141
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
4242
const maxTopics = 4
4343

44-
var (
45-
deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
46-
)
47-
4844
// filter is a helper struct that holds meta information over the filter type
4945
// and associated subscription in the event system.
5046
type filter struct {
@@ -66,38 +62,49 @@ type PublicFilterAPI struct {
6662
events *EventSystem
6763
filtersMu sync.Mutex
6864
filters map[rpc.ID]*filter
65+
timeout time.Duration
6966
}
7067

7168
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
72-
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
69+
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *PublicFilterAPI {
7370
api := &PublicFilterAPI{
7471
backend: backend,
7572
chainDb: backend.ChainDb(),
7673
events: NewEventSystem(backend, lightMode),
7774
filters: make(map[rpc.ID]*filter),
75+
timeout: timeout,
7876
}
79-
go api.timeoutLoop()
77+
go api.timeoutLoop(timeout)
8078

8179
return api
8280
}
8381

8482
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
8583
// Tt is started when the api is created.
86-
func (api *PublicFilterAPI) timeoutLoop() {
87-
ticker := time.NewTicker(5 * time.Minute)
84+
func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
85+
var toUninstall []*Subscription
86+
ticker := time.NewTicker(timeout)
8887
for {
8988
<-ticker.C
9089
api.filtersMu.Lock()
9190
for id, f := range api.filters {
9291
select {
9392
case <-f.deadline.C:
94-
f.s.Unsubscribe()
93+
toUninstall = append(toUninstall, f.s)
9594
delete(api.filters, id)
9695
default:
9796
continue
9897
}
9998
}
10099
api.filtersMu.Unlock()
100+
101+
// Unsubscribes are processed outside the lock to avoid the following scenario:
102+
// event loop attempts broadcasting events to still active filters while
103+
// Unsubscribe is waiting for it to process the uninstall request.
104+
for _, s := range toUninstall {
105+
s.Unsubscribe()
106+
}
107+
toUninstall = nil
101108
}
102109
}
103110

@@ -115,7 +122,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
115122
)
116123

117124
api.filtersMu.Lock()
118-
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
125+
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
119126
api.filtersMu.Unlock()
120127

121128
go func() {
@@ -185,7 +192,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
185192
)
186193

187194
api.filtersMu.Lock()
188-
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
195+
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
189196
api.filtersMu.Unlock()
190197

191198
go func() {
@@ -302,7 +309,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
302309
}
303310

304311
api.filtersMu.Lock()
305-
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
312+
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
306313
api.filtersMu.Unlock()
307314

308315
go func() {
@@ -431,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
431438
// receive timer value and reset timer
432439
<-f.deadline.C
433440
}
434-
f.deadline.Reset(deadline)
441+
f.deadline.Reset(api.timeout)
435442

436443
switch f.typ {
437444
case PendingTransactionsSubscription, BlocksSubscription:

eth/filters/filter_system_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"math/big"
2323
"math/rand"
2424
"reflect"
25+
"runtime"
2526
"testing"
2627
"time"
2728

@@ -38,6 +39,10 @@ import (
3839
"github.com/XinFinOrg/XDPoSChain/rpc"
3940
)
4041

42+
var (
43+
deadline = 5 * time.Minute
44+
)
45+
4146
type testBackend struct {
4247
mux *event.TypeMux
4348
db ethdb.Database
@@ -587,6 +592,73 @@ func TestPendingLogsSubscription(t *testing.T) {
587592
}
588593
}
589594

595+
// TestPendingTxFilterDeadlock tests if the event loop hangs when pending
596+
// txes arrive at the same time that one of multiple filters is timing out.
597+
// Please refer to #22131 for more details.
598+
func TestPendingTxFilterDeadlock(t *testing.T) {
599+
t.Parallel()
600+
timeout := 100 * time.Millisecond
601+
602+
var (
603+
db = rawdb.NewMemoryDatabase()
604+
backend = &testBackend{db: db}
605+
api = NewFilterAPI(backend, false, timeout)
606+
done = make(chan struct{})
607+
)
608+
609+
go func() {
610+
// Bombard feed with txes until signal was received to stop
611+
i := uint64(0)
612+
for {
613+
select {
614+
case <-done:
615+
return
616+
default:
617+
}
618+
619+
tx := types.NewTransaction(i, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
620+
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
621+
i++
622+
}
623+
}()
624+
625+
// Create a bunch of filters that will
626+
// timeout either in 100ms or 200ms
627+
fids := make([]rpc.ID, 20)
628+
for i := 0; i < len(fids); i++ {
629+
fid := api.NewPendingTransactionFilter()
630+
fids[i] = fid
631+
// Wait for at least one tx to arrive in filter
632+
for {
633+
hashes, err := api.GetFilterChanges(fid)
634+
if err != nil {
635+
t.Fatalf("Filter should exist: %v\n", err)
636+
}
637+
if len(hashes.([]common.Hash)) > 0 {
638+
break
639+
}
640+
runtime.Gosched()
641+
}
642+
}
643+
644+
// Wait until filters have timed out
645+
time.Sleep(3 * timeout)
646+
647+
// If tx loop doesn't consume `done` after a second
648+
// it's hanging.
649+
select {
650+
case done <- struct{}{}:
651+
// Check that all filters have been uninstalled
652+
for _, fid := range fids {
653+
if _, err := api.GetFilterChanges(fid); err == nil {
654+
t.Errorf("Filter %s should have been uninstalled\n", fid)
655+
}
656+
}
657+
case <-time.After(1 * time.Second):
658+
t.Error("Tx sending loop hangs")
659+
}
660+
}
661+
590662
func flattenLogs(pl [][]*types.Log) []*types.Log {
591663
var logs []*types.Log
592664
for _, l := range pl {

les/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (s *LightEthereum) APIs() []rpc.API {
190190
}, {
191191
Namespace: "eth",
192192
Version: "1.0",
193-
Service: filters.NewPublicFilterAPI(s.ApiBackend, true),
193+
Service: filters.NewPublicFilterAPI(s.ApiBackend, false, 5*time.Minute),
194194
Public: true,
195195
}, {
196196
Namespace: "net",

0 commit comments

Comments
 (0)