Skip to content

Commit 014d8d9

Browse files
committed
whisper: message filtering optimized
1 parent 4c845bd commit 014d8d9

File tree

1 file changed

+55
-11
lines changed

1 file changed

+55
-11
lines changed

whisper/whisperv6/filter.go

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,29 @@ type Filter struct {
3535
PoW float64 // Proof of work as described in the Whisper spec
3636
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
3737
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
38+
id string // unique identifier
3839

3940
Messages map[common.Hash]*ReceivedMessage
4041
mutex sync.RWMutex
4142
}
4243

4344
// Filters represents a collection of filters
4445
type Filters struct {
45-
watchers map[string]*Filter
46-
whisper *Whisper
47-
mutex sync.RWMutex
46+
watchers map[string]*Filter
47+
topicMatcher map[TopicType]map[*Filter]struct{}
48+
allTopicsMatcher map[*Filter]struct{}
49+
50+
whisper *Whisper
51+
mutex sync.RWMutex
4852
}
4953

5054
// NewFilters returns a newly created filter collection
5155
func NewFilters(w *Whisper) *Filters {
5256
return &Filters{
53-
watchers: make(map[string]*Filter),
54-
whisper: w,
57+
watchers: make(map[string]*Filter),
58+
topicMatcher: make(map[TopicType]map[*Filter]struct{}),
59+
allTopicsMatcher: make(map[*Filter]struct{}),
60+
whisper: w,
5561
}
5662
}
5763

@@ -81,7 +87,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
8187
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
8288
}
8389

90+
watcher.id = id
8491
fs.watchers[id] = watcher
92+
fs.addTopicMatcher(watcher)
8593
return id, err
8694
}
8795

@@ -91,12 +99,49 @@ func (fs *Filters) Uninstall(id string) bool {
9199
fs.mutex.Lock()
92100
defer fs.mutex.Unlock()
93101
if fs.watchers[id] != nil {
102+
fs.removeFromTopicMatchers(fs.watchers[id])
94103
delete(fs.watchers, id)
95104
return true
96105
}
97106
return false
98107
}
99108

109+
// addTopicMatcher adds a filter to the topic matchers
110+
func (fs *Filters) addTopicMatcher(watcher *Filter) {
111+
if len(watcher.Topics) == 0 {
112+
fs.allTopicsMatcher[watcher] = struct{}{}
113+
} else {
114+
for _, t := range watcher.Topics {
115+
topic := BytesToTopic(t)
116+
if fs.topicMatcher[topic] == nil {
117+
fs.topicMatcher[topic] = make(map[*Filter]struct{})
118+
}
119+
fs.topicMatcher[topic][watcher] = struct{}{}
120+
}
121+
}
122+
}
123+
124+
// removeFromTopicMatchers removes a filter from the topic matchers
125+
func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
126+
delete(fs.allTopicsMatcher, watcher)
127+
for _, topic := range watcher.Topics {
128+
delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
129+
}
130+
}
131+
132+
// getWatchersByTopic returns a slice containing the filters that
133+
// match a specific topic
134+
func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
135+
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
136+
for watcher, _ := range fs.allTopicsMatcher {
137+
res = append(res, watcher)
138+
}
139+
for watcher, _ := range fs.topicMatcher[topic] {
140+
res = append(res, watcher)
141+
}
142+
return res
143+
}
144+
100145
// Get returns a filter from the collection with a specific ID
101146
func (fs *Filters) Get(id string) *Filter {
102147
fs.mutex.RLock()
@@ -112,11 +157,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
112157
fs.mutex.RLock()
113158
defer fs.mutex.RUnlock()
114159

115-
i := -1 // only used for logging info
116-
for _, watcher := range fs.watchers {
117-
i++
160+
candidates := fs.getWatchersByTopic(env.Topic)
161+
for _, watcher := range candidates {
118162
if p2pMessage && !watcher.AllowP2P {
119-
log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i))
163+
log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
120164
continue
121165
}
122166

@@ -128,10 +172,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
128172
if match {
129173
msg = env.Open(watcher)
130174
if msg == nil {
131-
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
175+
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
132176
}
133177
} else {
134-
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
178+
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
135179
}
136180
}
137181

0 commit comments

Comments
 (0)