Skip to content

Commit e38a197

Browse files
authored
Merge pull request #24 from msgflo/mqtt-subscribe
Support subscribing to data on MQTT
2 parents c37da28 + d4e986c commit e38a197

File tree

5 files changed

+146
-13
lines changed

5 files changed

+146
-13
lines changed

Gruntfile.coffee

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module.exports = ->
1818
src: ['spec/*.coffee']
1919
options:
2020
reporter: 'spec'
21+
grep: process.env.TESTS
2122

2223
# Coding standards
2324
coffeelint:

spec/01transport.coffee

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ transportTests = (type) ->
9999
broker = null
100100

101101
describeIfRoundRobinSupport = if type == 'AMQP' then describe else describe.skip
102+
describeIfSubscribeSupport = if type != 'AMQP' then describe else describe.skip
102103

103104
beforeEach (done) ->
104105
broker = transport.getBroker address
@@ -149,8 +150,8 @@ transportTests = (type) ->
149150

150151
clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
151152
chai.expect(err).to.be.a 'null'
152-
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
153-
chai.expect(err).to.be.a 'null'
153+
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
154+
chai.expect(err).to.be.a 'null'
154155

155156

156157
describe 'inqueue==outqueue with binding', ->
@@ -173,8 +174,8 @@ transportTests = (type) ->
173174

174175
clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
175176
chai.expect(err).to.be.a 'null'
176-
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
177-
chai.expect(err).to.be.a 'null'
177+
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
178+
chai.expect(err).to.be.a 'null'
178179

179180

180181
describe 'outqueue bound to inqueue', ->
@@ -200,8 +201,8 @@ transportTests = (type) ->
200201

201202
clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
202203
chai.expect(err).to.be.a 'null'
203-
clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
204-
chai.expect(err).to.be.a 'null'
204+
clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
205+
chai.expect(err).to.be.a 'null'
205206

206207

207208
describe 'multiple outqueues bound to one inqueue', ->
@@ -363,6 +364,100 @@ transportTests = (type) ->
363364
it 'only NACKed message is sent to deadletter', ->
364365
chai.expect(received.deadletter).to.eql [ { foo: 'nack'} ]
365366

367+
describeIfSubscribeSupport 'subscribing to bound topics', ->
368+
sendQueue = 'sub-send-36'
369+
receiveQueue = 'sub-receive-36'
370+
binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
371+
connectionData = []
372+
clients = null
373+
374+
# Should be a before, but the 'beforeEach' of higher scope are ran afterwards...
375+
setup = (done) ->
376+
createConnectClients address, ['sender', 'receiver'], (err, c) ->
377+
clients = c
378+
createQueues [
379+
[ clients.receiver, 'inqueue', receiveQueue ]
380+
[ clients.sender, 'outqueue', sendQueue ]
381+
], (err) ->
382+
chai.expect(err).to.not.exist
383+
broker.addBinding binding, (err) ->
384+
chai.expect(err).to.be.a 'null'
385+
return done null
386+
387+
it 'should provide data sent on connection', (done) ->
388+
payloads =
389+
one: { foo: 'sub-96' }
390+
two: { bar: 'sub-97' }
391+
392+
onData = (bind, data) ->
393+
chai.expect(bind.src).to.equal binding.src
394+
chai.expect(bind.tgt).to.equal binding.tgt
395+
connectionData.push data
396+
# wait until we've gotten two packets
397+
if connectionData.length == 2
398+
[one, two] = connectionData
399+
chai.expect(one).to.eql payloads.one
400+
chai.expect(two).to.eql payloads.two
401+
return done null
402+
else if connectionData.length > 2
403+
return done new Error "Got more data than expected"
404+
405+
setup (err) ->
406+
return done err if err
407+
broker.subscribeData binding, onData, (err) ->
408+
return done err if err
409+
clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
410+
return done err if err
411+
clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
412+
return done err if err
413+
414+
describeIfSubscribeSupport 'subscribing to binding with srcQueue==tgtQueue', ->
415+
sendQueue = 'sub-shared-37'
416+
receiveQueue = sendQueue
417+
binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
418+
connectionData = []
419+
clients = null
420+
421+
# Should be a before, but the 'beforeEach' of higher scope are ran afterwards...
422+
setup = (done) ->
423+
createConnectClients address, ['sender', 'receiver'], (err, c) ->
424+
clients = c
425+
createQueues [
426+
[ clients.receiver, 'inqueue', receiveQueue ]
427+
[ clients.sender, 'outqueue', sendQueue ]
428+
], (err) ->
429+
chai.expect(err).to.not.exist
430+
broker.addBinding binding, (err) ->
431+
chai.expect(err).to.be.a 'null'
432+
return done null
433+
434+
it 'should provide data sent on connection', (done) ->
435+
payloads =
436+
one: { foo: 'sub-106' }
437+
two: { bar: 'sub-107' }
438+
439+
onData = (bind, data) ->
440+
chai.expect(bind.src).to.equal binding.src
441+
chai.expect(bind.tgt).to.equal binding.tgt
442+
connectionData.push data
443+
# wait until we've gotten two packets
444+
if connectionData.length == 2
445+
[one, two] = connectionData
446+
chai.expect(one).to.eql payloads.one
447+
chai.expect(two).to.eql payloads.two
448+
return done null
449+
else if connectionData.length > 2
450+
return done new Error "Got more data than expected"
451+
452+
setup (err) ->
453+
return done err if err
454+
broker.subscribeData binding, onData, (err) ->
455+
return done err if err
456+
clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
457+
return done err if err
458+
clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
459+
return done err if err
460+
366461
describe 'Transport', ->
367462
Object.keys(transports).forEach (type) =>
368463
describe "#{type}", () ->

