Skip to content

Commit 83782e5

Browse files
JukLee0iragzliudan
authored andcommitted
eth/filters: remove use of event.TypeMux for pending logs (ethereum#20312)
1 parent 99c2e18 commit 83782e5

File tree

13 files changed

+247
-256
lines changed

13 files changed

+247
-256
lines changed

accounts/abi/bind/backends/simulated.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
126126
database: database,
127127
blockchain: blockchain,
128128
config: genesis.Config,
129-
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
129+
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
130130
}
131131
blockchain.Client = backend
132132
backend.rollback()
@@ -146,7 +146,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
146146
database: database,
147147
blockchain: blockchain,
148148
config: genesis.Config,
149-
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
149+
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
150150
}
151151
backend.rollback()
152152
return backend
@@ -558,22 +558,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
558558
}
559559

560560
func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
561-
return event.NewSubscription(func(quit <-chan struct{}) error {
562-
<-quit
563-
return nil
564-
})
561+
return nullSubscription()
565562
}
563+
566564
func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
567565
return fb.bc.SubscribeChainEvent(ch)
568566
}
567+
569568
func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
570569
return fb.bc.SubscribeRemovedLogsEvent(ch)
571570
}
571+
572572
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
573573
return fb.bc.SubscribeLogsEvent(ch)
574574
}
575575

576+
func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
577+
return nullSubscription()
578+
}
579+
576580
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
581+
577582
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
578583
panic("not supported")
579584
}
585+
586+
func nullSubscription() event.Subscription {
587+
return event.NewSubscription(func(quit <-chan struct{}) error {
588+
<-quit
589+
return nil
590+
})
591+
}

core/events.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ type OrderTxPreEvent struct{ Tx *types.OrderTransaction }
3030
// LendingTxPreEvent is posted when a order transaction enters the order transaction pool.
3131
type LendingTxPreEvent struct{ Tx *types.LendingTransaction }
3232

33-
// PendingLogsEvent is posted pre mining and notifies of pending logs.
34-
type PendingLogsEvent struct {
35-
Logs []*types.Log
36-
}
37-
3833
// PendingStateEvent is posted pre mining and notifies of pending state changes.
3934
type PendingStateEvent struct{}
4035

eth/api_backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
258258
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
259259
}
260260

261+
func (b *EthApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
262+
return b.eth.miner.SubscribePendingLogs(ch)
263+
}
264+
261265
func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
262266
return b.eth.BlockChain().SubscribeChainEvent(ch)
263267
}

eth/filters/api.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,8 @@ type PublicFilterAPI struct {
7272
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
7373
api := &PublicFilterAPI{
7474
backend: backend,
75-
mux: backend.EventMux(),
7675
chainDb: backend.ChainDb(),
77-
events: NewEventSystem(backend.EventMux(), backend, lightMode),
76+
events: NewEventSystem(backend, lightMode),
7877
filters: make(map[rpc.ID]*filter),
7978
}
8079
go api.timeoutLoop()
@@ -439,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
439438
hashes := f.hashes
440439
f.hashes = nil
441440
return returnHashes(hashes), nil
442-
case LogsSubscription:
441+
case LogsSubscription, MinedAndPendingLogsSubscription:
443442
logs := f.logs
444443
f.logs = nil
445444
return returnLogs(logs), nil

eth/filters/bench_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
3131
"github.com/XinFinOrg/XDPoSChain/core/types"
3232
"github.com/XinFinOrg/XDPoSChain/ethdb"
33-
"github.com/XinFinOrg/XDPoSChain/event"
3433
"github.com/XinFinOrg/XDPoSChain/node"
3534
)
3635

@@ -124,14 +123,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
124123

125124
b.Log("Running filter benchmarks...")
126125
start = time.Now()
127-
mux := new(event.TypeMux)
128126
var backend *testBackend
129127

