Skip to content

Commit 6e74f00

Browse files
authored
41 as a library user i want to handle autoreconnect of a connection (#52)
* passing reconnect params to rhea * fixed param types passed to rhea * tests * added timeout * more timeouts in tests
1 parent d4715e2 commit 6e74f00

File tree

5 files changed

+114
-11
lines changed

5 files changed

+114
-11
lines changed

src/connection.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
1+
import { ConnectionEvents, ConnectionOptions, create_container, Connection as RheaConnection } from "rhea"
22
import { AmqpManagement, Management } from "./management.js"
33
import { EnvironmentParams } from "./environment.js"
44
import { AmqpPublisher, Publisher } from "./publisher.js"
@@ -15,17 +15,26 @@ export interface Connection {
1515
createConsumer(params: CreateConsumerParams): Promise<Consumer>
1616
}
1717

18+
export type ConnectionParams =
19+
| { reconnect: false }
20+
| {
21+
reconnect: true | number
22+
initialReconnectDelay?: number
23+
maxReconnectDelay?: number
24+
reconnectLimit?: number
25+
}
26+
1827
export class AmqpConnection implements Connection {
1928
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
2029
private _consumers: Map<string, Consumer> = new Map<string, Consumer>()
2130

22-
static async create(params: EnvironmentParams) {
23-
const connection = await AmqpConnection.open(params)
31+
static async create(envParams: EnvironmentParams, connParams?: ConnectionParams) {
32+
const connection = await AmqpConnection.open(envParams, connParams)
2433
const topologyManagement = await AmqpManagement.create(connection)
2534
return new AmqpConnection(connection, topologyManagement)
2635
}
2736

28-
private static async open(params: EnvironmentParams): Promise<RheaConnection> {
37+
private static async open(envParams: EnvironmentParams, connParams?: ConnectionParams): Promise<RheaConnection> {
2938
return new Promise((res, rej) => {
3039
const container = create_container()
3140
container.once(ConnectionEvents.connectionOpen, (context) => {
@@ -35,7 +44,7 @@ export class AmqpConnection implements Connection {
3544
return rej(context.error ?? new Error("Connection error occurred"))
3645
})
3746

38-
container.connect(params)
47+
container.connect(buildConnectParams(envParams, connParams))
3948
})
4049
}
4150

@@ -87,3 +96,25 @@ export class AmqpConnection implements Connection {
8796
return this.connection ? this.connection.is_open() : false
8897
}
8998
}
99+
100+
function buildConnectParams(envParams: EnvironmentParams, connParams?: ConnectionParams): ConnectionOptions {
101+
return {
102+
...envParams,
103+
...buildReconnectParams(connParams),
104+
}
105+
}
106+
107+
function buildReconnectParams(connParams?: ConnectionParams) {
108+
if (connParams && connParams.reconnect) {
109+
return {
110+
reconnect: connParams.reconnect,
111+
initial_reconnect_delay: connParams.initialReconnectDelay,
112+
max_reconnect_delay: connParams.maxReconnectDelay,
113+
reconnect_limit: connParams.reconnectLimit,
114+
}
115+
}
116+
117+
if (connParams && !connParams.reconnect) return { reconnect: false }
118+
119+
return { reconnect: true }
120+
}

src/environment.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { AmqpConnection, Connection } from "./connection.js"
1+
import { AmqpConnection, Connection, ConnectionParams } from "./connection.js"
22

33
export interface Environment {
4-
createConnection(): Promise<Connection>
4+
createConnection(params?: ConnectionParams): Promise<Connection>
55
close(): Promise<void>
66
}
77

@@ -18,8 +18,8 @@ export class AmqpEnvironment implements Environment {
1818
private readonly connections: Connection[] = []
1919
) {}
2020

21-
async createConnection(): Promise<Connection> {
22-
const connection = await AmqpConnection.create(this.params)
21+
async createConnection(params?: ConnectionParams): Promise<Connection> {
22+
const connection = await AmqpConnection.create(this.params, params)
2323
this.connections.push(connection)
2424
return connection
2525
}

test/e2e/environment.test.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
import { afterEach, beforeEach, describe, expect, test } from "vitest"
22
import { createEnvironment, Environment } from "../../src/environment.js"
3-
import { host, port, username, password, numberOfConnections, eventually } from "../support/util.js"
3+
import {
4+
host,
5+
port,
6+
username,
7+
password,
8+
numberOfConnections,
9+
eventually,
10+
closeAllConnections,
11+
wait,
12+
} from "../support/util.js"
413

514
describe("Environment", () => {
615
let environment: Environment
@@ -25,4 +34,45 @@ describe("Environment", () => {
2534
expect(await numberOfConnections()).to.eql(1)
2635
})
2736
})
37+
38+
test("a connection with reconnect false does not reconnect", async () => {
39+
await environment.createConnection({ reconnect: false })
40+
41+
await closeAllConnections()
42+
await wait(2000)
43+
44+
expect(await numberOfConnections()).to.eql(0)
45+
})
46+
47+
test("a connection with reconnect true reconnects", async () => {
48+
await environment.createConnection({ reconnect: true })
49+
50+
await closeAllConnections()
51+
52+
await eventually(async () => {
53+
expect(await numberOfConnections()).to.eql(1)
54+
})
55+
}, 10000)
56+
57+
test("a connection reconnects by default", async () => {
58+
await environment.createConnection()
59+
60+
await closeAllConnections()
61+
62+
await eventually(async () => {
63+
expect(await numberOfConnections()).to.eql(1)
64+
})
65+
}, 10000)
66+
67+
test("a connection with reconnect set to number retries after number ms", async () => {
68+
await environment.createConnection({ reconnect: 2000, initialReconnectDelay: 2000 })
69+
70+
await closeAllConnections()
71+
await wait(1900)
72+
73+
expect(await numberOfConnections()).to.eql(0)
74+
await eventually(async () => {
75+
expect(await numberOfConnections()).to.eql(1)
76+
})
77+
}, 10000)
2878
})

test/support/util.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ export const vhost = encodeURIComponent("/")
4444
export const username = process.env.RABBITMQ_USER ?? "rabbit"
4545
export const password = process.env.RABBITMQ_PASSWORD ?? "rabbit"
4646

47+
export async function getConnections(): Promise<ConnectionInfoResponse[]> {
48+
const ret = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
49+
username,
50+
password,
51+
responseType: "json",
52+
})
53+
return ret.body
54+
}
55+
4756
export async function numberOfConnections(): Promise<number> {
4857
const response = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
4958
username,
@@ -54,6 +63,19 @@ export async function numberOfConnections(): Promise<number> {
5463
return response.body.length
5564
}
5665

66+
export async function closeAllConnections(): Promise<void> {
67+
const l = await getConnections()
68+
await Promise.all(l.map((c) => closeConnection(c.name)))
69+
}
70+
71+
export async function closeConnection(name: string) {
72+
return got.delete(`http://${host}:${managementPort}/api/connections/${name}`, {
73+
username,
74+
password,
75+
responseType: "json",
76+
})
77+
}
78+
5779
export async function existsQueue(queueName: string): Promise<boolean> {
5880
const response = await getQueueInfo(queueName)
5981

test/unit/rhea/connection.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe("Rhea connections", () => {
1212
})
1313

1414
afterEach(async () => {
15-
await closeConnection(connection)
15+
if (connection) await closeConnection(connection)
1616
})
1717

1818
test("create a connection", async () => {

0 commit comments

Comments
 (0)