Skip to content

Commit b535922

Browse files
l4mbymagne
andauthored
[IS-57/chore]: add web socket example (#59)
* wip: websocket tries * wip: add websocket example * chore: add websocket example * chore: revert gitignore * fix: remove ws package --------- Co-authored-by: magne <[email protected]>
1 parent 7e8a1a6 commit b535922

File tree

8 files changed

+144
-5
lines changed

8 files changed

+144
-5
lines changed

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
# Client examples
22

33
- [Basic Usage](./index.js) - Producer and Consumer example without reconnection
4+
- [WebSocket Example](./websocket_example.js) - Sample using node WebSocket connections

examples/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
"main": "index.js",
66
"scripts": {
77
"start": "node index.js",
8-
"rebuild-source": "cd .. && npm run build && cd - && npm install --force"
8+
"rebuild-source": "cd .. && npm run build && cd - && npm install --force",
9+
"websocket-example": "node websocket_example.js"
910
},
1011
"author": "",
1112
"license": "ISC",

examples/websocket_example.js

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
const rabbit = require("rabbitmq-amqp-js-client")
2+
const { randomUUID } = require("crypto")
3+
4+
const rabbitUser = process.env.RABBITMQ_USER ?? "rabbit"
5+
const rabbitPassword = process.env.RABBITMQ_PASSWORD ?? "rabbit"
6+
const rabbitHost = process.env.RABBITMQ_HOSTNAME ?? "localhost"
7+
const rabbitPort = process.env.RABBITMQ_PORT ?? 5672
8+
9+
async function main() {
10+
const testExchange = `test-exchange-${randomUUID()}`
11+
const testQueue = `test-queue-${randomUUID()}`
12+
const routingKey = `test-key-${randomUUID()}`
13+
14+
console.log("Creating the environment...")
15+
const environment = rabbit.createEnvironment({
16+
host: rabbitHost,
17+
port: rabbitPort,
18+
username: rabbitUser,
19+
password: rabbitPassword,
20+
webSocket: WebSocket,
21+
})
22+
23+
console.log("Opening a connection...")
24+
const connection = await environment.createConnection()
25+
const management = connection.management()
26+
27+
console.log("Creating a queue and an exchange...")
28+
const queue = await management.declareQueue(testQueue)
29+
const exchange = await management.declareExchange(testExchange)
30+
31+
console.log("Binding exchange to queue...")
32+
await management.bind(routingKey, { source: exchange, destination: queue })
33+
34+
console.log("Opening a publisher and publishing 10 messages...")
35+
const publisher = await connection.createPublisher({ exchange: { name: testExchange, routingKey: routingKey } })
36+
for (const i of Array(10).keys()) {
37+
const publishResult = await publisher.publish(rabbit.createAmqpMessage({ body: `Hello - ${i} - ` }))
38+
switch (publishResult.outcome) {
39+
case rabbit.OutcomeState.ACCEPTED:
40+
console.log("Message Accepted")
41+
break
42+
case rabbit.OutcomeState.RELEASED:
43+
console.log("Message Released")
44+
break
45+
case rabbit.OutcomeState.REJECTED:
46+
console.log("Message Rejected")
47+
break
48+
default:
49+
break
50+
}
51+
}
52+
publisher.close()
53+
54+
console.log("Opening a consumer and consuming messages...")
55+
const consumer = await connection.createConsumer({
56+
queue: { name: testQueue },
57+
messageHandler: (context, msg) => {
58+
context.accept()
59+
console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`)
60+
},
61+
})
62+
consumer.start()
63+
await sleep(5000)
64+
65+
console.log("Cleaning up...")
66+
consumer.close()
67+
await management.unbind(routingKey, { source: exchange, destination: queue })
68+
await management.deleteExchange(testExchange)
69+
await management.deleteQueue(testQueue)
70+
management.close()
71+
await connection.close()
72+
await environment.close()
73+
}
74+
75+
main()
76+
.then(() => console.log("done!"))
77+
.catch((res) => {
78+
console.log("ERROR ", res)
79+
process.exit(-1)
80+
})
81+
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))

src/connection.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { ConnectionEvents, ConnectionOptions, create_container, Connection as RheaConnection } from "rhea"
1+
import {
2+
ConnectionEvents,
3+
ConnectionOptions,
4+
create_container,
5+
Connection as RheaConnection,
6+
websocket_connect,
7+
} from "rhea"
28
import { AmqpManagement, Management } from "./management.js"
39
import { EnvironmentParams } from "./environment.js"
410
import { AmqpPublisher, Publisher } from "./publisher.js"
@@ -98,9 +104,31 @@ export class AmqpConnection implements Connection {
98104
}
99105

100106
function buildConnectParams(envParams: EnvironmentParams, connParams?: ConnectionParams): ConnectionOptions {
107+
const reconnectParams = buildReconnectParams(connParams)
108+
if (envParams.webSocket) {
109+
const ws = websocket_connect(envParams.webSocket)
110+
const connectionDetails = ws(
111+
`ws://${envParams.username}:${envParams.password}@${envParams.host}:${envParams.port}`,
112+
"amqp",
113+
{}
114+
)
115+
return {
116+
connection_details: () => {
117+
return {
118+
...connectionDetails(),
119+
host: envParams.host,
120+
port: envParams.port,
121+
}
122+
},
123+
host: envParams.host,
124+
port: envParams.port,
125+
transport: "tcp",
126+
}
127+
}
128+
101129
return {
102130
...envParams,
103-
...buildReconnectParams(connParams),
131+
...reconnectParams,
104132
}
105133
}
106134

src/environment.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { WebSocketImpl } from "rhea"
12
import { AmqpConnection, Connection, ConnectionParams } from "./connection.js"
23

34
export interface Environment {
@@ -10,6 +11,7 @@ export type EnvironmentParams = {
1011
port: number
1112
username: string
1213
password: string
14+
webSocket?: WebSocketImpl
1315
}
1416

1517
export class AmqpEnvironment implements Environment {

test/support/rhea_utils.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ export async function openConnection(container: Container, params: ConnectionOpt
2626
})
2727
}
2828

29+
export async function openWebSocketConnection(container: Container, url: string): Promise<Connection> {
30+
const ws = container.websocket_connect(WebSocket)
31+
return new Promise((res, rej) => {
32+
container.once(ConnectionEvents.connectionOpen, (context) => {
33+
return res(context.connection)
34+
})
35+
container.once(ConnectionEvents.error, (context) => {
36+
return rej(context.connection.error)
37+
})
38+
container.connect({
39+
connection_details: () => ({ ...ws(url, ["amqp"], {}), host: "localhost", port: 5672 }),
40+
transport: "tcp",
41+
host: "localhost",
42+
port: 5672,
43+
})
44+
})
45+
}
46+
2947
export async function closeConnection(connection: Connection): Promise<void> {
3048
return new Promise((res, rej) => {
3149
connection.once(ConnectionEvents.connectionClose, () => {

test/unit/rhea/connection.test.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { afterEach, beforeEach, describe, expect, test } from "vitest"
22
import { host, port, username, password, numberOfConnections, eventually } from "../../support/util.js"
33
import { Connection, Container, create_container } from "rhea"
4-
import { closeConnection, openConnection, openManagement } from "../../support/rhea_utils.js"
4+
import { closeConnection, openConnection, openManagement, openWebSocketConnection } from "../../support/rhea_utils.js"
55

66
describe("Rhea connections", () => {
77
let container: Container
@@ -28,6 +28,14 @@ describe("Rhea connections", () => {
2828
})
2929
})
3030

31+
test.skip("create a connection through a websocket", async () => {
32+
connection = await openWebSocketConnection(container, `ws://${username}:${password}@${host}:${port}`)
33+
34+
await eventually(async () => {
35+
expect(await numberOfConnections()).to.eql(1)
36+
})
37+
})
38+
3139
test("connect to the management", async () => {
3240
connection = await openConnection(container, {
3341
host,

0 commit comments

Comments
 (0)