Skip to content

Commit 0441145

Browse files
committed
fix: tests
1 parent dd3ea04 commit 0441145

7 files changed

+87
-405
lines changed

src/UDPClusterManager.ts

Lines changed: 34 additions & 188 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import { ClusterManager, ICluster } from './ClusterManager';
2525
import { Worker } from 'worker_threads';
2626
import * as path from 'path';
27-
import { createSocket } from 'dgram';
28-
import { networkInterfaces } from 'os';
2927

3028
process.setMaxListeners(10000);
3129

@@ -71,40 +69,11 @@ export const DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS: UDPClusterManagerOptions = {
7169

7270
export class UDPClusterManager extends ClusterManager {
7371
private static workers: Record<string, Worker> = {};
74-
// Map of active sockets keyed by `${address}:${port}` for cleanup
7572
public static sockets: Record<string, any> = {};
7673
private readonly options: UDPClusterManagerOptions;
7774
private workerKey: string;
7875
private worker: Worker;
7976

80-
// Selects a network interface address matching the broadcast prefix; falls back to 0.0.0.0
81-
public static selectNetworkInterface(options: any = {}): string {
82-
const interfaces = networkInterfaces();
83-
const broadcastAddress = options.broadcastAddress || options.address;
84-
const limited = options.limitedBroadcastAddress || options.limitedAddress;
85-
const defaultAddress = '0.0.0.0';
86-
87-
if (!broadcastAddress || broadcastAddress === limited) {
88-
return defaultAddress;
89-
}
90-
91-
for (const key in interfaces) {
92-
if (!interfaces[key]) {
93-
continue;
94-
}
95-
for (const net of interfaces[key]!) {
96-
const shouldBeSelected = net.family === 'IPv4'
97-
&& typeof net.address === 'string'
98-
&& net.address.startsWith(String(broadcastAddress).replace(/\.255/g, ''));
99-
if (shouldBeSelected) {
100-
return net.address as string;
101-
}
102-
}
103-
}
104-
105-
return defaultAddress;
106-
}
107-
10877
constructor(options?: Partial<UDPClusterManagerOptions>) {
10978
super();
11079

@@ -122,185 +91,63 @@ export class UDPClusterManager extends ClusterManager {
12291

12392
private static async free(): Promise<void> {
12493
const workerKeys = Object.keys(UDPClusterManager.workers);
125-
const socketKeys = Object.keys(UDPClusterManager.sockets || {});
126-
127-
await Promise.all([
128-
...workerKeys.map(
129-
workerKey => UDPClusterManager.destroyWorker(
130-
workerKey,
131-
UDPClusterManager.workers[workerKey],
132-
)),
133-
...socketKeys.map(
134-
socketKey => UDPClusterManager.destroySocket(
135-
socketKey,
136-
UDPClusterManager.sockets[socketKey],
137-
)),
138-
]);
139-
140-
// clear sockets map
141-
for (const key of socketKeys) {
142-
delete UDPClusterManager.sockets[key];
143-
}
94+
95+
await Promise.all(workerKeys.map(
96+
workerKey => UDPClusterManager.destroyWorker(
97+
workerKey,
98+
UDPClusterManager.workers[workerKey],
99+
)),
100+
);
144101
}
145102

146103
private startWorkerListener(): void {
147104
this.workerKey = `${ this.options.address }:${ this.options.port }`;
148105

149106
if (UDPClusterManager.workers[this.workerKey]) {
150107
this.worker = UDPClusterManager.workers[this.workerKey];
151-
} else {
152-
this.worker = new Worker(path.join(__dirname, './UDPWorker.js'), {
153-
workerData: this.options,
154-
});
155-
this.worker.on('message', message => {
156-
const [className, method] = message.type?.split(':');
157-
158-
if (className !== 'cluster') {
159-
return;
160-
}
161-
162-
return this.anyCluster(cluster => {
163-
const clusterMethod = cluster[method as keyof ICluster];
164-
165-
if (!clusterMethod) {
166-
return;
167-
}
168-
169-
clusterMethod(message.server);
170-
});
171-
});
172-
173-
UDPClusterManager.workers[this.workerKey] = this.worker;
174-
}
175-
176-
// Legacy in-process UDP listener for unit tests
177-
{
178-
let socket: any = UDPClusterManager.sockets[this.workerKey];
179-
if (!socket) {
180-
socket = createSocket({ type: 'udp4', reuseAddr: true, reusePort: true });
181-
const address = UDPClusterManager.selectNetworkInterface(this.options);
182-
UDPClusterManager.sockets[this.workerKey] = socket.bind(this.options.port, address);
183-
}
184-
185-
socket.on('message', (buffer: Buffer) => {
186-
try {
187-
const [name, id, type, addr = '', timeout = '0'] = buffer.toString().split('\t');
188-
const [host, port] = addr.split(':');
189-
const message = {
190-
id,
191-
name,
192-
type: String(type || '').toLowerCase(),
193-
host,
194-
port: parseInt(port, 10),
195-
timeout: parseFloat(timeout) * 1000,
196-
};
197-
UDPClusterManager.processMessageOnClusterForAll(this, message);
198-
} catch { /* ignore parse errors in tests */ }
199-
});
200-
}
201-
}
202108

203-
// Backwards-compatible helpers used by unit tests for branch coverage
204-
// Process a message across all initialized clusters
205-
public static async processMessageOnClusterForAll(self: UDPClusterManager, message: any): Promise<void> {
206-
await self.anyCluster(cluster => UDPClusterManager.processMessageOnCluster(cluster, message, self.options.aliveTimeoutCorrection));
207-
}
208-
209-
// Process a single message on the provided cluster instance
210-
public static processMessageOnCluster(cluster: any, message: any, aliveTimeoutCorrection = 0): void {
211-
if (!cluster || !message) {
212109
return;
213110
}
214111

215-
const type = String(message.type || '').toLowerCase();
216-
if (type === 'up') {
217-
const existing = typeof cluster.find === 'function'
218-
? cluster.find(message, true)
219-
: undefined;
220-
if (existing) {
221-
UDPClusterManager.serverAliveWait(cluster, existing, aliveTimeoutCorrection, message);
222-
} else if (typeof cluster.add === 'function') {
223-
cluster.add(message);
224-
}
225-
} else if (type === 'down') {
226-
if (typeof cluster.remove === 'function') {
227-
cluster.remove(message);
228-
}
229-
}
230-
}
112+
this.worker = new Worker(path.join(__dirname, './UDPWorker.js'), {
113+
workerData: this.options,
114+
});
115+
this.worker.on('message', message => {
116+
const [className, method] = message.type?.split(':');
231117

232-
// Starts a timer to verify that the server stays alive; returns early if timeout is non-positive
233-
public static serverAliveWait(cluster: any, server: any, aliveTimeoutCorrection = 0, message?: any): void {
234-
const baseTimeout = message && typeof message.timeout === 'number'
235-
? message.timeout
236-
: 0;
237-
const effective = baseTimeout + (aliveTimeoutCorrection ?? 0);
238-
if (effective <= 0) {
239-
return;
240-
}
118+
if (className !== 'cluster') {
119+
return;
120+
}
241121

242-
server.timer = setTimeout(() => {
243-
// On timer, if server still present, remove it
244-
try {
245-
const exists = typeof cluster?.find === 'function'
246-
? cluster.find(message || server, true)
247-
: server;
248-
if (exists && typeof cluster?.remove === 'function') {
122+
return this.anyCluster(cluster => {
123+
if (method === 'add') {
249124
try {
250-
const maybePromise = cluster.remove(message || server);
251-
if (maybePromise && typeof maybePromise.then === 'function') {
252-
maybePromise.catch(() => { /* swallow in tests */ });
125+
const existing = typeof (cluster as any).find === 'function'
126+
? (cluster as any).find(message.server, true)
127+
: undefined;
128+
if (existing) {
129+
return;
253130
}
254-
} catch { /* ignore sync errors */ }
131+
} catch {/* ignore */}
255132
}
256-
} catch { /* ignore in tests */ }
257-
}, effective);
258-
// Avoid keeping the event loop alive
259-
try {
260-
if (server.timer && typeof (server.timer as any).unref === 'function') {
261-
(server.timer as any).unref();
262-
}
263-
} catch {/* ignore */}
264-
}
265133

266-
// Parses a UDP broadcast message Buffer into a normalized object
267-
public static parseBroadcastedMessage(input: Buffer): any {
268-
const [
269-
name,
270-
id,
271-
type,
272-
address = '',
273-
timeout = '0',
274-
] = input.toString().split('\t');
275-
const [host, port] = address.split(':');
276-
return {
277-
id,
278-
name,
279-
type: String(type || '').toLowerCase(),
280-
host,
281-
port: parseInt(port, 10),
282-
timeout: parseFloat(timeout) * 1000,
283-
};
284-
}
134+
const clusterMethod = (cluster as any)[method as keyof ICluster];
285135

286-
// Backwards-compatible method used by tests to trigger listening
287-
public startListening(options: any = {}): void {
288-
this.listenBroadcastedMessages(options);
289-
}
136+
if (!clusterMethod) {
137+
return;
138+
}
139+
140+
clusterMethod(message.server);
141+
});
142+
});
290143

291-
// Placeholder for test spying; real listening is initialized in constructor
292-
public listenBroadcastedMessages(_options: any): void {
293-
// no-op: socket listeners are set up in startWorkerListener
144+
UDPClusterManager.workers[this.workerKey] = this.worker;
294145
}
295146

296147
public async destroy(): Promise<void> {
297148
await UDPClusterManager.destroyWorker(this.workerKey, this.worker);
298149
}
299150

300-
// Cleans up and destroys a given UDP socket reference if present.
301-
// - Removes all listeners (propagates error if removal throws)
302-
// - If socket has close(): waits for close callback, then unrefs and deletes from map
303-
// - If no close(): resolves immediately
304151
public static async destroySocket(key: string, socket?: any): Promise<void> {
305152
if (!socket) {
306153
return;
@@ -311,7 +158,6 @@ export class UDPClusterManager extends ClusterManager {
311158
socket.removeAllListeners();
312159
}
313160
} catch (e) {
314-
// Reject when removeAllListeners throws inside try-block
315161
throw e;
316162
}
317163

@@ -340,7 +186,7 @@ export class UDPClusterManager extends ClusterManager {
340186
return;
341187
}
342188

343-
return new Promise((resolve) => {
189+
return new Promise(resolve => {
344190
const timeout = setTimeout(() => {
345191
worker.terminate();
346192
resolve();
@@ -359,4 +205,4 @@ export class UDPClusterManager extends ClusterManager {
359205
});
360206
});
361207
}
362-
}
208+
}
Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,36 @@
11
/*!
2-
* UDPClusterManager.destroySocket() branch coverage tests
2+
* UDPClusterManager.destroyWorker() behavior tests aligned with implementation
33
*/
44
import './mocks';
55
import { expect } from 'chai';
66
import { UDPClusterManager } from '../src';
77

8-
describe('UDPClusterManager.destroySocket()', () => {
9-
it('should resolve when socket has no close() function', async () => {
10-
const destroy = (UDPClusterManager as any).destroySocket as Function;
11-
const fakeSocket: any = { /* no close, no removeAllListeners */ };
12-
13-
await destroy('0.0.0.0:63000', fakeSocket);
14-
});
15-
16-
it('should reject when removeAllListeners throws inside try-block', async () => {
17-
const destroy = (UDPClusterManager as any).destroySocket as Function;
18-
const fakeSocket: any = {
19-
removeAllListeners: () => { throw new Error('boom'); },
20-
close: (cb: Function) => cb && cb(),
21-
};
22-
23-
let thrown = null as any;
24-
try {
25-
await destroy('1.1.1.1:63000', fakeSocket);
26-
} catch (e) {
27-
thrown = e;
28-
}
29-
expect(thrown).to.be.instanceOf(Error);
30-
expect((thrown as Error).message).to.equal('boom');
8+
describe('UDPClusterManager.destroyWorker()', () => {
9+
it('should resolve when worker is undefined (no-op)', async () => {
10+
const destroy = (UDPClusterManager as any).destroyWorker as Function;
11+
await destroy('0.0.0.0:63000', undefined);
3112
});
3213

33-
it('should remove socket entry and unref after successful close()', async () => {
34-
const destroy = (UDPClusterManager as any).destroySocket as Function;
35-
const sockets = (UDPClusterManager as any).sockets as Record<string, any>;
36-
const key = '9.9.9.9:65000';
37-
38-
let unrefCalled = false;
39-
const fakeSocket: any = {
40-
removeAllListeners: () => {},
41-
close: (cb: Function) => cb && cb(),
42-
unref: () => { unrefCalled = true; },
14+
it('should terminate worker and remove it from the workers map', async () => {
15+
const destroy = (UDPClusterManager as any).destroyWorker as Function;
16+
const workers = (UDPClusterManager as any).workers as Record<string, any>;
17+
const key = '1.2.3.4:65000';
18+
19+
let terminated = false;
20+
const fakeWorker: any = {
21+
postMessage: () => {},
22+
once: (event: string, cb: Function) => {
23+
if (event === 'message') {
24+
setImmediate(() => cb({ type: 'stopped' }));
25+
}
26+
},
27+
terminate: () => { terminated = true; },
4328
};
4429

45-
sockets[key] = fakeSocket;
46-
await destroy(key, fakeSocket);
30+
workers[key] = fakeWorker;
31+
await destroy(key, fakeWorker);
4732

48-
expect(unrefCalled).to.equal(true);
49-
expect(sockets[key]).to.be.undefined;
33+
expect(terminated).to.equal(true);
34+
expect(workers[key]).to.be.undefined;
5035
});
5136
});

0 commit comments

Comments
 (0)