Skip to content

Commit 6f0a4eb

Browse files
authored
Merge pull request #12 from coders51/9-as-a-library-user-i-want-to-create-the-environment-in-order-to-allocate-resources-for-the-connections
[IS-9/feat]: as a library user i want to create the environment in order to allocate resources for the connections
2 parents f9dcff2 + b35b006 commit 6f0a4eb

File tree

10 files changed

+322
-8
lines changed

10 files changed

+322
-8
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020

2121
services:
2222
rabbitmq:
23-
image: rabbitmq:4.1.0-management
23+
image: rabbitmq:4.0.9-management
2424
options: --hostname test-node --name test-node
2525
env:
2626
RABBITMQ_HOSTNAME: "test-node"

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
rabbitmq-rhea:
3-
image: rabbitmq:4.1.0-management
3+
image: rabbitmq:4.0.9-management
44
container_name: rabbitmq-rhea
55
restart: unless-stopped
66
hostname: "rabbitmq"

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
"vitest": "^3.1.3"
6666
},
6767
"dependencies": {
68+
"assertion-error": "^2.0.1",
6869
"rhea": "^3.0.4"
6970
}
7071
}

src/connection.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { ConnectionEvents, Connection as RheaConnection } from "rhea"
2+
3+
export interface Connection {
4+
close(): Promise<boolean>
5+
isOpen(): boolean
6+
}
7+
8+
export class AmqpConnection implements Connection {
9+
private readonly rheaConnection: RheaConnection
10+
11+
constructor(connection: RheaConnection) {
12+
this.rheaConnection = connection
13+
}
14+
15+
async close(): Promise<boolean> {
16+
return new Promise((res, rej) => {
17+
this.rheaConnection.once(ConnectionEvents.connectionClose, () => {
18+
return res(true)
19+
})
20+
this.rheaConnection.once(ConnectionEvents.connectionError, (context) => {
21+
return rej(new Error("Connection error: " + context.connection.error))
22+
})
23+
24+
this.rheaConnection.close()
25+
})
26+
}
27+
28+
public isOpen(): boolean {
29+
return this.rheaConnection.is_open()
30+
}
31+
}

src/environment.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { ConnectionEvents, Container, create_container } from "rhea"
2+
import { AmqpConnection, Connection } from "./connection.js"
3+
import { Connection as RheaConnection } from "rhea"
4+
5+
export interface Environment {
6+
createConnection(): Promise<Connection>
7+
close(): Promise<void>
8+
}
9+
10+
export type EnvironmentParams = {
11+
host: string
12+
port: number
13+
username: string
14+
password: string
15+
}
16+
17+
export class AmqpEnvironment implements Environment {
18+
private readonly host: string
19+
private readonly port: number
20+
private readonly username: string
21+
private readonly password: string
22+
private readonly container: Container
23+
private connections: Connection[] = []
24+
25+
constructor({ host, port, username, password }: EnvironmentParams) {
26+
this.host = host
27+
this.port = port
28+
this.username = username
29+
this.password = password
30+
this.container = create_container()
31+
}
32+
33+
async createConnection(): Promise<Connection> {
34+
const rheaConnection = await this.openConnection()
35+
const connection = new AmqpConnection(rheaConnection)
36+
this.connections.push(connection)
37+
38+
return connection
39+
}
40+
41+
private async openConnection(): Promise<RheaConnection> {
42+
return new Promise((res, rej) => {
43+
this.container.once(ConnectionEvents.connectionOpen, (context) => {
44+
return res(context.connection)
45+
})
46+
this.container.once(ConnectionEvents.error, (context) => {
47+
return rej(context.error ?? new Error("Connection error occurred"))
48+
})
49+
50+
this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password })
51+
})
52+
}
53+
54+
async close(): Promise<void> {
55+
await this.closeConnections()
56+
this.connections = []
57+
}
58+
59+
private async closeConnections(): Promise<void> {
60+
await Promise.allSettled(
61+
this.connections.map(async (c) => {
62+
if (c.isOpen()) await c.close()
63+
})
64+
)
65+
}
66+
}
67+
68+
export function createEnvironment(params: EnvironmentParams): Environment {
69+
return new AmqpEnvironment(params)
70+
}

