@@ -31,6 +31,7 @@ import (
3131 "github.com/ethereum/go-ethereum/core/rawdb"
3232 "github.com/ethereum/go-ethereum/core/types"
3333 "github.com/ethereum/go-ethereum/event"
34+ "github.com/ethereum/go-ethereum/log"
3435 "github.com/ethereum/go-ethereum/rpc"
3536)
3637
@@ -92,8 +93,21 @@ type EventSystem struct {
9293 backend Backend
9394 lightMode bool
9495 lastHead * types.Header
95- install chan * subscription // install filter for event notification
96- uninstall chan * subscription // remove filter for event notification
96+
97+ // Subscriptions
98+ txSub event.Subscription // Subscription for new transaction event
99+ logsSub event.Subscription // Subscription for new log event
100+ rmLogsSub event.Subscription // Subscription for removed log event
101+ chainSub event.Subscription // Subscription for new chain event
102+ pendingLogSub * event.TypeMuxSubscription // Subscription for pending log event
103+
104+ // Channels
105+ install chan * subscription // install filter for event notification
106+ uninstall chan * subscription // remove filter for event notification
107+ txCh chan core.TxPreEvent // Channel to receive new transaction event
108+ logsCh chan []* types.Log // Channel to receive new log event
109+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
110+ chainCh chan core.ChainEvent // Channel to receive new chain event
97111}
98112
99113// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -109,10 +123,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
109123 lightMode : lightMode ,
110124 install : make (chan * subscription ),
111125 uninstall : make (chan * subscription ),
126+ txCh : make (chan core.TxPreEvent , txChanSize ),
127+ logsCh : make (chan []* types.Log , logsChanSize ),
128+ rmLogsCh : make (chan core.RemovedLogsEvent , rmLogsChanSize ),
129+ chainCh : make (chan core.ChainEvent , chainEvChanSize ),
112130 }
113131
114- go m .eventLoop ()
132+ // Subscribe events
133+ m .txSub = m .backend .SubscribeTxPreEvent (m .txCh )
134+ m .logsSub = m .backend .SubscribeLogsEvent (m .logsCh )
135+ m .rmLogsSub = m .backend .SubscribeRemovedLogsEvent (m .rmLogsCh )
136+ m .chainSub = m .backend .SubscribeChainEvent (m .chainCh )
137+ // TODO(rjl493456442): use feed to subscribe pending log event
138+ m .pendingLogSub = m .mux .Subscribe (core.PendingLogsEvent {})
139+
140+ // Make sure none of the subscriptions are empty
141+ if m .txSub == nil || m .logsSub == nil || m .rmLogsSub == nil || m .chainSub == nil ||
142+ m .pendingLogSub .Closed () {
143+ log .Crit ("Subscribe for event system failed" )
144+ }
115145
146+ go m .eventLoop ()
116147 return m
117148}
118149
@@ -412,50 +443,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
412443
413444// eventLoop (un)installs filters and processes mux events.
414445func (es * EventSystem ) eventLoop () {
415- var (
416- index = make (filterIndex )
417- sub = es .mux .Subscribe (core.PendingLogsEvent {})
418- // Subscribe TxPreEvent form txpool
419- txCh = make (chan core.TxPreEvent , txChanSize )
420- txSub = es .backend .SubscribeTxPreEvent (txCh )
421- // Subscribe RemovedLogsEvent
422- rmLogsCh = make (chan core.RemovedLogsEvent , rmLogsChanSize )
423- rmLogsSub = es .backend .SubscribeRemovedLogsEvent (rmLogsCh )
424- // Subscribe []*types.Log
425- logsCh = make (chan []* types.Log , logsChanSize )
426- logsSub = es .backend .SubscribeLogsEvent (logsCh )
427- // Subscribe ChainEvent
428- chainEvCh = make (chan core.ChainEvent , chainEvChanSize )
429- chainEvSub = es .backend .SubscribeChainEvent (chainEvCh )
430- )
431-
432- // Unsubscribe all events
433- defer sub .Unsubscribe ()
434- defer txSub .Unsubscribe ()
435- defer rmLogsSub .Unsubscribe ()
436- defer logsSub .Unsubscribe ()
437- defer chainEvSub .Unsubscribe ()
438-
446+ // Ensure all subscriptions get cleaned up
447+ defer func () {
448+ es .pendingLogSub .Unsubscribe ()
449+ es .txSub .Unsubscribe ()
450+ es .logsSub .Unsubscribe ()
451+ es .rmLogsSub .Unsubscribe ()
452+ es .chainSub .Unsubscribe ()
453+ }()
454+
455+ index := make (filterIndex )
439456 for i := UnknownSubscription ; i < LastIndexSubscription ; i ++ {
440457 index [i ] = make (map [rpc.ID ]* subscription )
441458 }
442459
443460 for {
444461 select {
445- case ev , active := <- sub .Chan ():
446- if ! active { // system stopped
447- return
448- }
449- es .broadcast (index , ev )
450-
451462 // Handle subscribed events
452- case ev := <- txCh :
463+ case ev := <- es . txCh :
453464 es .broadcast (index , ev )
454- case ev := <- rmLogsCh :
465+ case ev := <- es . logsCh :
455466 es .broadcast (index , ev )
456- case ev := <- logsCh :
467+ case ev := <- es . rmLogsCh :
457468 es .broadcast (index , ev )
458- case ev := <- chainEvCh :
469+ case ev := <- es .chainCh :
470+ es .broadcast (index , ev )
471+ case ev , active := <- es .pendingLogSub .Chan ():
472+ if ! active { // system stopped
473+ return
474+ }
459475 es .broadcast (index , ev )
460476
461477 case f := <- es .install :
@@ -467,6 +483,7 @@ func (es *EventSystem) eventLoop() {
467483 index [f.typ ][f.id ] = f
468484 }
469485 close (f .installed )
486+
470487 case f := <- es .uninstall :
471488 if f .typ == MinedAndPendingLogsSubscription {
472489 // the type are logs and pending logs subscriptions
@@ -478,13 +495,13 @@ func (es *EventSystem) eventLoop() {
478495 close (f .err )
479496
480497 // System stopped
481- case <- txSub .Err ():
498+ case <- es . txSub .Err ():
482499 return
483- case <- rmLogsSub .Err ():
500+ case <- es . logsSub .Err ():
484501 return
485- case <- logsSub .Err ():
502+ case <- es . rmLogsSub .Err ():
486503 return
487- case <- chainEvSub .Err ():
504+ case <- es . chainSub .Err ():
488505 return
489506 }
490507 }
0 commit comments