Skip to content

Commit a2e601f

Browse files
authored
7 as a library user i want to create a consumer so i can consume messages (#29)
* WIP consumer * added tests * fixed index
1 parent f5a42e8 commit a2e601f

File tree

5 files changed

+260
-0
lines changed

5 files changed

+260
-0
lines changed

src/connection.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@ import { AmqpManagement, Management } from "./management.js"
33
import { EnvironmentParams } from "./environment.js"
44
import { AmqpPublisher, Publisher } from "./publisher.js"
55
import { DestinationOptions } from "./message.js"
6+
import { AmqpConsumer, Consumer, CreateConsumerParams } from "./consumer.js"
67

78
export interface Connection {
89
close(): Promise<boolean>
910
isOpen(): boolean
1011
management(): Management
1112
createPublisher(options?: DestinationOptions): Promise<Publisher>
1213
get publishers(): Map<string, Publisher>
14+
get consumers(): Map<string, Consumer>
15+
createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer>
1316
}
1417

1518
export class AmqpConnection implements Connection {
1619
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
20+
private _consumers: Map<string, Consumer> = new Map<string, Consumer>()
1721

1822
static async create(params: EnvironmentParams) {
1923
const connection = await AmqpConnection.open(params)
@@ -50,10 +54,17 @@ export class AmqpConnection implements Connection {
5054
})
5155

5256
this._publishers.forEach((p) => p.close())
57+
this._consumers.forEach((p) => p.close())
5358
this.connection.close()
5459
})
5560
}
5661

62+
async createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer> {
63+
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, queueName, params)
64+
this._consumers.set(consumer.id, consumer)
65+
return consumer
66+
}
67+
5768
management(): Management {
5869
return this.topologyManagement
5970
}
@@ -68,6 +79,10 @@ export class AmqpConnection implements Connection {
6879
return this._publishers
6980
}
7081

82+
public get consumers(): Map<string, Consumer> {
83+
return this._consumers
84+
}
85+
7186
public isOpen(): boolean {
7287
return this.connection ? this.connection.is_open() : false
7388
}

src/consumer.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import {
2+
generate_uuid,
3+
Receiver,
4+
ReceiverEvents,
5+
ReceiverOptions,
6+
Connection,
7+
SenderOptions,
8+
EventContext,
9+
Message,
10+
} from "rhea"
11+
import { openLink } from "./utils.js"
12+
import { createAddressFrom } from "./message.js"
13+
14+
export type ConsumerMessageHandler = (message: Message) => void
15+
16+
export type CreateConsumerParams = {
17+
messageHandler: ConsumerMessageHandler
18+
}
19+
20+
const getConsumerReceiverLinkConfigurationFrom = (
21+
address: string,
22+
consumerId: string
23+
): SenderOptions | ReceiverOptions => ({
24+
snd_settle_mode: 0,
25+
rcv_settle_mode: 0,
26+
name: consumerId,
27+
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
28+
source: {
29+
address,
30+
expiry_policy: "LINK_DETACH",
31+
timeout: 0,
32+
dynamic: false,
33+
durable: 0,
34+
},
35+
})
36+
37+
export interface Consumer {
38+
start(): void
39+
close(): void
40+
get id(): string
41+
}
42+
43+
export class AmqpConsumer implements Consumer {
44+
static async createFrom(
45+
connection: Connection,
46+
consumersList: Map<string, Consumer>,
47+
queueName: string,
48+
params: CreateConsumerParams
49+
) {
50+
const id = generate_uuid()
51+
const address = createAddressFrom({ queue: { name: queueName } })
52+
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
53+
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
54+
}
55+
56+
private static async openReceiver(connection: Connection, address: string, consumerId: string): Promise<Receiver> {
57+
return openLink<Receiver>(
58+
connection,
59+
ReceiverEvents.receiverOpen,
60+
ReceiverEvents.receiverError,
61+
connection.open_receiver.bind(connection),
62+
getConsumerReceiverLinkConfigurationFrom(address, consumerId)
63+
)
64+
}
65+
66+
constructor(
67+
private readonly _id: string,
68+
private readonly connection: Connection,
69+
private readonly consumersList: Map<string, Consumer>,
70+
private readonly receiverLink: Receiver,
71+
private readonly params: CreateConsumerParams
72+
) {
73+
console.log(this.connection.container_id)
74+
}
75+
76+
get id() {
77+
return this._id
78+
}
79+
80+
start() {
81+
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
82+
console.log("message received", context.message?.body)
83+
if (context.message && context.delivery) {
84+
console.log("message accepted")
85+
try {
86+
this.params.messageHandler(context.message)
87+
context.delivery.accept()
88+
console.log("message consumed")
89+
} catch (e) {
90+
context.delivery.reject({ condition: "Message Handler error", info: e })
91+
console.log("message rejected")
92+
}
93+
}
94+
})
95+
}
96+
97+
close() {
98+
this.receiverLink.removeAllListeners()
99+
if (this.receiverLink.is_open()) this.receiverLink.close()
100+
if (this.consumersList.has(this._id)) this.consumersList.delete(this._id)
101+
}
102+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ export { Management, AmqpManagement } from "./management.js"
22
export { Environment, AmqpEnvironment } from "./environment.js"
33
export { Connection, AmqpConnection } from "./connection.js"
44
export { Publisher, AmqpPublisher } from "./publisher.js"
5+
export { Consumer, AmqpConsumer } from "./consumer.js"
56
export { createAmqpMessage } from "./message.js"

