Skip to content

Commit 16371e8

Browse files
committed
Replace publishfilter against a variable implementation
1 parent 9309614 commit 16371e8

File tree

8 files changed

+119
-85
lines changed

8 files changed

+119
-85
lines changed

router/broker.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type subscription struct {
4141
subscribers map[*session]struct{}
4242
}
4343

44+
// FilterFactory is a function which creates a PublishFilter from a publication
45+
type FilterFactory func(msg *wamp.Publish) PublishFilter
46+
4447
type Broker struct {
4548
// topic -> subscription
4649
topicSubscription map[wamp.URI]*subscription
@@ -61,15 +64,19 @@ type Broker struct {
6164
strictURI bool
6265
allowDisclose bool
6366

64-
log stdlog.StdLog
65-
debug bool
67+
log stdlog.StdLog
68+
debug bool
69+
filterFactory FilterFactory
6670
}
6771

6872
// NewBroker returns a new default broker implementation instance.
69-
func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Broker {
73+
func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publishFilter FilterFactory) *Broker {
7074
if logger == nil {
7175
panic("logger is nil")
7276
}
77+
if publishFilter == nil {
78+
publishFilter = NewSimplePublishFilter
79+
}
7380
b := &Broker{
7481
topicSubscription: map[wamp.URI]*subscription{},
7582
pfxTopicSubscription: map[wamp.URI]*subscription{},
@@ -88,8 +95,9 @@ func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Brok
8895
strictURI: strictURI,
8996
allowDisclose: allowDisclose,
9097

91-
log: logger,
92-
debug: debug,
98+
log: logger,
99+
debug: debug,
100+
filterFactory: publishFilter,
93101
}
94102
go b.run()
95103
return b
@@ -166,7 +174,7 @@ func (b *Broker) Publish(pub *session, msg *wamp.Publish) {
166174
pubID := wamp.GlobalID()
167175

168176
// Get blacklists and whitelists, if any, from publish message.
169-
filter := newPublishFilter(msg)
177+
filter := b.filterFactory(msg)
170178

171179
b.actionChan <- func() {
172180
b.publish(pub, msg, pubID, excludePub, disclose, filter)
@@ -252,7 +260,7 @@ func (b *Broker) run() {
252260
}
253261
}
254262

255-
func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter *publishFilter) {
263+
func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter PublishFilter) {
256264
// Publish to subscribers with exact match.
257265
if sub, ok := b.topicSubscription[msg.Topic]; ok {
258266
b.pubEvent(pub, msg, pubID, sub, excludePub, false, disclose, filter)
@@ -449,17 +457,28 @@ func (b *Broker) removeSession(subscriber *session) {
449457
}
450458
}
451459

460+
func allowPublish(sub *session, filter PublishFilter) bool {
461+
if filter == nil {
462+
return true
463+
}
464+
if filter.LockRequired() {
465+
sub.RLock()
466+
defer sub.RUnlock()
467+
}
468+
return filter.PublishAllowed(&sub.Session)
469+
}
470+
452471
// pubEvent sends an event to all subscribers that are not excluded from
453472
// receiving the event.
454-
func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter *publishFilter) {
455-
for subscriber := range sub.subscribers {
473+
func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter PublishFilter) {
474+
for subscriber, _ := range sub.subscribers {
456475
// Do not send event to publisher.
457476
if subscriber == pub && excludePublisher {
458477
continue
459478
}
460479

461480
// Check if receiver is restricted.
462-
if filter != nil && !filter.publishAllowed(subscriber) {
481+
if !allowPublish(subscriber, filter) {
463482
continue
464483
}
465484

@@ -591,13 +610,13 @@ func disclosePublisher(pub *session, details wamp.Dict) {
591610
details[rolePub] = pub.ID
592611
// These values are not required by the specification, but are here for
593612
// compatibility with Crossbar.
594-
pub.rLock()
613+
pub.RLock()
595614
for _, f := range []string{"authid", "authrole"} {
596615
if val, ok := pub.Details[f]; ok {
597616
details[fmt.Sprintf("%s_%s", rolePub, f)] = val
598617
}
599618
}
600-
pub.rUnlock()
619+
pub.RUnlock()
601620
}
602621

603622
// ----- Subscription Meta Procedure Handlers -----

router/broker_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (p *testPeer) Close() { return }
3636

3737
func TestBasicSubscribe(t *testing.T) {
3838
// Test subscribing to a topic.
39-
broker := NewBroker(logger, false, true, debug)
39+
broker := NewBroker(logger, false, true, debug, nil)
4040
subscriber := newTestPeer()
4141
sess := newSession(subscriber, 0, nil)
4242
testTopic := wamp.URI("nexus.test.topic")
@@ -142,7 +142,7 @@ func TestBasicSubscribe(t *testing.T) {
142142
}
143143

144144
func TestUnsubscribe(t *testing.T) {
145-
broker := NewBroker(logger, false, true, debug)
145+
broker := NewBroker(logger, false, true, debug, nil)
146146
testTopic := wamp.URI("nexus.test.topic")
147147

148148
// Subscribe session1 to topic
@@ -250,7 +250,7 @@ func TestUnsubscribe(t *testing.T) {
250250

251251
func TestRemove(t *testing.T) {
252252
// Subscribe to topic
253-
broker := NewBroker(logger, false, true, debug)
253+
broker := NewBroker(logger, false, true, debug, nil)
254254
subscriber := newTestPeer()
255255
sess := newSession(subscriber, 0, nil)
256256
testTopic := wamp.URI("nexus.test.topic")
@@ -292,7 +292,7 @@ func TestRemove(t *testing.T) {
292292
}
293293

294294
func TestBasicPubSub(t *testing.T) {
295-
broker := NewBroker(logger, false, true, debug)
295+
broker := NewBroker(logger, false, true, debug, nil)
296296
subscriber := newTestPeer()
297297
sess := newSession(subscriber, 0, nil)
298298
testTopic := wamp.URI("nexus.test.topic")
@@ -331,7 +331,7 @@ func TestBasicPubSub(t *testing.T) {
331331

332332
func TestPrefxPatternBasedSubscription(t *testing.T) {
333333
// Test match=prefix
334-
broker := NewBroker(logger, false, true, debug)
334+
broker := NewBroker(logger, false, true, debug, nil)
335335
subscriber := newTestPeer()
336336
sess := newSession(subscriber, 0, nil)
337337
testTopic := wamp.URI("nexus.test.topic")
@@ -394,7 +394,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) {
394394

395395
func TestWildcardPatternBasedSubscription(t *testing.T) {
396396
// Test match=prefix
397-
broker := NewBroker(logger, false, true, debug)
397+
broker := NewBroker(logger, false, true, debug, nil)
398398
subscriber := newTestPeer()
399399
sess := newSession(subscriber, 0, nil)
400400
testTopic := wamp.URI("nexus.test.topic")
@@ -465,7 +465,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) {
465465
}
466466

467467
func TestSubscriberBlackwhiteListing(t *testing.T) {
468-
broker := NewBroker(logger, false, true, debug)
468+
broker := NewBroker(logger, false, true, debug, nil)
469469
subscriber := newTestPeer()
470470
details := wamp.Dict{
471471
"authid": "jdoe",
@@ -572,7 +572,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) {
572572
}
573573

574574
func TestPublisherExclusion(t *testing.T) {
575-
broker := NewBroker(logger, false, true, debug)
575+
broker := NewBroker(logger, false, true, debug, nil)
576576
subscriber := newTestPeer()
577577
sess := newSession(subscriber, 0, nil)
578578
testTopic := wamp.URI("nexus.test.topic")
@@ -646,7 +646,7 @@ func TestPublisherExclusion(t *testing.T) {
646646
}
647647

648648
func TestPublisherIdentification(t *testing.T) {
649-
broker := NewBroker(logger, false, true, debug)
649+
broker := NewBroker(logger, false, true, debug, nil)
650650
subscriber := newTestPeer()
651651

652652
details := wamp.Dict{

router/dealer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ func (d *Dealer) Register(callee *session, msg *wamp.Register) {
204204
disclose, _ := msg.Options[wamp.OptDiscloseCaller].(bool)
205205
// allow disclose for trusted clients
206206
if !d.allowDisclose && disclose {
207-
callee.rLock()
207+
callee.RLock()
208208
authrole, _ := wamp.AsString(callee.Details["authrole"])
209-
callee.rUnlock()
209+
callee.RUnlock()
210210
if authrole != "trusted" {
211211
d.trySend(callee, &wamp.Error{
212212
Type: msg.MessageType(),
@@ -1182,11 +1182,11 @@ func discloseCaller(caller *session, details wamp.Dict) {
11821182
details[roleCaller] = caller.ID
11831183
// These values are not required by the specification, but are here for
11841184
// compatibility with Crossbar.
1185-
caller.rLock()
1185+
caller.RLock()
11861186
for _, f := range []string{"authid", "authrole"} {
11871187
if val, ok := caller.Details[f]; ok {
11881188
details[fmt.Sprintf("%s_%s", roleCaller, f)] = val
11891189
}
11901190
}
1191-
caller.rUnlock()
1191+
caller.RUnlock()
11921192
}

router/publishfilter.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,25 @@ import (
66
"github.com/gammazero/nexus/wamp"
77
)
88

9-
type publishFilter struct {
10-
blIDs []wamp.ID
11-
wlIDs []wamp.ID
12-
blMap map[string][]string
13-
wlMap map[string][]string
9+
// PublishFilter is an interface to check whether a publication should be sent
10+
// to a specific session
11+
type PublishFilter interface {
12+
LockRequired() bool
13+
PublishAllowed(sess *wamp.Session) bool
1414
}
1515

16-
// newPublishFilter gets any blacklists and whitelists included in a PUBLISH
16+
type simplePublishFilter struct {
17+
blIDs []wamp.ID
18+
wlIDs []wamp.ID
19+
blMap map[string][]string
20+
wlMap map[string][]string
21+
lockRequired bool
22+
}
23+
24+
// NewSimplePublishFilter gets any blacklists and whitelists included in a PUBLISH
1725
// message. If there are no filters defined by the PUBLISH message, then nil
1826
// is returned.
19-
func newPublishFilter(msg *wamp.Publish) *publishFilter {
27+
func NewSimplePublishFilter(msg *wamp.Publish) PublishFilter {
2028
const (
2129
blacklistPrefix = "exclude_"
2230
whitelistPrefix = "eligible_"
@@ -79,21 +87,26 @@ func newPublishFilter(msg *wamp.Publish) *publishFilter {
7987
if blIDs == nil && wlIDs == nil && blMap == nil && wlMap == nil {
8088
return nil
8189
}
82-
return &publishFilter{blIDs, wlIDs, blMap, wlMap}
90+
return &simplePublishFilter{blIDs, wlIDs, blMap, wlMap, len(blMap) != 0 || len(wlMap) != 0}
91+
}
92+
93+
// LockRequired determines whether a consistent state of the subscriber sessions is
94+
// required while running the filter
95+
func (f *simplePublishFilter) LockRequired() bool {
96+
return f.lockRequired
8397
}
8498

85-
// publishAllowed determines if a message is allowed to be published to a
99+
// PublishAllowed determines if a message is allowed to be published to a
86100
// subscriber, by looking at any blacklists and whitelists provided with the
87101
// publish message.
88102
//
89103
// To receive a published event, the subscriber session must not have any
90104
// values that appear in a blacklist, and must have a value from each
91105
// whitelist.
92-
func (f *publishFilter) publishAllowed(sub *session) bool {
93-
subID := sub.ID
106+
func (f *simplePublishFilter) PublishAllowed(sub *wamp.Session) bool {
94107
// Check each blacklisted ID to see if session ID is blacklisted.
95108
for i := range f.blIDs {
96-
if f.blIDs[i] == subID {
109+
if f.blIDs[i] == sub.ID {
97110
return false
98111
}
99112
}
@@ -102,7 +115,7 @@ func (f *publishFilter) publishAllowed(sub *session) bool {
102115
// If session ID whitelist given, make sure session ID is in whitelist.
103116
if len(f.wlIDs) != 0 {
104117
for i := range f.wlIDs {
105-
if f.wlIDs[i] == subID {
118+
if f.wlIDs[i] == sub.ID {
106119
eligible = true
107120
break
108121
}
@@ -112,13 +125,8 @@ func (f *publishFilter) publishAllowed(sub *session) bool {
112125
}
113126
}
114127

115-
if len(f.blMap) != 0 || len(f.wlMap) != 0 {
116-
sub.rLock()
117-
defer sub.rUnlock()
118-
}
119-
120128
// Check blacklists to see if session has a value in any blacklist.
121-
details := sub.Session.Details
129+
details := sub.Details
122130
for attr, vals := range f.blMap {
123131
// Get the session attribute value to compare with blacklist.
124132
sessAttr, _ := wamp.AsString(details[attr])

0 commit comments

Comments
 (0)