@@ -31,30 +31,32 @@ import (
31
31
// block, transaction and log events. The Filtering system can be used to listen
32
32
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
33
33
type FilterSystem struct {
34
- eventMux * event.TypeMux
35
-
36
34
filterMu sync.RWMutex
37
35
filterId int
38
36
filters map [int ]* Filter
39
37
created map [int ]time.Time
40
-
41
- quit chan struct {}
38
+ sub event.Subscription
42
39
}
43
40
44
41
// NewFilterSystem returns a newly allocated filter manager
45
42
func NewFilterSystem (mux * event.TypeMux ) * FilterSystem {
46
43
fs := & FilterSystem {
47
- eventMux : mux ,
48
- filters : make (map [int ]* Filter ),
49
- created : make (map [int ]time.Time ),
44
+ filters : make (map [int ]* Filter ),
45
+ created : make (map [int ]time.Time ),
50
46
}
47
+ fs .sub = mux .Subscribe (
48
+ //core.PendingBlockEvent{},
49
+ core.ChainEvent {},
50
+ core.TxPreEvent {},
51
+ vm .Logs (nil ),
52
+ )
51
53
go fs .filterLoop ()
52
54
return fs
53
55
}
54
56
55
57
// Stop quits the filter loop required for polling events
56
58
func (fs * FilterSystem ) Stop () {
57
- close ( fs .quit )
59
+ fs .sub . Unsubscribe ( )
58
60
}
59
61
60
62
// Add adds a filter to the filter manager
@@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
89
91
// filterLoop waits for specific events from ethereum and fires their handlers
90
92
// when the filter matches the requirements.
91
93
func (fs * FilterSystem ) filterLoop () {
92
- // Subscribe to events
93
- eventCh := fs .eventMux .Subscribe (
94
- //core.PendingBlockEvent{},
95
- core.ChainEvent {},
96
- core.TxPreEvent {},
97
- vm .Logs (nil ),
98
- ).Chan ()
99
-
100
- out:
101
- for {
102
- select {
103
- case <- fs .quit :
104
- break out
105
- case event , ok := <- eventCh :
106
- if ! ok {
107
- // Event subscription closed, set the channel to nil to stop spinning
108
- eventCh = nil
109
- continue
110
- }
111
- // A real event arrived, notify the registered filters
112
- switch ev := event .Data .(type ) {
113
- case core.ChainEvent :
114
- fs .filterMu .RLock ()
115
- for id , filter := range fs .filters {
116
- if filter .BlockCallback != nil && fs .created [id ].Before (event .Time ) {
117
- filter .BlockCallback (ev .Block , ev .Logs )
118
- }
94
+ for event := range fs .sub .Chan () {
95
+ switch ev := event .Data .(type ) {
96
+ case core.ChainEvent :
97
+ fs .filterMu .RLock ()
98
+ for id , filter := range fs .filters {
99
+ if filter .BlockCallback != nil && fs .created [id ].Before (event .Time ) {
100
+ filter .BlockCallback (ev .Block , ev .Logs )
119
101
}
120
- fs .filterMu .RUnlock ()
102
+ }
103
+ fs .filterMu .RUnlock ()
121
104
122
- case core.TxPreEvent :
123
- fs .filterMu .RLock ()
124
- for id , filter := range fs .filters {
125
- if filter .TransactionCallback != nil && fs .created [id ].Before (event .Time ) {
126
- filter .TransactionCallback (ev .Tx )
127
- }
105
+ case core.TxPreEvent :
106
+ fs .filterMu .RLock ()
107
+ for id , filter := range fs .filters {
108
+ if filter .TransactionCallback != nil && fs .created [id ].Before (event .Time ) {
109
+ filter .TransactionCallback (ev .Tx )
128
110
}
129
- fs . filterMu . RUnlock ()
130
-
131
- case vm. Logs :
132
- fs . filterMu . RLock ()
133
- for id , filter := range fs .filters {
134
- if filter . LogsCallback != nil && fs .created [ id ]. Before ( event . Time ) {
135
- msgs := filter . FilterLogs ( ev )
136
- if len ( msgs ) > 0 {
137
- filter . LogsCallback (msgs )
138
- }
111
+ }
112
+ fs . filterMu . RUnlock ()
113
+
114
+ case vm. Logs :
115
+ fs .filterMu . RLock ()
116
+ for id , filter := range fs .filters {
117
+ if filter . LogsCallback != nil && fs . created [ id ]. Before ( event . Time ) {
118
+ msgs := filter . FilterLogs ( ev )
119
+ if len (msgs ) > 0 {
120
+ filter . LogsCallback ( msgs )
139
121
}
140
122
}
141
- fs .filterMu .RUnlock ()
142
123
}
124
+ fs .filterMu .RUnlock ()
143
125
}
144
126
}
145
127
}
0 commit comments