Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export class AmqpConsumer implements Consumer {
) {
const id = generate_uuid()
const address = createAddressFrom({ queue: { name: queueName } })
if (!address) throw new Error("Consumer must have an address")

const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
}
Expand Down
13 changes: 9 additions & 4 deletions src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ type MessageOptions = {

export function createAmqpMessage(options: MessageOptions): RheaMessage {
if (options.destination) {
return { message_id: generate_uuid(), body: options.body, to: createAddressFrom(options.destination) }
return {
message_id: generate_uuid(),
body: options.body,
to: createAddressFrom(options.destination),
durable: true,
}
}

return { message_id: generate_uuid(), body: options.body }
return { message_id: generate_uuid(), body: options.body, durable: true }
}

export function createAddressFrom(options?: DestinationOptions): string {
if (!options) return ""
export function createAddressFrom(options?: DestinationOptions): string | undefined {
if (!options) return undefined
if ("queue" in options) return `/${AmqpEndpoints.Queues}/${options.queue.name}`
if ("exchange" in options) {
return options.exchange.routingKey
Expand Down
12 changes: 6 additions & 6 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import { inspect } from "util"
import { createAddressFrom, DestinationOptions } from "./message.js"

const getPublisherSenderLinkConfigurationFrom = (
address: string,
publisherId: string
publisherId: string,
address?: string
): SenderOptions | ReceiverOptions => ({
snd_settle_mode: 0,
rcv_settle_mode: 0,
name: publisherId,
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
source: {
address,
address: address ?? "",
expiry_policy: "LINK_DETACH",
timeout: 0,
dynamic: false,
Expand Down Expand Up @@ -49,17 +49,17 @@ export class AmqpPublisher implements Publisher {
): Promise<Publisher> {
const address = createAddressFrom(options)
const id = randomUUID()
const senderLink = await AmqpPublisher.openSender(connection, address, id)
const senderLink = await AmqpPublisher.openSender(connection, id, address)
return new AmqpPublisher(connection, senderLink, id, publishersList)
}

private static async openSender(connection: Connection, address: string, publisherId: string): Promise<Sender> {
private static async openSender(connection: Connection, publisherId: string, address?: string): Promise<Sender> {
return openLink<Sender>(
connection,
SenderEvents.senderOpen,
SenderEvents.senderError,
connection.open_sender.bind(connection),
getPublisherSenderLinkConfigurationFrom(address, publisherId)
getPublisherSenderLinkConfigurationFrom(publisherId, address)
)
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe("Consumer", () => {
})
})

test.skip("consumer can handle message on exchange, destination on message", async () => {
test("consumer can handle message on exchange, destination on message", async () => {
const publisher = await connection.createPublisher()
const expectedBody = "ciao"
await publisher.publish(
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ describe("Publisher", () => {
})
)

expect(publishResult.outcome).to.eql(OutcomeState.ACCEPTED)
expect(publishResult.outcome).to.eql(OutcomeState.RELEASED)
})
})
6 changes: 3 additions & 3 deletions test/support/rhea_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
properties: { paired: true },
}

const getPublisherNodeConfigurationFrom = (address: string): SenderOptions => ({
const getPublisherNodeConfigurationFrom = (address?: string): SenderOptions => ({
snd_settle_mode: 0,
rcv_settle_mode: 0,
name: "publisher-sender-link",
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
source: {
address,
address: address ?? "",
expiry_policy: "LINK_DETACH",
timeout: 0,
dynamic: false,
Expand Down Expand Up @@ -116,7 +116,7 @@ async function openLink<T extends Sender | Receiver>(
})
}

export async function openPublisherSender(connection: Connection, address: string) {
export async function openPublisherSender(connection: Connection, address?: string) {
return openLink<Sender>(
connection,
SenderEvents.senderOpen,
Expand Down
43 changes: 43 additions & 0 deletions test/unit/rhea/publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,47 @@ describe("Creating a publisher through rhea", () => {
})
})
})

describe("anonymous", () => {
test("create a publisher", async () => {
const publisher = await openPublisherSender(connection)

expect(publisher).to.not.eql(null)
})

test("send a message through a publisher", async () => {
const publisher = await openPublisherSender(connection)
let test = false
publisher.on(SenderEvents.accepted, () => {
console.log("accepted")
test = true
})
publisher.on(SenderEvents.sendable, () => {
console.log("sendable")
test = true
})
publisher.on(SenderEvents.modified, () => {
console.log("modified")
test = true
})
publisher.on(SenderEvents.rejected, () => {
console.log("rejected")
test = true
})
publisher.on(SenderEvents.released, () => {
console.log("released")
test = true
})
publisher.on(SenderEvents.settled, () => {
console.log("settled")
test = true
})

publisher.send({ message_id: randomUUID(), body: "Hello world!", to: `/queues/${testQueueName}` })

await eventually(async () => {
expect(test).eql(true)
})
})
})
})