test/e2e/connection.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,39 @@ describe("Connection", () => {
106106

107107
expect(newConnection.publishers.size).eql(0)
108108
})
109+
110+
test("create a consumer linked to a queue", async () => {
111+
await connection.createConsumer(queueName, {
112+
messageHandler: async (msg) => {
113+
console.log(msg)
114+
},
115+
})
116+
117+
expect(connection.consumers.size).eql(1)
118+
})
119+
120+
test("close a consumer", async () => {
121+
const consumer = await connection.createConsumer(queueName, {
122+
messageHandler: async (msg) => {
123+
console.log(msg)
124+
},
125+
})
126+
127+
consumer.close()
128+
129+
expect(connection.consumers.size).eql(0)
130+
})
131+
132+
test("closing the connection also closes the consumer", async () => {
133+
const newConnection = await environment.createConnection()
134+
await newConnection.createConsumer(queueName, {
135+
messageHandler: async (msg) => {
136+
console.log(msg)
137+
},
138+
})
139+
140+
await newConnection.close()
141+
142+
expect(newConnection.consumers.size).eql(0)
143+
})
109144
})

test/e2e/consumer.test.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { Management } from "../../src/index.js"
2+
import { afterEach, beforeEach, describe, expect, test } from "vitest"
3+
import { eventually, host, password, port, username, cleanRabbit } from "../support/util.js"
4+
import { createEnvironment, Environment } from "../../src/environment.js"
5+
import { Connection } from "../../src/connection.js"
6+
import { Queue } from "../../src/queue.js"
7+
import { Exchange } from "../../src/exchange.js"
8+
import { createAmqpMessage } from "../../src/message.js"
9+
10+
describe("Consumer", () => {
11+
let environment: Environment
12+
let connection: Connection
13+
let management: Management
14+
let queue: Queue
15+
let exchange: Exchange
16+
17+
const exchangeName = "test-exchange"
18+
const queueName = "test-queue"
19+
const bindingKey = "test-binding"
20+
21+
beforeEach(async () => {
22+
environment = createEnvironment({
23+
host,
24+
port,
25+
username,
26+
password,
27+
})
28+
connection = await environment.createConnection()
29+
management = connection.management()
30+
queue = await management.declareQueue(queueName)
31+
exchange = await management.declareExchange(exchangeName)
32+
await management.bind(bindingKey, { source: exchange, destination: queue })
33+
})
34+
35+
afterEach(async () => {
36+
try {
37+
await cleanRabbit({ match: /test-/ })
38+
await connection.close()
39+
await environment.close()
40+
} catch (error) {
41+
console.error(error)
42+
}
43+
})
44+
45+
test("consumer can handle message on exchange", async () => {
46+
const publisher = await connection.createPublisher({ exchange: { name: exchangeName, routingKey: bindingKey } })
47+
const expectedBody = "ciao"
48+
await publisher.publish(createAmqpMessage({ body: expectedBody }))
49+
let received: string = ""
50+
51+
const consumer = await connection.createConsumer(queueName, {
52+
messageHandler: (message) => {
53+
received = message.body
54+
},
55+
})
56+
consumer.start()
57+
58+
await eventually(() => {
59+
expect(received).to.be.eql(expectedBody)
60+
})
61+
})
62+
63+
test.skip("consumer can handle message on exchange, destination on message", async () => {
64+
const publisher = await connection.createPublisher()
65+
const expectedBody = "ciao"
66+
await publisher.publish(
67+
createAmqpMessage({
68+
body: expectedBody,
69+
destination: { exchange: { name: exchangeName, routingKey: bindingKey } },
70+
})
71+
)
72+
let received: string = ""
73+
74+
const consumer = await connection.createConsumer(queueName, {
75+
messageHandler: (message) => {
76+
received = message.body
77+
},
78+
})
79+
consumer.start()
80+
81+
await eventually(() => {
82+
expect(received).to.be.eql(expectedBody)
83+
})
84+
})
85+
86+
test("consumer can handle message on queue", async () => {
87+
const publisher = await connection.createPublisher({ queue: { name: queueName } })
88+
const expectedBody = "ciao"
89+
await publisher.publish(
90+
createAmqpMessage({
91+
body: expectedBody,
92+
})
93+
)
94+
let received: string = ""
95+
96+
const consumer = await connection.createConsumer(queueName, {
97+
messageHandler: (message) => {
98+
received = message.body
99+
},
100+
})
101+
consumer.start()
102+
103+
await eventually(() => {
104+
expect(received).to.be.eql(expectedBody)
105+
})
106+
})
107+
})

0 commit comments

Comments
 (0)