Skip to content

Commit aeee314

Browse files
authored
Merge pull request #946 from immobiliare/feat-flush-udp
fix: ensure udp messages are sent on close
2 parents 38a59cd + cfe1529 commit aeee314

File tree

3 files changed

+111
-17
lines changed

3 files changed

+111
-17
lines changed

src/socket.ts

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,34 @@ import net, { Socket as SocketTCP, isIP } from 'net';
33
import { URL } from 'url';
44
import buildLookupFunction from './dns-cache';
55
import { debuglog, DebugLoggerFunction } from 'util';
6-
export abstract class Socket {
6+
import EventEmitter, { once } from 'events';
7+
8+
/**
9+
* @emits idle The socket has no more pending messages. We use this event
10+
* to be sure the socket is not active anymore before closing it.
11+
*/
12+
export abstract class Socket extends EventEmitter {
713
protected hostname: string;
814
protected port: number;
915
protected connected: boolean;
1016
protected debug: typeof console.log;
1117
protected onError: (error: Error) => void;
18+
protected _pendingMessages: number;
1219

1320
protected constructor(
1421
url: URL,
1522
onError: (error: Error) => void = () => undefined,
1623
debug: DebugLoggerFunction = debuglog('dats')
1724
) {
25+
super();
1826
if (!url.port) {
1927
throw new Error('A port is required');
2028
}
2129
if (!url.hostname) {
2230
throw new Error('The hostname is required');
2331
}
2432

33+
this._pendingMessages = 0;
2534
this.onError = onError.bind(null);
2635
this.debug = debug;
2736
this.hostname = url.hostname;
@@ -33,6 +42,20 @@ export abstract class Socket {
3342
return this.connected;
3443
}
3544

45+
/**
46+
* Number of messages that are still being sent.
47+
*/
48+
get pendingMessages() {
49+
return this._pendingMessages;
50+
}
51+
/**
52+
* Whether the socket has pending messages or not.
53+
* Used to check if the socket is still active.
54+
*/
55+
get idle() {
56+
return this._pendingMessages === 0;
57+
}
58+
3659
abstract connect(): Promise<boolean>;
3760

3861
abstract send(data: string): void;
@@ -102,17 +125,34 @@ export class SocketTcp extends Socket {
102125

103126
send(data: string): void {
104127
if (!this.connected || !data) return;
105-
this.socket.write(data + '\n', (error) => error && this.onError(error));
128+
this._pendingMessages += 1;
129+
this.socket.write(data + '\n', (err) => {
130+
if (this._pendingMessages) {
131+
this._pendingMessages -= 1;
132+
}
133+
if (err) {
134+
try {
135+
this.onError(err);
136+
} catch (e) {
137+
this.debug('Exception on this.onError function', e);
138+
}
139+
}
140+
if (this._pendingMessages === 0) {
141+
this.emit('idle');
142+
}
143+
});
106144
}
107145

108-
close(): Promise<void> {
146+
async close(): Promise<void> {
109147
if (this.closing) return Promise.resolve();
148+
if (!this.idle) {
149+
await once(this, 'idle');
150+
}
110151
this.connected = false;
111152
this.closing = true;
112153
this.socket.removeListener('close', this.reconnectCb);
113154
this.socket.end();
114155
this.socket.destroy();
115-
return Promise.resolve();
116156
}
117157
}
118158

@@ -147,7 +187,6 @@ export class SocketUdp extends Socket {
147187
) {
148188
super(url, onError, debug);
149189
this.socket = null;
150-
151190
// Removed parenthesis if host name is ipv6 IP.
152191
if (this.hostname.startsWith('[') && this.hostname.endsWith(']')) {
153192
/* istanbul ignore next */
@@ -181,19 +220,31 @@ export class SocketUdp extends Socket {
181220

182221
send(data: string): void {
183222
if (!this.connected || !data) return;
184-
185-
return this.socket.send(
186-
data,
187-
this.port,
188-
this.hostname,
189-
(err) => err && this.onError(err)
190-
);
223+
this._pendingMessages += 1;
224+
this.socket.send(data, this.port, this.hostname, (err) => {
225+
if (this._pendingMessages) {
226+
this._pendingMessages -= 1;
227+
}
228+
if (err) {
229+
try {
230+
this.onError(err);
231+
} catch (e) {
232+
this.debug('Exception on this.onError function', e);
233+
}
234+
}
235+
if (this._pendingMessages === 0) {
236+
this.emit('idle');
237+
}
238+
});
191239
}
192240

193-
close(): Promise<void> {
194-
if (!this.connected) return Promise.resolve();
195-
return new Promise((res) => {
196-
this.socket.close(res);
241+
async close(): Promise<void> {
242+
if (!this.connected) return;
243+
if (!this.idle) {
244+
await once(this, 'idle');
245+
}
246+
await new Promise((res) => {
247+
this.socket.close(res as () => void);
197248
this.connected = false;
198249
});
199250
}

test/index.spec.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,28 @@ test('close with promise multiple times', async (t) => {
398398
t.pass();
399399
});
400400

401+
test('close with queued messages should wait', async (t) => {
402+
const host = new URL(`udp://127.0.0.1:${t.context.address.port}`);
403+
const namespace = 'ns1.${hostname}.${pid}';
404+
const client = new Client({ host, namespace });
405+
t.plan(6);
406+
client.connect();
407+
client.counter('some');
408+
client.counter('some');
409+
client.counter('some');
410+
client.counter('some');
411+
//@ts-expect-error Just for tests
412+
t.is(client.socket.pendingMessages, 4);
413+
//@ts-expect-error Just for tests
414+
const emit = sinon.spy(client.socket, 'emit');
415+
await t.notThrowsAsync(client.close() as Promise<void>);
416+
//@ts-expect-error Just for tests
417+
t.is(client.socket.pendingMessages, 0);
418+
t.notThrows(() => client.timing('time', 1));
419+
t.true(emit.calledOnceWith('idle'));
420+
t.pass();
421+
});
422+
401423
test('getSupportedTypes test', (t) => {
402424
const host = new URL(`udp://127.0.0.1:${t.context.address.port}`);
403425
const client = new Client({

test/tcp.spec.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os from 'os';
21
import sinon from 'sinon';
32
import anyTest, { TestInterface } from 'ava';
43
import Client from '../src/index';
@@ -110,6 +109,28 @@ test('TCP does not send if connection is closed', async (t) => {
110109
const sendFun = sinon.spy((socket as any).socket, 'write');
111110
socket.send('something');
112111
t.true(sendFun.notCalled);
112+
t.is(socket.pendingMessages, 0);
113+
t.is(socket.idle, true);
114+
});
115+
116+
test('TCP close should wait', async (t) => {
117+
const host = new URL(
118+
`tcp://127.0.0.1:${(t.context.addressTcp as AddressInfo).port || 0}`
119+
);
120+
const socket = new SocketTcp(host);
121+
await socket.connect();
122+
socket.send('something');
123+
socket.send('something');
124+
socket.send('something');
125+
socket.send('something');
126+
t.is(socket.pendingMessages, 4);
127+
t.false(socket.idle);
128+
const emit = sinon.spy(socket, 'emit');
129+
await socket.close();
130+
socket.send('something');
131+
t.is(socket.pendingMessages, 0);
132+
t.is(socket.idle, true);
133+
t.true(emit.calledOnceWith('idle'));
113134
});
114135

115136
test('TCP _connect function doesnt create new connections if there is another connection active', async (t) => {

0 commit comments

Comments
 (0)