Skip to content

Commit 3aeabd8

Browse files
authored
Merge branch 'master' into greenkeeper/mqtt-2.3.1
2 parents 9cd4728 + f84f96a commit 3aeabd8

File tree

11 files changed

+315
-30
lines changed

11 files changed

+315
-30
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ deploy:
2424
2525
skip_cleanup: true
2626
api_key:
27-
secure: K+uFHsh/EgT6zKDdXJV+770WdXzuBaiLQkN1EI/JU5AfjOqw8cWI2ycbH9LWlOCcE29l/cVFL+ycx/1x69IwmAWk61TlwPXqT1KiDBn8lxEN2dPub/0Cr7iHP/XV60daao2TJS+KS44wS6MIMcwBNkbeCZXsOKQZjFR/qjlAM5kCCQO+cZBWC+m2k3w0KHRO+6AdAOke205iGrI0/ZHLNU96f0jhMtwy3oM2DQEwY6aerAVx6cBes9Cdqzn86qgOohP5XXVpllTzeBTt6WjIQsQB1PXFReYfN0Tfcsvg2bcOUMhc5Exj1p40/Q9gSNKZon/4kRQFKvut1+189qMga6JKA6wdYO8xltVgRgwGc6pyeXMrJHZ/vzxyOLCYTE/W15Kg61dXrzBi4Ke+1Bm7DGuYFvAanSgEboseNFCkKSNirDO2a2Xv2ldRlWQpgEOADhAczGSlmUTz2/UqMEwrvc1M+DyRnGD2zyDV4My23jGAKnI/vqFke3PvLjGa+GpzcvEYntEq2iCIDSaoaJH47d1T9Rj2/g5Vl9hw6bfkLZIFk46fUfU4KzZJHmxs3x7s0vijNY2DAT3TrLhIw20gMbQJBiAW1BCojgZ2B+OB/ChYQcxYJ2IQEJxsLBXFrlSC3tfXCzWgf7yjrwYsP3Hbj+AbP+74E7uwLh/eSDfrpCY=
27+
secure: lovNd++XUGKSGeO0ze6rI87Y/RCdEvOlD1Z4FzVaCbjD6meNiOygf4SrHdVMUa8Xe0kCBUc9MRVqKZEvvgJ/yLgv+LwwU6+jhecGMWmovVh0b7q3il7ixzdvR4Lyyl4YxNu2AJMFn6G/YzniNm5EwaIss5vkoUAijmfys5FQFMsj/IQp08u2PRusfSua0Nnx5FtMDS17k2bqheCE7rTMlHswHuHCrE0gabBZoMgLK1K3DGvROvvQzbMlMYrLLbT8Xdm08opqzIUYTHqo7fIZOFqPieuw9gPb6T/M3QY9k0UIflF/rh9hoCAdJRa/BT8pxwXmE2V0PxKGMCTzphfKQ1mfpqQCTM4jNPDSRhadkAe1JTas9PeP91IPSECcamN9soBW/3fUe4ji3UGpaJHmU97Owd3BBzNQl2PMoHUBmyI4C0QkwukjgyStky/oEFaL6uh/scA2Hrin1jsOv2sXEltjkogbRWspGv9q+1DfJkHeiYqpp44nU4Kb2dXVNFqYSVxNUNewRrY3hsH8K1cBK4x61jBOY5ItUNQ2a64GhZ7WyHr2gIPxwT33HD+4ksRnEknl6KU9qc+6kwdi7fERQ8nEyDUOv4ZWtvvoHXiwZzu7m7Xwhtj4RdJ2IEUniJlmHq7M/yr8FPGCP+AXQzBQq2BUPt3OUEWAYrlk0vfjFA4=
2828
on:
2929
tags: true
3030
repo: msgflo/msgflo-nodejs

CHANGES.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,22 @@
11

2+
# msgflo-nodejs 0.10.0, 15.03.2017
3+
4+
* Transport: Add interface for listing existing datasubscriptions.
5+
`MessageBroker.listSubscriptions()`
6+
7+
# msgflo-nodejs 0.9.0, 04.02.2017
8+
9+
* Transport: Add new interface for subscribing to data changes on a binding.
10+
See MessageBroker `subscribeData()` and `unsubscribeData()`.
11+
* MQTT: Support `subscribeData()`
12+
* MQTT: Fix falsy messages not being forwarded
13+
* MQTT: Fix addBinding() not working if a removeBinding() had been done before
14+
* No support for subscribeData on AMQP yet
15+
16+
# msgflo-nodejs 0.8.2, 01.02.2017
17+
18+
* MsgFlo discovery message now sent periodically. Defaults to couple of times per minute.
19+
220
# msgflo-nodejs 0.7.2, 09.10.2016
321

