Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Client examples

- [Basic Usage](./index.js) - Producer and Consumer example without reconnection
- [WebSocket Example](./websocket_example.js) - Sample using node WebSocket connections
2 changes: 1 addition & 1 deletion examples/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"main": "index.js",
"scripts": {
"start": "node index.js",
"rebuild-source": "cd .. && npm run build && cd - && npm install --force"
"rebuild-source": "cd .. && npm run build && cd - && npm install --force",
"websocket-example": "node websocket_example.js"
},
"author": "",
"license": "ISC",
Expand Down
81 changes: 81 additions & 0 deletions examples/websocket_example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
const rabbit = require("rabbitmq-amqp-js-client")
const { randomUUID } = require("crypto")

const rabbitUser = process.env.RABBITMQ_USER ?? "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD ?? "rabbit"
const rabbitHost = process.env.RABBITMQ_HOSTNAME ?? "localhost"
const rabbitPort = process.env.RABBITMQ_PORT ?? 5672

async function main() {
const testExchange = `test-exchange-${randomUUID()}`
const testQueue = `test-queue-${randomUUID()}`
const routingKey = `test-key-${randomUUID()}`

console.log("Creating the environment...")
const environment = rabbit.createEnvironment({
host: rabbitHost,
port: rabbitPort,
username: rabbitUser,
password: rabbitPassword,
webSocket: WebSocket,
})

console.log("Opening a connection...")
const connection = await environment.createConnection()
const management = connection.management()

console.log("Creating a queue and an exchange...")
const queue = await management.declareQueue(testQueue)
const exchange = await management.declareExchange(testExchange)

console.log("Binding exchange to queue...")
await management.bind(routingKey, { source: exchange, destination: queue })

console.log("Opening a publisher and publishing 10 messages...")
const publisher = await connection.createPublisher({ exchange: { name: testExchange, routingKey: routingKey } })
for (const i of Array(10).keys()) {
const publishResult = await publisher.publish(rabbit.createAmqpMessage({ body: `Hello - ${i} - ` }))
switch (publishResult.outcome) {
case rabbit.OutcomeState.ACCEPTED:
console.log("Message Accepted")
break
case rabbit.OutcomeState.RELEASED:
console.log("Message Released")
break
case rabbit.OutcomeState.REJECTED:
console.log("Message Rejected")
break
default:
break
}
}
publisher.close()

console.log("Opening a consumer and consuming messages...")
const consumer = await connection.createConsumer({
queue: { name: testQueue },
messageHandler: (context, msg) => {
context.accept()
console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`)
},
})
consumer.start()
await sleep(5000)

console.log("Cleaning up...")
consumer.close()
await management.unbind(routingKey, { source: exchange, destination: queue })
await management.deleteExchange(testExchange)
await management.deleteQueue(testQueue)
management.close()
await connection.close()
await environment.close()
}

main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("ERROR ", res)
process.exit(-1)
})
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
32 changes: 30 additions & 2 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { ConnectionEvents, ConnectionOptions, create_container, Connection as RheaConnection } from "rhea"
import {
ConnectionEvents,
ConnectionOptions,
create_container,
Connection as RheaConnection,
websocket_connect,
} from "rhea"
import { AmqpManagement, Management } from "./management.js"
import { EnvironmentParams } from "./environment.js"
import { AmqpPublisher, Publisher } from "./publisher.js"
Expand Down Expand Up @@ -98,9 +104,31 @@ export class AmqpConnection implements Connection {
}

function buildConnectParams(envParams: EnvironmentParams, connParams?: ConnectionParams): ConnectionOptions {
const reconnectParams = buildReconnectParams(connParams)
if (envParams.webSocket) {
const ws = websocket_connect(envParams.webSocket)
const connectionDetails = ws(
`ws://${envParams.username}:${envParams.password}@${envParams.host}:${envParams.port}`,
"amqp",
{}
)
return {
connection_details: () => {
return {
...connectionDetails(),
host: envParams.host,
port: envParams.port,
}
},
host: envParams.host,
port: envParams.port,
transport: "tcp",
}
}

return {
...envParams,
...buildReconnectParams(connParams),
...reconnectParams,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WebSocketImpl } from "rhea"
import { AmqpConnection, Connection, ConnectionParams } from "./connection.js"

export interface Environment {
Expand All @@ -10,6 +11,7 @@ export type EnvironmentParams = {
port: number
username: string
password: string
webSocket?: WebSocketImpl
}

export class AmqpEnvironment implements Environment {
Expand Down
18 changes: 18 additions & 0 deletions test/support/rhea_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ export async function openConnection(container: Container, params: ConnectionOpt
})
}

export async function openWebSocketConnection(container: Container, url: string): Promise<Connection> {
const ws = container.websocket_connect(WebSocket)
return new Promise((res, rej) => {
container.once(ConnectionEvents.connectionOpen, (context) => {
return res(context.connection)
})
container.once(ConnectionEvents.error, (context) => {
return rej(context.connection.error)
})
container.connect({
connection_details: () => ({ ...ws(url, ["amqp"], {}), host: "localhost", port: 5672 }),
transport: "tcp",
host: "localhost",
port: 5672,
})
})
}

export async function closeConnection(connection: Connection): Promise<void> {
return new Promise((res, rej) => {
connection.once(ConnectionEvents.connectionClose, () => {
Expand Down
10 changes: 9 additions & 1 deletion test/unit/rhea/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { afterEach, beforeEach, describe, expect, test } from "vitest"
import { host, port, username, password, numberOfConnections, eventually } from "../../support/util.js"
import { Connection, Container, create_container } from "rhea"
import { closeConnection, openConnection, openManagement } from "../../support/rhea_utils.js"
import { closeConnection, openConnection, openManagement, openWebSocketConnection } from "../../support/rhea_utils.js"

describe("Rhea connections", () => {
let container: Container
Expand All @@ -28,6 +28,14 @@ describe("Rhea connections", () => {
})
})

test.skip("create a connection through a websocket", async () => {
connection = await openWebSocketConnection(container, `ws://${username}:${password}@${host}:${port}`)

await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
})

test("connect to the management", async () => {
connection = await openConnection(container, {
host,
Expand Down