src/amqp.coffee

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,18 @@ class MessageBroker extends Client
190190

191191
else
192192
return callback new Error 'Unsupported binding type: '+binding.type
193-
removeBinding: (binding, callback) ->
194-
# FIXME: implement
193+
194+
removeBinding: (binding, callback) -> # FIXME: implement
195195
return callback null
196-
listBindings: (from, callback) ->
196+
listBindings: (from, callback) -> # FIXME: implement
197197
return callback null, []
198-
198+
199+
# Data subscriptions
200+
subscribeData: (binding, datahandler, callback) -> # TODO: implement
201+
return callback null
202+
unsubscribeData: (binding, datahandler, callback) -> # TODO: implement
203+
return callback null
204+
199205
# Participant registration
200206
subscribeParticipantChange: (handler) ->
201207
deserialize = (message) =>

src/interfaces.coffee

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ class MessageBroker extends MessagingSystem
6666
listBindings: (callback) ->
6767
throw new Error 'Not Implemented'
6868

69+
# Subscribing to data on a binding
70+
subscribeData: (binding, datahandler, callback) ->
71+
throw new Error 'Not Implemented'
72+
unsubscribeData: (binding, datahandler, callback) ->
73+
throw new Error 'Not Implemented'
74+
6975
# Participant registration
7076
subscribeParticipantChange: (handler) ->
7177
throw new Error 'Not Implemented'

src/routing.coffee

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,32 @@ bindingId = (f, t) ->
3030
class Binder
3131
constructor: (@transport) ->
3232
@bindings = {}
33+
@subscriptions = {}
3334

3435
addBinding: (binding, callback) ->
3536
from = binding.src
3637
to = binding.tgt
3738
# TODO: handle non-pubsub types
3839
id = bindingId from, to
3940
debug 'Binder.addBinding', binding.type, id
40-
return callback null if @bindings[id] or from == to
41+
return callback null if @bindings[id] # already exists, avoid duplicate
4142

4243
handler = (msg) =>
4344
binding = @bindings[id]
4445
return if not binding?.enabled
4546
debug 'edge message', from, to, msg
46-
@transport.sendTo 'outqueue', to, msg.data, (err) ->
47-
throw err if err
47+
48+
subscription = @subscriptions[id]
49+
if subscription
50+
for subCallback in subscription.handlers
51+
subCallback(subscription.binding, msg.data)
52+
53+
if from != to
54+
@transport.sendTo 'outqueue', to, msg.data, (err) ->
55+
throw err if err
56+
else
57+
# same topic/queue, data should appear without our forwarding
58+
4859
@transport.subscribeToQueue from, handler, (err) =>
4960
return callback err if err
5061
@bindings[id] =
@@ -68,6 +79,18 @@ class Binder
6879
debug 'Binder.listBindings'
6980
return callback null, []
7081

82+
subscribeData: (binding, datahandler, callback) ->
83+
id = bindingId binding.src, binding.tgt
84+
@subscriptions[id] = { handlers: [], binding: binding } if not @subscriptions[id]
85+
@subscriptions[id].handlers.push datahandler
86+
return callback null
87+
unsubscribeData: (binding, datahandler, callback) ->
88+
id = bindingId binding.src, binding.tgt
89+
subscription = @subscriptions[id]
90+
handlerIndex = subscription.handlers.indexOf datahandler
91+
return callback new Error "Subscription was not found" if handlerIndex == -1
92+
subscription.handlers = subscription.handlers.splice(handlerIndex, 1)
93+
return callback null
7194

7295
exports.Binder = Binder
7396
exports.binderMixin = (transport) ->
@@ -76,4 +99,6 @@ exports.binderMixin = (transport) ->
7699
transport.addBinding = b.addBinding.bind b
77100
transport.removeBinding = b.removeBinding.bind b
78101
transport.listBindings = b.listBindings.bind b
102+
transport.subscribeData = b.subscribeData.bind b
103+
transport.unsubscribeData = b.unsubscribeData.bind b
79104

0 commit comments

Comments
 (0)