Skip to content

Commit cf557e0

Browse files
committed
amqp: Implement queue introspection
1 parent a3639b9 commit cf557e0

File tree

1 file changed

+69
-6
lines changed

1 file changed

+69
-6
lines changed

src/amqp.coffee

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11

22
debug = require('debug')('msgflo:amqp')
33
async = require 'async'
4+
uuid = require 'uuid'
5+
46
interfaces = require './interfaces'
57

68
try
79
amqp = require 'amqplib/callback_api'
810
catch e
911
amqp = e
1012

11-
1213
class Client extends interfaces.MessagingClient
1314
constructor: (@address, @options={}) ->
1415
@connection = null
@@ -149,9 +150,51 @@ class Client extends interfaces.MessagingClient
149150
@channel.sendToQueue 'fbp', data
150151
return callback null
151152

153+
154+
dataSubscriptionQueueName = (id) ->
155+
throw new Error("Missing id") if not id
156+
return ".msgflo-broker-subscriptions-#{id}"
157+
158+
bindingId = (b) ->
159+
return "[#{b.src}]->[#{b.tgt}]"
160+
152161
class MessageBroker extends Client
153162
constructor: (address, options) ->
154163
super address, options
164+
@options.id = uuid.v4() if not @options.id
165+
@subscriptions = {}
166+
167+
connect: (callback) ->
168+
super (err) =>
169+
return callback err if err
170+
# create queue for data subscriptions
171+
name = dataSubscriptionQueueName @options.id
172+
options =
173+
exclusive: true
174+
durable: false
175+
autoDelete: true
176+
@channel.assertQueue name, options, (err) =>
177+
return callback err if err
178+
onSubscribedQueueData = (message) =>
179+
exchange = message.fields.exchange
180+
debug 'broker subscriber got message on exchange', exchange
181+
matches = Object.keys(@subscriptions).filter (id) =>
182+
sub = @subscriptions[id]
183+
# XXX: how to account for which queue the message is for
184+
# can we create some identifier when we subscribe?
185+
return sub?.binding.src == exchange
186+
for id in matches
187+
sub = @subscriptions[id]
188+
data = message.content
189+
try
190+
data = JSON.parse message.content.toString()
191+
catch e
192+
null
193+
sub.handler sub.binding, data
194+
195+
@channel.consume name, onSubscribedQueueData, {}, (err) ->
196+
debug 'broker created subscription queue', err
197+
return callback err
155198

156199
addBinding: (binding, callback) ->
157200
# TODO: support roundrobin type
@@ -197,12 +240,32 @@ class MessageBroker extends Client
197240
return callback null, []
198241

199242
# Data subscriptions
200-
subscribeData: (binding, datahandler, callback) -> # TODO: implement
201-
return callback null
202-
unsubscribeData: (binding, datahandler, callback) -> # TODO: implement
243+
subscribeData: (binding, datahandler, callback) ->
244+
exchange = binding.src
245+
queue = dataSubscriptionQueueName @options.id
246+
options =
247+
autoDelete: true
248+
@channel.bindQueue queue, exchange, '', options, (err) =>
249+
return callback err if err
250+
id = bindingId binding
251+
@subscriptions[id] =
252+
binding: binding
253+
handler: datahandler
254+
return callback null
255+
256+
unsubscribeData: (binding, datahandler, callback) ->
257+
# TODO: also remove the subscription with broker
258+
id = bindingId binding
259+
delete @subscriptions[id]
203260
return callback null
204-
listSubscriptions: (callback) -> # TODO: implement
205-
return callback null, []
261+
262+
listSubscriptions: (callback) ->
263+
# Is there a way to get this information through AMQP?
264+
# Or do need to use RabbitMQ HTTP API?
265+
subs = []
266+
for id, sub of @subscriptions
267+
subs.push sub.binding
268+
return callback null, subs
206269

207270
# Participant registration
208271
subscribeParticipantChange: (handler) ->

0 commit comments

Comments
 (0)