Skip to content

Commit fc79c3c

Browse files
l4mbymagne
andauthored
feat: streams support for publishers (#49)
* feat: add annotations to message * chore: remove console logs --------- Co-authored-by: magne <[email protected]>
1 parent 5e57dc9 commit fc79c3c

File tree

4 files changed

+26
-7
lines changed

4 files changed

+26
-7
lines changed

conf/enabled_plugins

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
[rabbitmq_stream_management,rabbitmq_consistent_hash_exchange].
1+
[rabbit_stream,rabbitmq_stream_management,rabbitmq_consistent_hash_exchange].

src/consumer.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,12 @@ export class AmqpConsumer implements Consumer {
8181

8282
start() {
8383
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
84-
console.log("message received", context.message?.body)
8584
if (context.message && context.delivery) {
86-
console.log("message accepted")
8785
try {
8886
this.params.messageHandler(context.message)
8987
context.delivery.accept()
90-
console.log("message consumed")
9188
} catch (e) {
9289
context.delivery.reject({ condition: "Message Handler error", info: e })
93-
console.log("message rejected")
9490
}
9591
}
9692
})

src/message.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { generate_uuid, Message as RheaMessage } from "rhea"
1+
import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea"
22
import { AmqpEndpoints } from "./link_message_builder.js"
33
import { inspect } from "util"
44

@@ -16,6 +16,7 @@ export type DestinationOptions = { exchange: ExchangeOptions } | { queue: QueueO
1616
type MessageOptions = {
1717
body: string
1818
destination?: DestinationOptions
19+
annotations?: MessageAnnotations
1920
}
2021

2122
export function createAmqpMessage(options: MessageOptions): RheaMessage {
@@ -25,10 +26,16 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage {
2526
body: options.body,
2627
to: createAddressFrom(options.destination),
2728
durable: true,
29+
message_annotations: options.annotations ?? {},
2830
}
2931
}
3032

31-
return { message_id: generate_uuid(), body: options.body, durable: true }
33+
return {
34+
message_id: generate_uuid(),
35+
body: options.body,
36+
durable: true,
37+
message_annotations: options.annotations ?? {},
38+
}
3239
}
3340

3441
export function createAddressFrom(options?: DestinationOptions): string | undefined {

test/e2e/publisher.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ describe("Publisher", () => {
2424
const queueName2 = "test-queue-2"
2525
const bindingKey = "test-key"
2626
const bindingKey2 = "test-key-2"
27+
const streamName = "test-stream"
2728

2829
beforeEach(async () => {
2930
environment = createEnvironment({
@@ -117,4 +118,19 @@ describe("Publisher", () => {
117118

118119
expect(publishResult.outcome).to.eql(OutcomeState.RELEASED)
119120
})
121+
122+
test("publish a message to a stream", async () => {
123+
const management = connection.management()
124+
await management.declareQueue(streamName, { type: "stream" })
125+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
126+
127+
const publishResult = await publisher.publish(
128+
createAmqpMessage({
129+
body: "Hello World!",
130+
annotations: { "x-stream-filter-value": "invoices" },
131+
})
132+
)
133+
134+
expect(publishResult.outcome).to.eql(OutcomeState.ACCEPTED)
135+
})
120136
})

0 commit comments

Comments
 (0)