Skip to content

Commit 45b0d04

Browse files
author
magne
committed
feat: add publisher support
1 parent 4bde5c3 commit 45b0d04

File tree

14 files changed

+830
-118
lines changed

14 files changed

+830
-118
lines changed

src/connection.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
22
import { AmqpManagement, Management } from "./management.js"
33
import { EnvironmentParams } from "./environment.js"
4+
import { AmqpPublisher, Publisher } from "./publisher.js"
5+
import { DestinationOptions } from "./message.js"
46

57
export interface Connection {
68
close(): Promise<boolean>
79
isOpen(): boolean
810
management(): Management
11+
createPublisher(options?: DestinationOptions): Promise<Publisher>
12+
get publishers(): Map<string, Publisher>
913
}
1014

1115
export class AmqpConnection implements Connection {
16+
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
17+
1218
static async create(params: EnvironmentParams) {
1319
const connection = await AmqpConnection.open(params)
1420
const topologyManagement = await AmqpManagement.create(connection)
@@ -43,6 +49,7 @@ export class AmqpConnection implements Connection {
4349
return rej(new Error("Connection error: " + context.connection.error))
4450
})
4551

52+
this._publishers.forEach((p) => p.close())
4653
this.connection.close()
4754
})
4855
}
@@ -51,6 +58,16 @@ export class AmqpConnection implements Connection {
5158
return this.topologyManagement
5259
}
5360

61+
async createPublisher(options?: DestinationOptions): Promise<Publisher> {
62+
const publisher = await AmqpPublisher.createFrom(this.connection, this._publishers, options)
63+
this._publishers.set(publisher.id, publisher)
64+
return publisher
65+
}
66+
67+
public get publishers(): Map<string, Publisher> {
68+
return this._publishers
69+
}
70+
5471
public isOpen(): boolean {
5572
return this.connection ? this.connection.is_open() : false
5673
}