422
* MQTT: Fix compatibility with newer versions of library (> 1.4.x)

Gruntfile.coffee

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module.exports = ->
1818
src: ['spec/*.coffee']
1919
options:
2020
reporter: 'spec'
21+
grep: process.env.TESTS
2122

2223
# Coding standards
2324
coffeelint:

package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "msgflo-nodejs",
33
"description": "Node.js participant support for MsgFlo",
4-
"version": "0.7.2",
4+
"version": "0.10.5",
55
"author": {
66
"name": "Jon Nordby",
77
"email": "[email protected]"
@@ -17,7 +17,7 @@
1717
"commander": "^2.6.0",
1818
"debug": "^2.1.2",
1919
"fbp": "^1.1.2",
20-
"newrelic": "1.36.0",
20+
"newrelic": "^1.39.0",
2121
"uuid": "^3.0.1"
2222
},
2323
"devDependencies": {
@@ -29,8 +29,8 @@
2929
"grunt-contrib-coffee": "^1.0.0",
3030
"grunt-mocha-test": "^0.13.2",
3131
"grunt-shell-spawn": "^0.3.1",
32-
"mocha": "~3.2.0",
33-
"mqtt": "^2.3.1"
32+
"mocha": "~3.4.0",
33+
"mqtt": "^2.7.1"
3434
},
3535
"keywords": [],
3636
"bin": {

spec/01transport.coffee

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ transportTests = (type) ->
149149

150150
clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
151151
chai.expect(err).to.be.a 'null'
152-
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
153-
chai.expect(err).to.be.a 'null'
152+
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
153+
chai.expect(err).to.be.a 'null'
154154

155155

156156
describe 'inqueue==outqueue with binding', ->
@@ -173,8 +173,8 @@ transportTests = (type) ->
173173

174174
clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
175175
chai.expect(err).to.be.a 'null'
176-
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
177-
chai.expect(err).to.be.a 'null'
176+
clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
177+
chai.expect(err).to.be.a 'null'
178178

179179

180180
describe 'outqueue bound to inqueue', ->
@@ -200,6 +200,48 @@ transportTests = (type) ->
200200

201201
clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
202202
chai.expect(err).to.be.a 'null'
203+
clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
204+
chai.expect(err).to.be.a 'null'
205+
206+
describe 'outqueue bound to inqueue then removed', ->
207+
it 'sending to inqueue, show up on outqueue', (done) ->
208+
payload = { foo: 'bar922' }
209+
inQueue = 'inqueue922'
210+
outQueue = 'outqueue922'
211+
createConnectClients address, ['sender', 'receiver'], (err, clients) ->
212+
createQueues [
213+
[ clients.receiver, 'inqueue', inQueue ]
214+
[ clients.sender, 'outqueue', outQueue ]
215+
], (err) ->
216+
chai.expect(err).to.not.exist
217+
218+
binding = { type:'pubsub', src:outQueue, tgt:inQueue }
219+
bindingRemoved = false
220+
221+
onReceive = (msg) ->
222+
if bindingRemoved
223+
done new Error "Received data on removed binding"
224+
done = null
225+
return
226+
227+
clients.receiver.ackMessage msg
228+
chai.expect(msg).to.include.keys 'data'
229+
chai.expect(msg.data).to.eql payload
230+
bindingRemoved = true
231+
broker.removeBinding binding, (err) ->
232+
chai.expect(err).to.be.a 'null'
233+
clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
234+
chai.expect(err).to.be.a 'null'
235+
setTimeout () ->
236+
done null if done
237+
done = null
238+
return
239+
, 300
240+
241+
clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
242+
chai.expect(err).to.be.a 'null'
243+
broker.addBinding binding, (err) ->
244+
chai.expect(err).to.be.a 'null'
203245
clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
204246
chai.expect(err).to.be.a 'null'
205247

@@ -363,6 +405,100 @@ transportTests = (type) ->
363405
it 'only NACKed message is sent to deadletter', ->
364406
chai.expect(received.deadletter).to.eql [ { foo: 'nack'} ]
365407

408+
describe 'subscribing to bound topics', ->
409+
sendQueue = 'sub-send-36'
410+
receiveQueue = 'sub-receive-36'
411+
binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
412+
connectionData = []
413+
clients = null
414+
415+
# Should be a before, but the 'beforeEach' of higher scope are ran afterwards...
416+
setup = (done) ->
417+
createConnectClients address, ['sender', 'receiver'], (err, c) ->
418+
clients = c
419+
createQueues [
420+
[ clients.receiver, 'inqueue', receiveQueue ]
421+
[ clients.sender, 'outqueue', sendQueue ]
422+
], (err) ->
423+
chai.expect(err).to.not.exist
424+
broker.addBinding binding, (err) ->
425+
chai.expect(err).to.be.a 'null'
426+
return done null
427+
428+
it 'should provide data sent on connection', (done) ->
429+
payloads =
430+
one: { foo: 'sub-96' }
431+
two: { bar: 'sub-97' }
432+
433+
onData = (bind, data) ->
434+
chai.expect(bind.src).to.equal binding.src
435+
chai.expect(bind.tgt).to.equal binding.tgt
436+
connectionData.push data
437+
# wait until we've gotten two packets
438+
if connectionData.length == 2
439+
[one, two] = connectionData
440+
chai.expect(one).to.eql payloads.one
441+
chai.expect(two).to.eql payloads.two
442+
return done null
443+
else if connectionData.length > 2
444+
return done new Error "Got more data than expected"
445+
446+
setup (err) ->
447+
return done err if err
448+
broker.subscribeData binding, onData, (err) ->
449+
return done err if err
450+
clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
451+
return done err if err
452+
clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
453+
return done err if err
454+
455+
describe 'subscribing to binding with srcQueue==tgtQueue', ->
456+
sendQueue = 'sub-shared-37'
457+
receiveQueue = sendQueue
458+
binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
459+
connectionData = []
460+
clients = null
461+
462+
# Should be a before, but the 'beforeEach' of higher scope are ran afterwards...
463+
setup = (done) ->
464+
createConnectClients address, ['sender', 'receiver'], (err, c) ->
465+
clients = c
466+
createQueues [
467+
[ clients.receiver, 'inqueue', receiveQueue ]
468+
[ clients.sender, 'outqueue', sendQueue ]
469+
], (err) ->
470+
chai.expect(err).to.not.exist
471+
broker.addBinding binding, (err) ->
472+
chai.expect(err).to.be.a 'null'
473+
return done null
474+
475+
it 'should provide data sent on connection', (done) ->
476+
payloads =
477+
one: { foo: 'sub-106' }
478+
two: { bar: 'sub-107' }
479+
480+
onData = (bind, data) ->
481+
chai.expect(bind.src).to.equal binding.src
482+
chai.expect(bind.tgt).to.equal binding.tgt
483+
connectionData.push data
484+
# wait until we've gotten two packets
485+
if connectionData.length == 2
486+
[one, two] = connectionData
487+
chai.expect(one).to.eql payloads.one
488+
chai.expect(two).to.eql payloads.two
489+
return done null
490+
else if connectionData.length > 2
491+
return done new Error "Got more data than expected"
492+
493+
setup (err) ->
494+
return done err if err
495+
broker.subscribeData binding, onData, (err) ->
496+
return done err if err
497+
clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
498+
return done err if err
499+
clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
500+
return done err if err
501+
366502
describe 'Transport', ->
367503
Object.keys(transports).forEach (type) =>
368504
describe "#{type}", () ->

src/amqp.coffee

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11

22
debug = require('debug')('msgflo:amqp')
33
async = require 'async'
4+
uuid = require 'uuid'
5+
46
interfaces = require './interfaces'
57

68
try
79
amqp = require 'amqplib/callback_api'
810
catch e
911
amqp = e
1012

11-
1213
class Client extends interfaces.MessagingClient
1314
constructor: (@address, @options={}) ->
1415
@connection = null
@@ -149,12 +150,55 @@ class Client extends interfaces.MessagingClient
149150
@channel.sendToQueue 'fbp', data
150151
return callback null
151152

153+
154+
dataSubscriptionQueueName = (id) ->
155+
throw new Error("Missing id") if not id
156+
return ".msgflo-broker-subscriptions-#{id}"
157+
158+
bindingId = (b) ->
159+
return "[#{b.src}]->[#{b.tgt}]"
160+
152161
class MessageBroker extends Client
153162
constructor: (address, options) ->
154163
super address, options
164+
@options.id = uuid.v4() if not @options.id
165+
@subscriptions = {}
166+
167+
connect: (callback) ->
168+
super (err) =>
169+
return callback err if err
170+
# create queue for data subscriptions
171+
name = dataSubscriptionQueueName @options.id
172+
options =
173+
exclusive: true
174+
durable: false
175+
autoDelete: true
176+
@channel.assertQueue name, options, (err) =>
177+
return callback err if err
178+
onSubscribedQueueData = (message) =>
179+
exchange = message.fields.exchange
180+
debug 'broker subscriber got message on exchange', exchange
181+
matches = Object.keys(@subscriptions).filter (id) =>
182+
sub = @subscriptions[id]
183+
# XXX: how to account for which queue the message is for
184+
# can we create some identifier when we subscribe?
185+
return sub?.binding.src == exchange
186+
for id in matches
187+
sub = @subscriptions[id]
188+
data = message.content
189+
try
190+
data = JSON.parse message.content.toString()
191+
catch e
192+
null
193+
sub.handler sub.binding, data
194+
195+
subscribeOptions =
196+
noAck: true
197+
@channel.consume name, onSubscribedQueueData, subscribeOptions, (err) ->
198+
debug 'broker created subscription queue', err
199+
return callback err
155200

156201
addBinding: (binding, callback) ->
157-
# TODO: support roundrobin type
158202
debug 'Broker.addBinding', binding
159203
if binding.type == 'pubsub'
160204
@channel.bindQueue binding.tgt, binding.src, '', {}, callback
@@ -190,12 +234,49 @@ class MessageBroker extends Client
190234

191235
else
192236
return callback new Error 'Unsupported binding type: '+binding.type
237+
193238
removeBinding: (binding, callback) ->
194-
# FIXME: implement
195-
return callback null
196-
listBindings: (from, callback) ->
239+
debug 'Broker.removeBinding', binding
240+
if binding.type == 'pubsub'
241+
@channel.unbindQueue binding.tgt, binding.src, '', {}, callback
242+
else if binding.type == 'roundrobin'
243+
return callback new Error "removeBinding() not supported for type='roundrobin'" # TODO:
244+
else
245+
return callback new Error "Unsupported binding type: #{binding.type}"
246+
247+
248+
listBindings: (from, callback) -> # FIXME: implement
249+
# NOTE: probably need to use the RabbitMQ HTTP API for this
197250
return callback null, []
198-
251+
252+
# Data subscriptions
253+
subscribeData: (binding, datahandler, callback) ->
254+
exchange = binding.src
255+
queue = dataSubscriptionQueueName @options.id
256+
options =
257+
autoDelete: true
258+
@channel.bindQueue queue, exchange, '', options, (err) =>
259+
return callback err if err
260+
id = bindingId binding
261+
@subscriptions[id] =
262+
binding: binding
263+
handler: datahandler
264+
return callback null
265+
266+
unsubscribeData: (binding, datahandler, callback) ->
267+
# TODO: also remove the subscription with broker
268+
id = bindingId binding
269+
delete @subscriptions[id]
270+
return callback null
271+
272+
listSubscriptions: (callback) ->
273+
# Is there a way to get this information through AMQP?
274+
# Or do need to use RabbitMQ HTTP API?
275+
subs = []
276+
for id, sub of @subscriptions
277+
subs.push sub.binding
278+
return callback null, subs
279+
199280
# Participant registration
200281
subscribeParticipantChange: (handler) ->
201282
deserialize = (message) =>

src/interfaces.coffee

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class MessagingSystem
3737
class MessagingClient extends MessagingSystem
3838

3939
# Participant registration
40-
registerParticipant: (part) ->
40+
registerParticipant: (part, callback) ->
4141
throw new Error 'Not Implemented'
4242

4343
exports.MessagingClient = MessagingClient
@@ -66,6 +66,14 @@ class MessageBroker extends MessagingSystem
6666
listBindings: (callback) ->
6767
throw new Error 'Not Implemented'
6868

69+
# Subscribing to data on a binding
70+
subscribeData: (binding, datahandler, callback) ->
71+
throw new Error 'Not Implemented'
72+
unsubscribeData: (binding, datahandler, callback) ->
73+
throw new Error 'Not Implemented'
74+
listSubscriptions: (callback) ->
75+
throw new Error 'Not Implemented'
76+
6977
# Participant registration
7078
subscribeParticipantChange: (handler) ->
7179
throw new Error 'Not Implemented'

0 commit comments

Comments
 (0)