Skip to content

Commit d2d7ed1

Browse files
l4mbymagne
andauthored
fix: publisher not linked can publish messages with address correctly (#31)
Co-authored-by: magne <[email protected]>
1 parent a2e601f commit d2d7ed1

File tree

7 files changed

+65
-15
lines changed

7 files changed

+65
-15
lines changed

src/consumer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ export class AmqpConsumer implements Consumer {
4949
) {
5050
const id = generate_uuid()
5151
const address = createAddressFrom({ queue: { name: queueName } })
52+
if (!address) throw new Error("Consumer must have an address")
53+
5254
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
5355
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
5456
}

src/message.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,19 @@ type MessageOptions = {
2020

2121
export function createAmqpMessage(options: MessageOptions): RheaMessage {
2222
if (options.destination) {
23-
return { message_id: generate_uuid(), body: options.body, to: createAddressFrom(options.destination) }
23+
return {
24+
message_id: generate_uuid(),
25+
body: options.body,
26+
to: createAddressFrom(options.destination),
27+
durable: true,
28+
}
2429
}
2530

26-
return { message_id: generate_uuid(), body: options.body }
31+
return { message_id: generate_uuid(), body: options.body, durable: true }
2732
}
2833

29-
export function createAddressFrom(options?: DestinationOptions): string {
30-
if (!options) return ""
34+
export function createAddressFrom(options?: DestinationOptions): string | undefined {
35+
if (!options) return undefined
3136
if ("queue" in options) return `/${AmqpEndpoints.Queues}/${options.queue.name}`
3237
if ("exchange" in options) {
3338
return options.exchange.routingKey

src/publisher.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ import { inspect } from "util"
55
import { createAddressFrom, DestinationOptions } from "./message.js"
66

77
const getPublisherSenderLinkConfigurationFrom = (
8-
address: string,
9-
publisherId: string
8+
publisherId: string,
9+
address?: string
1010
): SenderOptions | ReceiverOptions => ({
1111
snd_settle_mode: 0,
1212
rcv_settle_mode: 0,
1313
name: publisherId,
1414
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
1515
source: {
16-
address,
16+
address: address ?? "",
1717
expiry_policy: "LINK_DETACH",
1818
timeout: 0,
1919
dynamic: false,
@@ -49,17 +49,17 @@ export class AmqpPublisher implements Publisher {
4949
): Promise<Publisher> {
5050
const address = createAddressFrom(options)
5151
const id = randomUUID()
52-
const senderLink = await AmqpPublisher.openSender(connection, address, id)
52+
const senderLink = await AmqpPublisher.openSender(connection, id, address)
5353
return new AmqpPublisher(connection, senderLink, id, publishersList)
5454
}
5555

56-
private static async openSender(connection: Connection, address: string, publisherId: string): Promise<Sender> {
56+
private static async openSender(connection: Connection, publisherId: string, address?: string): Promise<Sender> {
5757
return openLink<Sender>(
5858
connection,
5959
SenderEvents.senderOpen,
6060
SenderEvents.senderError,
6161
connection.open_sender.bind(connection),
62-
getPublisherSenderLinkConfigurationFrom(address, publisherId)
62+
getPublisherSenderLinkConfigurationFrom(publisherId, address)
6363
)
6464
}
6565

test/e2e/consumer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ describe("Consumer", () => {
6060
})
6161
})
6262

63-
test.skip("consumer can handle message on exchange, destination on message", async () => {
63+
test("consumer can handle message on exchange, destination on message", async () => {
6464
const publisher = await connection.createPublisher()
6565
const expectedBody = "ciao"
6666
await publisher.publish(

test/e2e/publisher.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,6 @@ describe("Publisher", () => {
115115
})
116116
)
117117

118-
expect(publishResult.outcome).to.eql(OutcomeState.ACCEPTED)
118+
expect(publishResult.outcome).to.eql(OutcomeState.RELEASED)
119119
})
120120
})

test/support/rhea_utils.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
4747
properties: { paired: true },
4848
}
4949

50-
const getPublisherNodeConfigurationFrom = (address: string): SenderOptions => ({
50+
const getPublisherNodeConfigurationFrom = (address?: string): SenderOptions => ({
5151
snd_settle_mode: 0,
5252
rcv_settle_mode: 0,
5353
name: "publisher-sender-link",
5454
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
5555
source: {
56-
address,
56+
address: address ?? "",
5757
expiry_policy: "LINK_DETACH",
5858
timeout: 0,
5959
dynamic: false,
@@ -116,7 +116,7 @@ async function openLink<T extends Sender | Receiver>(
116116
})
117117
}
118118

119-
export async function openPublisherSender(connection: Connection, address: string) {
119+
export async function openPublisherSender(connection: Connection, address?: string) {
120120
return openLink<Sender>(
121121
connection,
122122
SenderEvents.senderOpen,

test/unit/rhea/publisher.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,47 @@ describe("Creating a publisher through rhea", () => {
123123
})
124124
})
125125
})
126+
127+
describe("anonymous", () => {
128+
test("create a publisher", async () => {
129+
const publisher = await openPublisherSender(connection)
130+
131+
expect(publisher).to.not.eql(null)
132+
})
133+
134+
test("send a message through a publisher", async () => {
135+
const publisher = await openPublisherSender(connection)
136+
let test = false
137+
publisher.on(SenderEvents.accepted, () => {
138+
console.log("accepted")
139+
test = true
140+
})
141+
publisher.on(SenderEvents.sendable, () => {
142+
console.log("sendable")
143+
test = true
144+
})
145+
publisher.on(SenderEvents.modified, () => {
146+
console.log("modified")
147+
test = true
148+
})
149+
publisher.on(SenderEvents.rejected, () => {
150+
console.log("rejected")
151+
test = true
152+
})
153+
publisher.on(SenderEvents.released, () => {
154+
console.log("released")
155+
test = true
156+
})
157+
publisher.on(SenderEvents.settled, () => {
158+
console.log("settled")
159+
test = true
160+
})
161+
162+
publisher.send({ message_id: randomUUID(), body: "Hello world!", to: `/queues/${testQueueName}` })
163+
164+
await eventually(async () => {
165+
expect(test).eql(true)
166+
})
167+
})
168+
})
126169
})

0 commit comments

Comments
 (0)