src/message_builder.ts renamed to src/link_message_builder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export enum AmqpEndpoints {
1515

1616
export const ME = "$me"
1717

18-
export class MessageBuilder {
18+
export class LinkMessageBuilder {
1919
private messageId: string = generate_uuid()
2020
private to: string = ""
2121
private replyTo: string = ME

src/management.ts

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
SenderEvents,
1111
SenderOptions,
1212
} from "rhea"
13-
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
13+
import { AmqpEndpoints, AmqpMethods, LinkMessageBuilder, ME } from "./link_message_builder.js"
1414
import {
1515
CreateBindingResponseDecoder,
1616
CreateExchangeResponseDecoder,
@@ -21,12 +21,7 @@ import {
2121
} from "./response_decoder.js"
2222
import { AmqpBinding, Binding, BindingInfo, BindingOptions } from "./binding.js"
2323
import { randomUUID } from "crypto"
24-
25-
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
26-
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
27-
type OpenLinkMethods =
28-
| ((options?: SenderOptions | string) => Sender)
29-
| ((options?: ReceiverOptions | string) => Receiver)
24+
import { openLink } from "./utils.js"
3025

3126
const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3227
snd_settle_mode: 1,
@@ -61,7 +56,7 @@ export class AmqpManagement implements Management {
6156
) {}
6257

6358
private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
64-
return AmqpManagement.openLink<Receiver>(
59+
return openLink<Receiver>(
6560
connection,
6661
ReceiverEvents.receiverOpen,
6762
ReceiverEvents.receiverError,
@@ -71,7 +66,7 @@ export class AmqpManagement implements Management {
7166
}
7267

7368
private static async openSender(connection: RheaConnection): Promise<Sender> {
74-
return AmqpManagement.openLink<Sender>(
69+
return openLink<Sender>(
7570
connection,
7671
SenderEvents.senderOpen,
7772
SenderEvents.senderError,
@@ -80,24 +75,6 @@ export class AmqpManagement implements Management {
8075
)
8176
}
8277

83-
private static async openLink<T extends Sender | Receiver>(
84-
connection: RheaConnection,
85-
successEvent: LinkOpenEvents,
86-
errorEvent: LinkErrorEvents,
87-
openMethod: OpenLinkMethods,
88-
config?: SenderOptions | ReceiverOptions | string
89-
): Promise<T> {
90-
return new Promise((res, rej) => {
91-
connection.once(successEvent, (context) => {
92-
return res(context.receiver || context.sender)
93-
})
94-
connection.once(errorEvent, (context) => {
95-
return rej(context.connection.error)
96-
})
97-
openMethod(config)
98-
})
99-
}
100-
10178
close(): void {
10279
if (this.connection.is_closed()) return
10380

@@ -128,7 +105,7 @@ export class AmqpManagement implements Management {
128105
return res(new AmqpQueue(response.body))
129106
})
130107

131-
const message = new MessageBuilder()
108+
const message = new LinkMessageBuilder()
132109
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
133110
.setReplyTo(ME)
134111
.setAmqpMethod(AmqpMethods.PUT)
@@ -158,7 +135,7 @@ export class AmqpManagement implements Management {
158135
return res(true)
159136
})
160137

161-
const message = new MessageBuilder()
138+
const message = new LinkMessageBuilder()
162139
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
163140
.setReplyTo(ME)
164141
.setAmqpMethod(AmqpMethods.DELETE)
@@ -189,7 +166,7 @@ export class AmqpManagement implements Management {
189166
return res(new AmqpExchange(exchangeInfo))
190167
})
191168

192-
const message = new MessageBuilder()
169+
const message = new LinkMessageBuilder()
193170
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
194171
.setReplyTo(ME)
195172
.setAmqpMethod(AmqpMethods.PUT)
@@ -219,7 +196,7 @@ export class AmqpManagement implements Management {
219196
return res(true)
220197
})
221198

222-
const message = new MessageBuilder()
199+
const message = new LinkMessageBuilder()
223200
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
224201
.setReplyTo(ME)
225202
.setAmqpMethod(AmqpMethods.DELETE)
@@ -249,7 +226,7 @@ export class AmqpManagement implements Management {
249226
return res(new AmqpBinding(bindingInfo))
250227
})
251228

252-
const message = new MessageBuilder()
229+
const message = new LinkMessageBuilder()
253230
.sendTo(`/${AmqpEndpoints.Bindings}`)
254231
.setReplyTo(ME)
255232
.setAmqpMethod(AmqpMethods.POST)
@@ -279,7 +256,7 @@ export class AmqpManagement implements Management {
279256
return res(true)
280257
})
281258

282-
const message = new MessageBuilder()
259+
const message = new LinkMessageBuilder()
283260
.sendTo(
284261
`/${AmqpEndpoints.Bindings}/${buildUnbindEndpointFrom({ source: options.source, destination: options.destination, key })}`
285262
)

src/message.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { generate_uuid, Message as RheaMessage } from "rhea"
2+
import { AmqpEndpoints } from "./link_message_builder.js"
3+
import { inspect } from "util"
4+
5+
export type ExchangeOptions = {
6+
name: string
7+
routingKey?: string
8+
}
9+
10+
export type QueueOptions = {
11+
name: string
12+
}
13+
14+
export type DestinationOptions = { exchange: ExchangeOptions } | { queue: QueueOptions }
15+
16+
type MessageOptions = {
17+
body: string
18+
destination?: DestinationOptions
19+
}
20+
21+
export function createAmqpMessage(options: MessageOptions): RheaMessage {
22+
if (options.destination) {
23+
return { message_id: generate_uuid(), body: options.body, to: createAddressFrom(options.destination) }
24+
}
25+
26+
return { message_id: generate_uuid(), body: options.body }
27+
}
28+
29+
export function createAddressFrom(options?: DestinationOptions): string {
30+
if (!options) return ""
31+
if ("queue" in options) return `/${AmqpEndpoints.Queues}/${options.queue.name}`
32+
if ("exchange" in options) {
33+
return options.exchange.routingKey
34+
? `/${AmqpEndpoints.Exchanges}/${options.exchange.name}/${options.exchange.routingKey}`
35+
: `/${AmqpEndpoints.Exchanges}/${options.exchange.name}`
36+
}
37+
38+
throw new Error(`Unknown publisher options -- ${inspect(options)}`)
39+
}

src/publisher.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { Connection, Delivery, EventContext, Message, ReceiverOptions, Sender, SenderEvents, SenderOptions } from "rhea"
2+
import { openLink, OutcomeState } from "./utils.js"
3+
import { randomUUID } from "crypto"
4+
import { inspect } from "util"
5+
import { createAddressFrom, DestinationOptions } from "./message.js"
6+
7+
const getPublisherSenderLinkConfigurationFrom = (
8+
address: string,
9+
publisherId: string
10+
): SenderOptions | ReceiverOptions => ({
11+
snd_settle_mode: 0,
12+
rcv_settle_mode: 0,
13+
name: publisherId,
14+
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
15+
source: {
16+
address,
17+
expiry_policy: "LINK_DETACH",
18+
timeout: 0,
19+
dynamic: false,
20+
durable: 0,
21+
},
22+
})
23+
24+
interface PublishResult {
25+
delivery: Delivery
26+
outcome: OutcomeState
27+
}
28+
29+
type ResolvableSenderEvents = SenderEvents.accepted | SenderEvents.rejected | SenderEvents.released
30+
type RejectableSenderEvents = SenderEvents.senderError
31+
type SenderSenderEventHandler = (senderEvent: ResolvableSenderEvents) => (context: EventContext) => void
32+
type ErrorSenderEventHandler = (context: EventContext, errorEvent: RejectableSenderEvents, deliveryId: number) => void
33+
type MessageHandlerPromise = {
34+
resolve: (publishResult: PublishResult) => void
35+
reject: (error: Error) => void
36+
}
37+
38+
export interface Publisher {
39+
publish(message: Message): Promise<PublishResult>
40+
close(): void
41+
get id(): string
42+
}
43+
44+
export class AmqpPublisher implements Publisher {
45+
static async createFrom(
46+
connection: Connection,
47+
publishersList: Map<string, Publisher>,
48+
options?: DestinationOptions
49+
): Promise<Publisher> {
50+
const address = createAddressFrom(options)
51+
const id = randomUUID()
52+
const senderLink = await AmqpPublisher.openSender(connection, address, id)
53+
return new AmqpPublisher(connection, senderLink, id, publishersList)
54+
}
55+
56+
private static async openSender(connection: Connection, address: string, publisherId: string): Promise<Sender> {
57+
return openLink<Sender>(
58+
connection,
59+
SenderEvents.senderOpen,
60+
SenderEvents.senderError,
61+
connection.open_sender.bind(connection),
62+
getPublisherSenderLinkConfigurationFrom(address, publisherId)
63+
)
64+
}
65+
66+
private successMessageHandler: SenderSenderEventHandler
67+
private errorMessageHandler: ErrorSenderEventHandler
68+
private promiseMessagesHandler: Map<number, MessageHandlerPromise> = new Map<number, MessageHandlerPromise>()
69+
70+
constructor(
71+
private readonly connection: Connection,
72+
private senderLink: Sender,
73+
private readonly _id: string,
74+
private publishersList: Map<string, Publisher>
75+
) {
76+
console.log(this.connection.container_id)
77+
this.successMessageHandler = (senderEvent: ResolvableSenderEvents) => {
78+
return (context: EventContext) => {
79+
if (context.delivery) {
80+
const promise = this.promiseMessagesHandler.get(context.delivery.id)
81+
if (promise) {
82+
this.promiseMessagesHandler.delete(context.delivery.id)
83+
return promise.resolve({ delivery: context.delivery, outcome: getOutcomeStateFrom(senderEvent) })
84+
}
85+
}
86+
return Promise.reject("Message cannot be handled successfully")
87+
}
88+
}
89+
this.errorMessageHandler = (context: EventContext, errorEvent: RejectableSenderEvents, deliveryId: number) => {
90+
const promise = this.promiseMessagesHandler.get(deliveryId)
91+
if (promise) {
92+
this.promiseMessagesHandler.delete(deliveryId)
93+
const error = new Error(`SenderLink error ${errorEvent}: ${inspect(context.error)}`)
94+
return promise.reject(error)
95+
}
96+
return Promise.reject("Unhandled error")
97+
}
98+
99+
this.registerEventListeners()
100+
}
101+
102+
private registerEventListeners(): void {
103+
this.senderLink.on(SenderEvents.accepted, this.successMessageHandler(SenderEvents.accepted))
104+
this.senderLink.on(SenderEvents.rejected, this.successMessageHandler(SenderEvents.rejected))
105+
this.senderLink.on(SenderEvents.released, this.successMessageHandler(SenderEvents.released))
106+
this.senderLink.on(SenderEvents.senderError, (context: EventContext) => {
107+
for (const id of this.promiseMessagesHandler.keys())
108+
this.errorMessageHandler(context, SenderEvents.senderError, id)
109+
})
110+
}
111+
112+
private removeEventListeners(): void {
113+
this.senderLink.removeAllListeners()
114+
}
115+
116+
async publish(message: Message): Promise<PublishResult> {
117+
return new Promise<PublishResult>((res, rej) => {
118+
const delivery = this.senderLink.send(message)
119+
this.promiseMessagesHandler.set(delivery.id, {
120+
resolve: (publishResult: PublishResult) => res(publishResult),
121+
reject: (error: Error) => rej(error),
122+
})
123+
})
124+
}
125+
126+
close(): void {
127+
this.removeEventListeners()
128+
129+
if (this.senderLink.is_open()) this.senderLink.close()
130+
if (this.publishersList.has(this._id)) this.publishersList.delete(this._id)
131+
}
132+
133+
get id(): string {
134+
return this._id
135+
}
136+
}
137+
138+
function getOutcomeStateFrom(event: ResolvableSenderEvents): OutcomeState {
139+
switch (event) {
140+
case SenderEvents.released:
141+
return OutcomeState.RELEASED
142+
case SenderEvents.rejected:
143+
return OutcomeState.REJECTED
144+
case SenderEvents.accepted:
145+
return OutcomeState.ACCEPTED
146+
}
147+
}

0 commit comments

Comments
 (0)