130128
for i := 0; i < benchFilterCnt; i++ {
131129
if i%20 == 0 {
132130
db.Close()
133131
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
134-
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
132+
backend = &testBackend{db: db, sections: cnt}
135133
}
136134
var addr common.Address
137135
addr[0] = byte(i)
@@ -190,8 +188,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
190188

191189
b.Log("Running filter benchmarks...")
192190
start := time.Now()
193-
mux := new(event.TypeMux)
194-
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
191+
backend := &testBackend{db: db}
195192
filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
196193
filter.Logs(context.Background())
197194
d := time.Since(start)

eth/filters/filter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232

3333
type Backend interface {
3434
ChainDb() ethdb.Database
35-
EventMux() *event.TypeMux
3635
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
3736
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
3837
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
@@ -42,6 +41,7 @@ type Backend interface {
4241
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
4342
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
4443
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
44+
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
4545

4646
BloomStatus() (uint64, uint64)
4747
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

eth/filters/filter_system.go

Lines changed: 79 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,6 @@ const (
6767
chainEvChanSize = 10
6868
)
6969

70-
var (
71-
ErrInvalidSubscriptionID = errors.New("invalid id")
72-
)
73-
7470
type subscription struct {
7571
id rpc.ID
7672
typ Type
@@ -86,25 +82,25 @@ type subscription struct {
8682
// EventSystem creates subscriptions, processes events and broadcasts them to the
8783
// subscription which match the subscription criteria.
8884
type EventSystem struct {
89-
mux *event.TypeMux
9085
backend Backend
9186
lightMode bool
9287
lastHead *types.Header
9388

9489
// Subscriptions
95-
txsSub event.Subscription // Subscription for new transaction event
96-
logsSub event.Subscription // Subscription for new log event
97-
rmLogsSub event.Subscription // Subscription for removed log event
98-
chainSub event.Subscription // Subscription for new chain event
99-
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
90+
txsSub event.Subscription // Subscription for new transaction event
91+
logsSub event.Subscription // Subscription for new log event
92+
rmLogsSub event.Subscription // Subscription for removed log event
93+
pendingLogsSub event.Subscription // Subscription for pending log event
94+
chainSub event.Subscription // Subscription for new chain event
10095

10196
// Channels
102-
install chan *subscription // install filter for event notification
103-
uninstall chan *subscription // remove filter for event notification
104-
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
105-
logsCh chan []*types.Log // Channel to receive new log event
106-
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
107-
chainCh chan core.ChainEvent // Channel to receive new chain event
97+
install chan *subscription // install filter for event notification
98+
uninstall chan *subscription // remove filter for event notification
99+
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
100+
logsCh chan []*types.Log // Channel to receive new log event
101+
pendingLogsCh chan []*types.Log // Channel to receive new log event
102+
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
103+
chainCh chan core.ChainEvent // Channel to receive new chain event
108104
}
109105

110106
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -113,30 +109,28 @@ type EventSystem struct {
113109
//
114110
// The returned manager has a loop that needs to be stopped with the Stop function
115111
// or by stopping the given mux.
116-
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
112+
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
117113
m := &EventSystem{
118-
mux: mux,
119-
backend: backend,
120-
lightMode: lightMode,
121-
install: make(chan *subscription),
122-
uninstall: make(chan *subscription),
123-
txsCh: make(chan core.NewTxsEvent, txChanSize),
124-
logsCh: make(chan []*types.Log, logsChanSize),
125-
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
126-
chainCh: make(chan core.ChainEvent, chainEvChanSize),
114+
backend: backend,
115+
lightMode: lightMode,
116+
install: make(chan *subscription),
117+
uninstall: make(chan *subscription),
118+
txsCh: make(chan core.NewTxsEvent, txChanSize),
119+
logsCh: make(chan []*types.Log, logsChanSize),
120+
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
121+
pendingLogsCh: make(chan []*types.Log, logsChanSize),
122+
chainCh: make(chan core.ChainEvent, chainEvChanSize),
127123
}
128124

129125
// Subscribe events
130126
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
131127
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
132128
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
133129
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
134-
// TODO(rjl493456442): use feed to subscribe pending log event
135-
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
130+
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
136131

137132
// Make sure none of the subscriptions are empty
138-
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
139-
m.pendingLogSub.Closed() {
133+
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
140134
log.Crit("Subscribe for event system failed")
141135
}
142136

@@ -314,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
314308

315309
type filterIndex map[Type]map[rpc.ID]*subscription
316310

317-
// broadcast event to filters that match criteria.
318-
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
319-
if ev == nil {
311+
func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
312+
if len(ev) == 0 {
320313
return
321314
}
315+
for _, f := range filters[LogsSubscription] {
316+
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
317+
if len(matchedLogs) > 0 {
318+
f.logs <- matchedLogs
319+
}
320+
}
321+
}
322322

323-
switch e := ev.(type) {
324-
case []*types.Log:
325-
if len(e) > 0 {
326-
for _, f := range filters[LogsSubscription] {
327-
if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
328-
f.logs <- matchedLogs
329-
}
330-
}
323+
func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
324+
if len(ev) == 0 {
325+
return
326+
}
327+
for _, f := range filters[PendingLogsSubscription] {
328+
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
329+
if len(matchedLogs) > 0 {
330+
f.logs <- matchedLogs
331331
}
332-
case core.RemovedLogsEvent:
333-
for _, f := range filters[LogsSubscription] {
334-
if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
335-
f.logs <- matchedLogs
336-
}
332+
}
333+
}
334+
335+
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
336+
for _, f := range filters[LogsSubscription] {
337+
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
338+
if len(matchedLogs) > 0 {
339+
f.logs <- matchedLogs
337340
}
338-
case *event.TypeMuxEvent:
339-
if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
340-
for _, f := range filters[PendingLogsSubscription] {
341-
if e.Time.After(f.created) {
342-
if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
343-
f.logs <- matchedLogs
344-
}
341+
}
342+
}
343+
344+
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
345+
hashes := make([]common.Hash, 0, len(ev.Txs))
346+
for _, tx := range ev.Txs {
347+
hashes = append(hashes, tx.Hash())
348+
}
349+
for _, f := range filters[PendingTransactionsSubscription] {
350+
f.hashes <- hashes
351+
}
352+
}
353+
354+
func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
355+
for _, f := range filters[BlocksSubscription] {
356+
f.headers <- ev.Block.Header()
357+
}
358+
if es.lightMode && len(filters[LogsSubscription]) > 0 {
359+
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
360+
for _, f := range filters[LogsSubscription] {
361+
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
362+
f.logs <- matchedLogs
345363
}
346364
}
347-
}
348-
case core.NewTxsEvent:
349-
hashes := make([]common.Hash, 0, len(e.Txs))
350-
for _, tx := range e.Txs {
351-
hashes = append(hashes, tx.Hash())
352-
}
353-
for _, f := range filters[PendingTransactionsSubscription] {
354-
f.hashes <- hashes
355-
}
356-
case core.ChainEvent:
357-
for _, f := range filters[BlocksSubscription] {
358-
f.headers <- e.Block.Header()
359-
}
360-
if es.lightMode && len(filters[LogsSubscription]) > 0 {
361-
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
362-
for _, f := range filters[LogsSubscription] {
363-
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
364-
f.logs <- matchedLogs
365-
}
366-
}
367-
})
368-
}
365+
})
369366
}
370367
}
371368