test/e2e/environment.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { afterEach, beforeEach, describe, test } from "vitest"
2+
import { use, expect } from "chai"
3+
import chaiAsPromised from "chai-as-promised"
4+
import { createEnvironment, Environment } from "../../src/environment.js"
5+
import { host, port, username, password, numberOfConnections, eventually } from "../support/util.js"
6+
7+
use(chaiAsPromised)
8+
9+
describe("Environment", () => {
10+
let environment: Environment
11+
12+
beforeEach(async () => {
13+
environment = createEnvironment({
14+
host,
15+
port,
16+
username,
17+
password,
18+
})
19+
})
20+
21+
afterEach(async () => {
22+
await environment.close()
23+
})
24+
25+
test("create a connection through the environment", async () => {
26+
await environment.createConnection()
27+
28+
await eventually(async () => {
29+
expect(await numberOfConnections()).to.eql(1)
30+
})
31+
})
32+
})

test/e2e/management.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import chaiAsPromised from "chai-as-promised"
66

77
use(chaiAsPromised)
88

9-
describe("Management", () => {
9+
describe.skip("Management", () => {
1010
let management: Management
1111

1212
beforeEach(() => {
@@ -17,7 +17,7 @@ describe("Management", () => {
1717
management.close()
1818
})
1919

20-
test.skip("create a queue through the management", async () => {
20+
test("create a queue through the management", async () => {
2121
const queue = management.queue("test-coda").exclusive(true).autoDelete(true).declare()
2222

2323
expect(await existsQueue(queue.name)).to.eql(true)

test/support/util.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,33 @@
11
import { inspect } from "util"
22
import got, { Response } from "got"
3+
import { expect } from "chai"
4+
import { AssertionError } from "assertion-error"
5+
6+
export type ConnectionInfoResponse = {
7+
name: string
8+
}
39

410
export type QueueInfoResponse = {
511
name: string
612
}
713

8-
const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
9-
const managementPort = 15672
10-
const vhost = encodeURIComponent("/")
14+
export const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
15+
export const port = parseInt(process.env.RABBITMQ_PORT ?? "5672")
16+
export const managementPort = 15672
17+
export const vhost = encodeURIComponent("/")
1118
export const username = process.env.RABBITMQ_USER ?? "rabbit"
1219
export const password = process.env.RABBITMQ_PASSWORD ?? "rabbit"
1320

21+
export async function numberOfConnections(): Promise<number> {
22+
const response = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
23+
username,
24+
password,
25+
responseType: "json",
26+
})
27+
28+
return response.body.length
29+
}
30+
1431
export async function existsQueue(queueName: string): Promise<boolean> {
1532
const response = await getQueueInfo(queueName)
1633

@@ -34,3 +51,30 @@ async function getQueueInfo(queue: string): Promise<Response<QueueInfoResponse>>
3451

3552
return response
3653
}
54+
55+
export async function wait(ms: number) {
56+
return new Promise((res) => {
57+
setTimeout(() => res(true), ms)
58+
})
59+
}
60+
61+
export function elapsedFrom(from: number): number {
62+
return Date.now() - from
63+
}
64+
65+
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
66+
export async function eventually(fn: Function, timeout = 5000) {
67+
const start = Date.now()
68+
while (true) {
69+
try {
70+
await fn()
71+
return
72+
} catch (error) {
73+
if (elapsedFrom(start) > timeout) {
74+
if (error instanceof AssertionError) throw error
75+
expect.fail(error instanceof Error ? error.message : String(error))
76+
}
77+
await wait(5)
78+
}
79+
}
80+
}

test/unit/rhea/connection.test.ts

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import { afterEach, beforeEach, describe, test } from "vitest"
2+
import { use, expect } from "chai"
3+
import chaiAsPromised from "chai-as-promised"
4+
import { host, port, username, password, numberOfConnections, eventually } from "../../support/util.js"
5+
import {
6+
Connection,
7+
ConnectionEvents,
8+
ConnectionOptions,
9+
Container,
10+
create_container,
11+
Receiver,
12+
ReceiverEvents,
13+
ReceiverOptions,
14+
Sender,
15+
SenderEvents,
16+
SenderOptions,
17+
} from "rhea"
18+
19+
use(chaiAsPromised)
20+
21+
describe("Rhea tests", () => {
22+
let container: Container
23+
let connection: Connection
24+
25+
beforeEach(async () => {
26+
container = create_container()
27+
})
28+
29+
afterEach(async () => {
30+
await close(connection)
31+
})
32+
33+
test("create a connection", async () => {
34+
connection = await open(container, {
35+
host,
36+
port,
37+
username,
38+
password,
39+
})
40+
41+
await eventually(async () => {
42+
expect(await numberOfConnections()).to.eql(1)
43+
})
44+
})
45+
46+
test("connect to the management", async () => {
47+
connection = await open(container, {
48+
host,
49+
port,
50+
username,
51+
password,
52+
})
53+
54+
await eventually(async () => {
55+
await openSender(connection)
56+
await openReceiver(connection)
57+
}, 4000)
58+
})
59+
})
60+
61+
async function open(container: Container, params: ConnectionOptions): Promise<Connection> {
62+
return new Promise((res, rej) => {
63+
container.once(ConnectionEvents.connectionOpen, (context) => {
64+
return res(context.connection)
65+
})
66+
container.once(ConnectionEvents.error, (context) => {
67+
return rej(context.connection.error)
68+
})
69+
container.connect(params)
70+
})
71+
}
72+
73+
async function close(connection: Connection): Promise<void> {
74+
return new Promise((res, rej) => {
75+
connection.once(ConnectionEvents.connectionClose, () => {
76+
res()
77+
})
78+
connection.once(ConnectionEvents.connectionError, (context) => {
79+
rej(new Error("Connection error: " + context.connection.error))
80+
})
81+
connection.close()
82+
})
83+
}
84+
85+
const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
86+
snd_settle_mode: 1,
87+
rcv_settle_mode: 0,
88+
name: "management-link-pair",
89+
target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false },
90+
source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 },
91+
properties: { paired: true },
92+
}
93+
94+
async function openReceiver(connection: Connection) {
95+
return openLink(
96+
connection,
97+
ReceiverEvents.receiverOpen,
98+
ReceiverEvents.receiverError,
99+
connection.open_receiver.bind(connection),
100+
MANAGEMENT_NODE_CONFIGURATION
101+
)
102+
}
103+
104+
async function openSender(connection: Connection) {
105+
return openLink(
106+
connection,
107+
SenderEvents.senderOpen,
108+
SenderEvents.senderError,
109+
connection.open_sender.bind(connection),
110+
MANAGEMENT_NODE_CONFIGURATION
111+
)
112+
}
113+
114+
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
115+
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
116+
type OpenLinkMethods =
117+
| ((options?: SenderOptions | string) => Sender)
118+
| ((options?: ReceiverOptions | string) => Receiver)
119+
120+
async function openLink(
121+
connection: Connection,
122+
successEvent: LinkOpenEvents,
123+
errorEvent: LinkErrorEvents,
124+
openMethod: OpenLinkMethods,
125+
config?: SenderOptions | ReceiverOptions | string
126+
): Promise<Sender | Receiver> {
127+
return new Promise((res, rej) => {
128+
connection.once(successEvent, (context) => {
129+
return res(context.receiver || context.sender)
130+
})
131+
connection.once(errorEvent, (context) => {
132+
return rej(context.connection.error)
133+
})
134+
openMethod(config)
135+
})
136+
}

0 commit comments

Comments
 (0)