@@ -12,11 +12,15 @@ import {
1212} from "rhea"
1313import { AmqpEndpoints , AmqpMethods , MessageBuilder , ME } from "./message_builder.js"
1414import {
15+ CreateBindingResponseDecoder ,
1516 CreateExchangeResponseDecoder ,
1617 CreateQueueResponseDecoder ,
18+ DeleteBindingResponseDecoder ,
1719 DeleteExchangeResponseDecoder ,
1820 DeleteQueueResponseDecoder ,
1921} from "./response_decoder.js"
22+ import { AmqpBinding , Binding , BindingInfo , BindingOptions } from "./binding.js"
23+ import { randomUUID } from "crypto"
2024
2125type LinkOpenEvents = SenderEvents . senderOpen | ReceiverEvents . receiverOpen
2226type LinkErrorEvents = SenderEvents . senderError | ReceiverEvents . receiverError
@@ -36,8 +40,10 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3640export interface Management {
3741 declareQueue : ( queueName : string , options ?: Partial < QueueOptions > ) => Promise < Queue >
3842 deleteQueue : ( queueName : string ) => Promise < boolean >
39- declareExchange : ( exchangeName : string , options : Partial < ExchangeOptions > ) => Promise < Exchange >
43+ declareExchange : ( exchangeName : string , options ? : Partial < ExchangeOptions > ) => Promise < Exchange >
4044 deleteExchange : ( exchangeName : string ) => Promise < boolean >
45+ bind : ( key : string , options : BindingOptions ) => Promise < Binding >
46+ unbind : ( key : string , options : BindingOptions ) => Promise < boolean >
4147 close : ( ) => void
4248}
4349
@@ -52,9 +58,7 @@ export class AmqpManagement implements Management {
5258 private readonly connection : RheaConnection ,
5359 private senderLink : Sender ,
5460 private receiverLink : Receiver
55- ) {
56- console . log ( this . receiverLink . is_open ( ) )
57- }
61+ ) { }
5862
5963 private static async openReceiver ( connection : RheaConnection ) : Promise < Receiver > {
6064 return AmqpManagement . openLink < Receiver > (
@@ -163,12 +167,12 @@ export class AmqpManagement implements Management {
163167 } )
164168 }
165169
166- declareExchange ( exchangeName : string , options : Partial < ExchangeOptions > = { } ) : Promise < Exchange > {
170+ async declareExchange ( exchangeName : string , options : Partial < ExchangeOptions > = { } ) : Promise < Exchange > {
167171 const exchangeInfo : ExchangeInfo = {
168172 type : options . type ?? "direct" ,
169173 arguments : options . arguments ?? { } ,
170174 autoDelete : options . auto_delete ?? false ,
171- durable : options . durable ?? false ,
175+ durable : options . durable ?? true ,
172176 name : exchangeName ,
173177 }
174178 return new Promise ( ( res , rej ) => {
@@ -190,8 +194,8 @@ export class AmqpManagement implements Management {
190194 . setReplyTo ( ME )
191195 . setAmqpMethod ( AmqpMethods . PUT )
192196 . setBody ( {
193- type : options . type ,
194- durable : options . durable ?? false ,
197+ type : options . type ?? "direct" ,
198+ durable : options . durable ?? true ,
195199 auto_delete : options . auto_delete ?? false ,
196200 } )
197201 . build ( )
@@ -200,7 +204,7 @@ export class AmqpManagement implements Management {
200204 } )
201205 }
202206
203- deleteExchange ( exchangeName : string ) : Promise < boolean > {
207+ async deleteExchange ( exchangeName : string ) : Promise < boolean > {
204208 return new Promise ( ( res , rej ) => {
205209 this . receiverLink . once ( ReceiverEvents . message , ( context : EventContext ) => {
206210 if ( ! context . message ) {
@@ -223,8 +227,94 @@ export class AmqpManagement implements Management {
223227 this . senderLink . send ( message )
224228 } )
225229 }
230+
231+ async bind ( key : string , options : BindingOptions ) : Promise < Binding > {
232+ const bindingInfo : BindingInfo = {
233+ id : randomUUID ( ) ,
234+ source : options . source . getInfo . name ,
235+ destination : options . destination . getInfo . name ,
236+ arguments : options . arguments ?? { } ,
237+ }
238+ return new Promise ( ( res , rej ) => {
239+ this . receiverLink . once ( ReceiverEvents . message , ( context : EventContext ) => {
240+ if ( ! context . message ) {
241+ return rej ( new Error ( "Receiver has not received any message" ) )
242+ }
243+
244+ const response = new CreateBindingResponseDecoder ( ) . decodeFrom ( context . message , String ( message . message_id ) )
245+ if ( response . status === "error" ) {
246+ return rej ( response . error )
247+ }
248+
249+ return res ( new AmqpBinding ( bindingInfo ) )
250+ } )
251+
252+ const message = new MessageBuilder ( )
253+ . sendTo ( `/${ AmqpEndpoints . Bindings } ` )
254+ . setReplyTo ( ME )
255+ . setAmqpMethod ( AmqpMethods . POST )
256+ . setBody ( {
257+ source : options . source . getInfo . name ,
258+ binding_key : key ,
259+ arguments : options . arguments ?? { } ,
260+ ...buildBindingDestinationFrom ( options . destination ) ,
261+ } )
262+ . build ( )
263+ this . senderLink . send ( message )
264+ } )
265+ }
266+
267+ async unbind ( key : string , options : BindingOptions ) : Promise < boolean > {
268+ return new Promise ( ( res , rej ) => {
269+ this . receiverLink . once ( ReceiverEvents . message , ( context : EventContext ) => {
270+ if ( ! context . message ) {
271+ return rej ( new Error ( "Receiver has not received any message" ) )
272+ }
273+
274+ const response = new DeleteBindingResponseDecoder ( ) . decodeFrom ( context . message , String ( message . message_id ) )
275+ if ( response . status === "error" ) {
276+ return rej ( response . error )
277+ }
278+
279+ return res ( true )
280+ } )
281+
282+ const message = new MessageBuilder ( )
283+ . sendTo (
284+ `/${ AmqpEndpoints . Bindings } /${ buildUnbindEndpointFrom ( { source : options . source , destination : options . destination , key } ) } `
285+ )
286+ . setReplyTo ( ME )
287+ . setAmqpMethod ( AmqpMethods . DELETE )
288+ . build ( )
289+ this . senderLink . send ( message )
290+ } )
291+ }
226292}
227293
228294function buildArgumentsFrom ( queueType ?: QueueType , queueOptions ?: Record < string , string > ) {
229295 return { ...( queueOptions ?? { } ) , ...( queueType ? { "x-queue-type" : queueType } : { } ) }
230296}
297+
298+ function buildUnbindEndpointFrom ( {
299+ source,
300+ destination,
301+ key,
302+ } : {
303+ source : Exchange
304+ destination : Exchange | Queue
305+ key : string
306+ } ) : string {
307+ if ( destination instanceof AmqpExchange ) {
308+ return `src=${ encodeURIComponent ( source . getInfo . name ) } ;dste=${ encodeURIComponent ( destination . getInfo . name ) } ;key=${ encodeURIComponent ( key ) } ;args=`
309+ }
310+
311+ return `src=${ encodeURIComponent ( source . getInfo . name ) } ;dstq=${ encodeURIComponent ( destination . getInfo . name ) } ;key=${ encodeURIComponent ( key ) } ;args=`
312+ }
313+
314+ function buildBindingDestinationFrom ( destination : Exchange | Queue ) {
315+ if ( destination instanceof AmqpExchange ) {
316+ return { destination_exchange : destination . getInfo . name }
317+ }
318+
319+ return { destination_queue : destination . getInfo . name }
320+ }
0 commit comments