@@ -446,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
446443
func (es *EventSystem) eventLoop() {
447444
// Ensure all subscriptions get cleaned up
448445
defer func() {
449-
es.pendingLogSub.Unsubscribe()
450446
es.txsSub.Unsubscribe()
451447
es.logsSub.Unsubscribe()
452448
es.rmLogsSub.Unsubscribe()
449+
es.pendingLogsSub.Unsubscribe()
453450
es.chainSub.Unsubscribe()
454451
}()
455452

@@ -460,20 +457,16 @@ func (es *EventSystem) eventLoop() {
460457

461458
for {
462459
select {
463-
// Handle subscribed events
464460
case ev := <-es.txsCh:
465-
es.broadcast(index, ev)
461+
es.handleTxsEvent(index, ev)
466462
case ev := <-es.logsCh:
467-
es.broadcast(index, ev)
463+
es.handleLogs(index, ev)
468464
case ev := <-es.rmLogsCh:
469-
es.broadcast(index, ev)
465+
es.handleRemovedLogs(index, ev)
466+
case ev := <-es.pendingLogsCh:
467+
es.handlePendingLogs(index, ev)
470468
case ev := <-es.chainCh:
471-
es.broadcast(index, ev)
472-
case ev, active := <-es.pendingLogSub.Chan():
473-
if !active { // system stopped
474-
return
475-
}
476-
es.broadcast(index, ev)
469+
es.handleChainEvent(index, ev)
477470

478471
case f := <-es.install:
479472
if f.typ == MinedAndPendingLogsSubscription {

0 commit comments

Comments
 (0)