Skip to content

Commit 59be1ec

Browse files
authored
feat(app): support string urls and amqps protocol, as well as custom socket options (#83)
for #44
1 parent 626f9c3 commit 59be1ec

File tree

11 files changed

+228
-71
lines changed

11 files changed

+228
-71
lines changed

.github/workflows/e2e.yml

Lines changed: 0 additions & 41 deletions
This file was deleted.

.github/workflows/integration.yml

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
name: Integration Tests
2+
on: push
3+
4+
jobs:
5+
# Label of the container job
6+
integration-tests:
7+
strategy:
8+
matrix:
9+
sslmode: ["nossl", "ssl"]
10+
11+
# Containers must run in Linux based operating systems
12+
runs-on: ubuntu-latest
13+
14+
steps:
15+
#-----------------------------------------------------------
16+
# 1‑‑‑‑ Check out your code & set up Node
17+
#-----------------------------------------------------------
18+
- name: Check out repository code
19+
uses: actions/checkout@v4
20+
21+
- name: Use Node.js 18
22+
uses: actions/setup-node@v3
23+
with:
24+
node-version: '18'
25+
26+
#-----------------------------------------------------------
27+
# 2‑‑‑‑ Generate a local CA + server cert / key
28+
#-----------------------------------------------------------
29+
- name: Generate self-signed certificates
30+
if: matrix.sslmode == 'ssl'
31+
run: |
32+
set -euo pipefail
33+
mkdir -p certs
34+
35+
# CA ───────────────────────────────────────────────────
36+
openssl req -x509 -newkey rsa:2048 -nodes \
37+
-keyout certs/ca.key \
38+
-out certs/ca.crt \
39+
-days 365 \
40+
-subj "/CN=Test-CA"
41+
42+
# Server key & CSR ────────────────────────────────────
43+
openssl req -newkey rsa:2048 -nodes \
44+
-keyout certs/server.key \
45+
-out certs/server.csr \
46+
-subj "/CN=localhost"
47+
48+
# Sign the server cert with the CA ────────────────────
49+
openssl x509 -req -in certs/server.csr \
50+
-CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial \
51+
-out certs/server.crt \
52+
-days 365
53+
54+
# For RabbitMQ the key file must be 0600
55+
chmod 644 certs/*
56+
chmod 755 certs
57+
58+
# list the certs
59+
echo $PWD
60+
ls -l certs
61+
62+
#-----------------------------------------------------------
63+
# 3‑‑‑‑ Write a minimal rabbitmq.conf that enables TLS only
64+
#-----------------------------------------------------------
65+
- name: Write rabbitmq.conf
66+
if: matrix.sslmode == 'ssl'
67+
run: |
68+
cat > rabbitmq.conf <<'EOF'
69+
## disable plain AMQP
70+
listeners.tcp = none
71+
72+
## TLS listener on 5671
73+
listeners.ssl.default = 5671
74+
75+
ssl_options.cacertfile = /etc/rabbitmq/certs/ca.crt
76+
ssl_options.certfile = /etc/rabbitmq/certs/server.crt
77+
ssl_options.keyfile = /etc/rabbitmq/certs/server.key
78+
ssl_options.verify = verify_none
79+
ssl_options.fail_if_no_peer_cert = false
80+
81+
## (optional) secure the management UI on 15671
82+
management.ssl.port = 15671
83+
management.ssl.cacertfile = /etc/rabbitmq/certs/ca.crt
84+
management.ssl.certfile = /etc/rabbitmq/certs/server.crt
85+
management.ssl.keyfile = /etc/rabbitmq/certs/server.key
86+
EOF
87+
88+
#-----------------------------------------------------------
89+
# 4a‑‑‑ Launch RabbitMQ with the certs + config bind‑mounted
90+
#-----------------------------------------------------------
91+
- name: Start RabbitMQ in SSL Mode (via Docker)
92+
if: matrix.sslmode == 'ssl'
93+
run: |
94+
docker run -d --name rabbitmq \
95+
--user rabbitmq \
96+
-e RABBITMQ_ERLANG_COOKIE="your_secret_cookie" \
97+
-v "$PWD/certs":/etc/rabbitmq/certs:ro \
98+
-v "$PWD/rabbitmq.conf":/etc/rabbitmq/rabbitmq.conf:ro \
99+
-p 5671:5671 -p 15671:15671 \
100+
rabbitmq:3-management
101+
# cat /etc/rabbitmq/certs/server.key from inside the container
102+
docker exec rabbitmq ls -l /etc/rabbitmq/certs
103+
104+
#-----------------------------------------------------------
105+
# 4b‑‑‑ Launch RabbitMQ without certs (plain AMQP)
106+
#-----------------------------------------------------------
107+
- name: Start RabbitMQ in NoSSL Mode (via Docker)
108+
if: matrix.sslmode == 'nossl'
109+
run: |
110+
docker run -d --name rabbitmq \
111+
--user rabbitmq \
112+
-e RABBITMQ_ERLANG_COOKIE="your_secret_cookie" \
113+
-p 5672:5672 -p 15672:15672 \
114+
rabbitmq:3-management
115+
116+
#-----------------------------------------------------------
117+
# 5‑‑‑‑ Wait until the broker is healthy
118+
#-----------------------------------------------------------
119+
- name: Wait for RabbitMQ to be ready
120+
run: |
121+
set -e
122+
if [ "${{ matrix.sslmode }}" = "ssl" ]; then
123+
SCHEME="https"
124+
PORT=15671
125+
else
126+
SCHEME="http"
127+
PORT=15672
128+
fi
129+
# wait for RabbitMQ to be healthy
130+
for i in {1..5}; do
131+
if docker exec rabbitmq rabbitmq-diagnostics -q ping; then
132+
if curl -kI "${SCHEME}://localhost:${PORT}/api/auth"; then
133+
echo "RabbitMQ is ready"
134+
exit 0
135+
fi
136+
fi
137+
sleep 3
138+
done
139+
echo "RabbitMQ never became ready" && exit 1
140+
141+
#-----------------------------------------------------------
142+
# 6‑‑‑‑ Install dependencies & run the test suite
143+
#-----------------------------------------------------------
144+
- name: Install dependencies
145+
run: npm install
146+
147+
- name: Run the test suite
148+
env:
149+
RABBITMQ_HOST: localhost # inside the runner
150+
RABBITMQ_PORT: ${{ matrix.sslmode == 'ssl' && 5671 || 5672 }}
151+
RABBITMQ_MANAGEMENT_PORT: ${{ matrix.sslmode == 'ssl' && 15671 || 15672 }}
152+
RABBITMQ_SSL_MODE: ${{ matrix.sslmode == 'ssl' && 'true' || 'false' }}
153+
NODE_TLS_REJECT_UNAUTHORIZED: '0' # ignore the self‑signed CA (tests only!)
154+
run: npm run test:e2e
155+
156+
#-----------------------------------------------------------
157+
# 7‑‑‑‑ (optional) Show logs if the job failed
158+
#-----------------------------------------------------------
159+
- name: Print RabbitMQ logs on failure
160+
if: failure()
161+
run: docker logs rabbitmq
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Run Unit Tests
1+
name: Unit Tests
22

33
on:
44
push:

README.md

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
> A batteries‑included yet lightweight library for running background jobs and distributed task queues in Node.js & TypeScript, powered by RabbitMQ.
44
55
\[[![npm version](https://img.shields.io/npm/v/peanar.svg)](https://www.npmjs.com/package/peanar)]
6-
\[[![Integration Tests](https://github.com/martianboy/peanar/actions/workflows/e2e.yml/badge.svg)](https://github.com/martianboy/peanar/actions)]
7-
\[[![Unit Tests](https://github.com/martianboy/peanar/actions/workflows/unit-test.yml/badge.svg)](https://github.com/martianboy/peanar/actions)]
6+
\[[![Integration Tests](https://github.com/martianboy/peanar/actions/workflows/integration.yml/badge.svg)](https://github.com/martianboy/peanar/actions)]
7+
\[[![Unit Tests](https://github.com/martianboy/peanar/actions/workflows/unit.yml/badge.svg)](https://github.com/martianboy/peanar/actions)]
88

99
---
1010

@@ -171,12 +171,31 @@ Create an application instance.
171171
172172
| Option | Type | Default | Purpose | |
173173
| ------------ | ------------------------------------------------------------------------------- | ------------- | --------------------------------------- | ---------------------------- |
174-
| `connection` | [`IConnectionParams`](https://github.com/amqp-ts/amqp-ts#connection-parameters) | string | `amqp://localhost` | RabbitMQ connection settings |
174+
| `connection` | `IConnectionParams` | string | `amqp://localhost` | RabbitMQ connection settings |
175175
| `poolSize` | `number` | `2` | Channels kept in the internal pool | |
176176
| `prefetch` | `number` | `1` | Basic.qos prefetch for every consumer | |
177177
| `jobClass` | `typeof PeanarJob` | `PeanarJob` | Override the runtime job implementation | |
178178
| `logger` | `(...args: any[]) => void` | `console.log` | Inject custom logging | |
179179
180+
181+
##### `definition` (interface `IConnectionParams`)
182+
183+
| Field | Type | Description |
184+
| --------------- | ---------- | -------------------------------------------------- |
185+
| maxRetries | number | How many times to retry on connection failure. |
186+
| retryDelay | number | Wait before retrying a failed connection. |
187+
| protocol? | `'amqp'` OR `'amqps'` | SSL or not. |
188+
| host | string | Hostname or IP address of the RabbitMQ server. |
189+
| port | number | Port of the RabbitMQ server. |
190+
| username | string | Username for authentication. |
191+
| password | string | Password for authentication. |
192+
| locale | string | Locale for the connection. |
193+
| heartbeat | number | Heartbeat interval in seconds. |
194+
| keepAlive? | boolean | Enable TCP keepalive. |
195+
| keepAliveDelay? | number | Delay between keepalive probes. |
196+
| timeout? | number | Timeout for the connection in milliseconds. |
197+
| vhost | string | Virtual host to connect to. |
198+
180199
---
181200
182201
#### `app.job(definition) ⇒ enqueueFn`

examples/workers.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
import PeanarApp = require('../src');
1+
import PeanarApp from '../src';
22

33
function dummy() {
44
return new Promise((resolve, reject) => {
5-
setTimeout(resolve, 4000);
5+
setTimeout(() => {
6+
console.log('dummy job done');
7+
resolve('done');
8+
}, 4000);
69
});
710
}
811

src/broker.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import {
1515
import Consumer from './consumer';
1616

1717
interface IBrokerOptions {
18-
connection?: IConnectionParams;
18+
connection?: string | IConnectionParams;
19+
socketOptions?: any;
1920
poolSize: number;
2021
prefetch?: number;
2122
}
@@ -47,15 +48,20 @@ export default class NodeAmqpBroker {
4748
try {
4849
const c = this.config || {};
4950

50-
const conn = (this.conn = await amqplib.connect({
51-
hostname: c.connection ? c.connection.host : 'localhost',
52-
port: c.connection ? c.connection.port : 5672,
53-
username: c.connection ? c.connection.username : 'guest',
54-
password: c.connection ? c.connection.password : 'guest',
55-
vhost: c.connection ? c.connection.vhost : '/'
56-
}));
51+
if (typeof c.connection === 'string') {
52+
this.conn = await amqplib.connect(c.connection, c.socketOptions);
53+
} else {
54+
this.conn = await amqplib.connect({
55+
protocol: c.connection?.protocol ?? 'amqp',
56+
hostname: c.connection?.host ?? 'localhost',
57+
port: c.connection?.port ?? 5672,
58+
username: c.connection?.username ?? 'guest',
59+
password: c.connection?.password ?? 'guest',
60+
vhost: c.connection?.vhost ?? '/'
61+
});
62+
}
5763

58-
return conn
64+
return this.conn
5965
} catch (ex: any) {
6066
if (ex.code === 'ECONNREFUSED') {
6167
await timeout(700 * retry);

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ export interface IPeanarOptions {
115115
export interface IConnectionParams {
116116
maxRetries: number;
117117
retryDelay: number;
118+
protocol?: 'amqp' | 'amqps';
118119
host: string;
119120
port: number;
120121
username: string;

test/broker.test.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
import crypto from 'crypto';
21
import { setTimeout as timeout } from 'timers/promises';
32
import { once } from 'events';
43
import { rejects } from 'assert';
54

65
import sinon from 'sinon';
76
import { expect } from 'chai';
87
import amqplib, { ChannelModel } from 'amqplib';
9-
import { IMessage } from '../src/types';
8+
import { IConnectionParams, IMessage } from '../src/types';
109

1110
import { brokerOptions } from './config';
1211
import Broker from '../src/broker';
@@ -34,23 +33,30 @@ class TestBroker extends Broker {
3433
return this._channelConsumers;
3534
}
3635

36+
get connectionOptions(): IConnectionParams {
37+
if (typeof this.config?.connection !== 'object') {
38+
throw new Error('Connection options are not set');
39+
}
40+
return this.config?.connection;
41+
}
42+
3743
set port(port: number) {
38-
this.config!.connection!.port = port;
44+
this.connectionOptions.port = port;
3945
}
4046
set retryDelay(retryDelay: number) {
41-
this.config!.connection!.retryDelay = retryDelay;
47+
this.connectionOptions.retryDelay = retryDelay;
4248
}
4349
set maxRetries(maxRetries: number) {
44-
this.config!.connection!.maxRetries = maxRetries;
50+
this.connectionOptions.maxRetries = maxRetries;
4551
}
4652
set username(username: string) {
47-
this.config!.connection!.username = username;
53+
this.connectionOptions.username = username;
4854
}
4955
set password(password: string) {
50-
this.config!.connection!.password = password;
56+
this.connectionOptions.password = password;
5157
}
5258
set vhost(vhost: string) {
53-
this.config!.connection!.vhost = vhost;
59+
this.connectionOptions.vhost = vhost;
5460
}
5561
}
5662

test/config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
import { IConnectionParams } from "../src/types";
2+
13
export const brokerOptions = {
24
connection: {
35
host: process.env.RABBITMQ_HOST ?? '127.0.0.1',
46
port: parseInt(process.env.RABBITMQ_PORT ?? '5672'),
7+
protocol: process.env.RABBITMQ_SSL_MODE === 'true' ? 'amqps' : 'amqp',
58
username: 'guest',
69
password: 'guest',
710
vhost: '/',
811
timeout: 10_000,
912
maxRetries: 10,
1013
retryDelay: 1000,
1114
locale: 'en-US'
12-
},
15+
} as IConnectionParams,
1316
poolSize: 1
1417
};

test/pool.test.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ describe('Pool', () => {
1818
before(async () => {
1919
await createVhost(vhost);
2020
conn = await amqplib.connect({
21-
hostname: brokerOptions.connection.host,
22-
port: brokerOptions.connection.port,
23-
username: brokerOptions.connection.username,
24-
password: brokerOptions.connection.password,
21+
...brokerOptions.connection,
2522
vhost,
2623
});
2724

0 commit comments

Comments
 (0)