Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
import { ConnectionEvents, ConnectionOptions, create_container, Connection as RheaConnection } from "rhea"
import { AmqpManagement, Management } from "./management.js"
import { EnvironmentParams } from "./environment.js"
import { AmqpPublisher, Publisher } from "./publisher.js"
Expand All @@ -15,17 +15,26 @@ export interface Connection {
createConsumer(params: CreateConsumerParams): Promise<Consumer>
}

export type ConnectionParams =
| { reconnect: false }
| {
reconnect: true | number
initialReconnectDelay?: number
maxReconnectDelay?: number
reconnectLimit?: number
}

export class AmqpConnection implements Connection {
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
private _consumers: Map<string, Consumer> = new Map<string, Consumer>()

static async create(params: EnvironmentParams) {
const connection = await AmqpConnection.open(params)
static async create(envParams: EnvironmentParams, connParams?: ConnectionParams) {
const connection = await AmqpConnection.open(envParams, connParams)
const topologyManagement = await AmqpManagement.create(connection)
return new AmqpConnection(connection, topologyManagement)
}

private static async open(params: EnvironmentParams): Promise<RheaConnection> {
private static async open(envParams: EnvironmentParams, connParams?: ConnectionParams): Promise<RheaConnection> {
return new Promise((res, rej) => {
const container = create_container()
container.once(ConnectionEvents.connectionOpen, (context) => {
Expand All @@ -35,7 +44,7 @@ export class AmqpConnection implements Connection {
return rej(context.error ?? new Error("Connection error occurred"))
})

container.connect(params)
container.connect(buildConnectParams(envParams, connParams))
})
}

Expand Down Expand Up @@ -87,3 +96,25 @@ export class AmqpConnection implements Connection {
return this.connection ? this.connection.is_open() : false
}
}

function buildConnectParams(envParams: EnvironmentParams, connParams?: ConnectionParams): ConnectionOptions {
return {
...envParams,
...buildReconnectParams(connParams),
}
}

function buildReconnectParams(connParams?: ConnectionParams) {
if (connParams && connParams.reconnect) {
return {
reconnect: connParams.reconnect,
initial_reconnect_delay: connParams.initialReconnectDelay,
max_reconnect_delay: connParams.maxReconnectDelay,
reconnect_limit: connParams.reconnectLimit,
}
}

if (connParams && !connParams.reconnect) return { reconnect: false }

return { reconnect: true }
}
8 changes: 4 additions & 4 deletions src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AmqpConnection, Connection } from "./connection.js"
import { AmqpConnection, Connection, ConnectionParams } from "./connection.js"

export interface Environment {
createConnection(): Promise<Connection>
createConnection(params?: ConnectionParams): Promise<Connection>
close(): Promise<void>
}

Expand All @@ -18,8 +18,8 @@ export class AmqpEnvironment implements Environment {
private readonly connections: Connection[] = []
) {}

async createConnection(): Promise<Connection> {
const connection = await AmqpConnection.create(this.params)
async createConnection(params?: ConnectionParams): Promise<Connection> {
const connection = await AmqpConnection.create(this.params, params)
this.connections.push(connection)
return connection
}
Expand Down
52 changes: 51 additions & 1 deletion test/e2e/environment.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { afterEach, beforeEach, describe, expect, test } from "vitest"
import { createEnvironment, Environment } from "../../src/environment.js"
import { host, port, username, password, numberOfConnections, eventually } from "../support/util.js"
import {
host,
port,
username,
password,
numberOfConnections,
eventually,
closeAllConnections,
wait,
} from "../support/util.js"

describe("Environment", () => {
let environment: Environment
Expand All @@ -25,4 +34,45 @@ describe("Environment", () => {
expect(await numberOfConnections()).to.eql(1)
})
})

test("a connection with reconnect false does not reconnect", async () => {
await environment.createConnection({ reconnect: false })

await closeAllConnections()
await wait(2000)

expect(await numberOfConnections()).to.eql(0)
})

test("a connection with reconnect true reconnects", async () => {
await environment.createConnection({ reconnect: true })

await closeAllConnections()

await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
}, 10000)

test("a connection reconnects by default", async () => {
await environment.createConnection()

await closeAllConnections()

await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
}, 10000)

test("a connection with reconnect set to number retries after number ms", async () => {
await environment.createConnection({ reconnect: 2000, initialReconnectDelay: 2000 })

await closeAllConnections()
await wait(1900)

expect(await numberOfConnections()).to.eql(0)
await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
}, 10000)
})
22 changes: 22 additions & 0 deletions test/support/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ export const vhost = encodeURIComponent("/")
export const username = process.env.RABBITMQ_USER ?? "rabbit"
export const password = process.env.RABBITMQ_PASSWORD ?? "rabbit"

export async function getConnections(): Promise<ConnectionInfoResponse[]> {
const ret = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
username,
password,
responseType: "json",
})
return ret.body
}

export async function numberOfConnections(): Promise<number> {
const response = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
username,
Expand All @@ -54,6 +63,19 @@ export async function numberOfConnections(): Promise<number> {
return response.body.length
}

export async function closeAllConnections(): Promise<void> {
const l = await getConnections()
await Promise.all(l.map((c) => closeConnection(c.name)))
}

export async function closeConnection(name: string) {
return got.delete(`http://${host}:${managementPort}/api/connections/${name}`, {
username,
password,
responseType: "json",
})
}

export async function existsQueue(queueName: string): Promise<boolean> {
const response = await getQueueInfo(queueName)

Expand Down
2 changes: 1 addition & 1 deletion test/unit/rhea/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe("Rhea connections", () => {
})

afterEach(async () => {
await closeConnection(connection)
if (connection) await closeConnection(connection)
})

test("create a connection", async () => {
Expand Down