Skip to content

Commit d27ec65

Browse files
committed
Subscriber shutdown bug
Fixes an issue where publisher clients in the process of being validated will not be destroyed when the subscriber is shutdown.
1 parent c803478 commit d27ec65

File tree

3 files changed

+80
-23
lines changed

3 files changed

+80
-23
lines changed

src/lib/impl/PublisherImpl.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ class PublisherImpl extends EventEmitter {
154154
Object.keys(this._subClients).forEach((clientId) => {
155155
const client = this._subClients[clientId];
156156
client.end();
157-
client.destroy();
158157
});
159158

160159
// disconnect from the spinner in case we have any pending callbacks
@@ -274,19 +273,21 @@ class PublisherImpl extends EventEmitter {
274273
}
275274

276275
subscriber.on('close', () => {
277-
this._log.info('Publisher %s client %s disconnected!',
278-
this.getTopic(), subscriber.name);
276+
this._log.info('Publisher client socket %s on topic %s disconnected',
277+
subscriber.name, this.getTopic());
279278
subscriber.removeAllListeners();
280279
delete this._subClients[subscriber.name];
281280
this.emit('disconnect');
282281
});
283282

284283
subscriber.on('end', () => {
285-
this._log.info('Sub %s sent END', subscriber.name);
284+
this._log.info('Publisher client socket %s on topic %s ended the connection',
285+
subscriber.name, this.getTopic());
286286
});
287287

288-
subscriber.on('error', () => {
289-
this._log.warn('Sub %s had error', subscriber.name);
288+
subscriber.on('error', (err) => {
289+
this._log.warn('Publisher client socket %s on topic %s had error: %s',
290+
subscriber.name, this.getTopic(), err);
290291
});
291292

292293
// if we've cached a message from latching, send it now

src/lib/impl/SubscriberImpl.js

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class SubscriberImpl extends EventEmitter {
7878

7979
this._pubClients = {};
8080

81+
this._pendingPubClients = {};
82+
8183
this._state = REGISTERING;
8284
this._register();
8385
}
@@ -131,10 +133,12 @@ class SubscriberImpl extends EventEmitter {
131133
this._log.debug('Shutting down subscriber %s', this.getTopic());
132134

133135
Object.keys(this._pubClients).forEach(this._disconnectClient.bind(this));
136+
Object.keys(this._pendingPubClients).forEach(this._disconnectClient.bind(this));
134137

135138
// disconnect from the spinner in case we have any pending callbacks
136139
this._nodeHandle.getSpinner().disconnect(this._getSpinnerId());
137140
this._pubClients = {};
141+
this._pendingPubClients = {};
138142
}
139143

140144
/**
@@ -210,11 +214,16 @@ class SubscriberImpl extends EventEmitter {
210214
* @param clientId {string}
211215
*/
212216
_disconnectClient(clientId) {
213-
const client = this._pubClients[clientId];
217+
let client = this._pubClients[clientId];
218+
219+
const hasValidatedClient = !!client;
220+
if (!hasValidatedClient) {
221+
client = this._pendingPubClients[clientId];
222+
}
223+
214224
if (client) {
215-
this._log.debug('Disconnecting client %s', clientId);
225+
this._log.debug('Subscriber %s disconnecting client %s', this.getTopic(), clientId);
216226
client.end();
217-
client.destroy();
218227

219228
client.removeAllListeners();
220229
client.$deserializer.removeAllListeners();
@@ -223,11 +232,13 @@ class SubscriberImpl extends EventEmitter {
223232
client.unpipe(client.$deserializer);
224233

225234
delete client.$deserializer;
226-
delete client.$boundMessageHandler;
227235

228236
delete this._pubClients[clientId];
237+
delete this._pendingPubClients[clientId];
229238

230-
this.emit('disconnect');
239+
if (hasValidatedClient) {
240+
this.emit('disconnect');
241+
}
231242
}
232243
}
233244

@@ -285,11 +296,20 @@ class SubscriberImpl extends EventEmitter {
285296
client.nodeUri = nodeUri;
286297

287298
client.on('end', () => {
288-
this._log.info('Pub %s sent END', client.name, this.getTopic());
299+
this._log.info('Subscriber client socket %s on topic %s ended the connection',
300+
client.name, this.getTopic());
289301
});
290302

291-
client.on('error', () => {
292-
this._log.warn('Pub %s error on topic %s', client.name, this.getTopic());
303+
client.on('error', (err) => {
304+
this._log.warn('Subscriber client socket %s on topic %s had error: %s',
305+
client.name, this.getTopic(), err);
306+
});
307+
308+
// hook into close event to clean things up
309+
client.on('close', () => {
310+
this._log.info('Subscriber client socket %s on topic %s disconnected',
311+
client.name, this.getTopic());
312+
this._disconnectClient(client.nodeUri);
293313
});
294314

295315
// open the socket at the provided address, port
@@ -308,6 +328,11 @@ class SubscriberImpl extends EventEmitter {
308328
client.$deserializer = deserializer;
309329
client.pipe(deserializer);
310330

331+
// cache client in "pending" map.
332+
// It's not validated yet so we don't want it to show up as a client.
333+
// Need to keep track of it in case we're shutdown before it can be validated.
334+
this._pendingPubClients[client.nodeUri] = client;
335+
311336
// create a one-time handler for the connection header
312337
// if the connection is validated, we'll listen for more events
313338
deserializer.once('message', this._handleConnectionHeader.bind(this, client));
@@ -329,6 +354,11 @@ class SubscriberImpl extends EventEmitter {
329354
* @param msg {string} message received from the publisher
330355
*/
331356
_handleConnectionHeader(client, msg) {
357+
if (this.isShutdown()) {
358+
this._disconnectClient(client.nodeUri);
359+
return;
360+
}
361+
332362
let header = TcprosUtils.parseTcpRosHeader(msg);
333363
// check if the publisher had a problem with our connection header
334364
if (header.error) {
@@ -349,18 +379,13 @@ class SubscriberImpl extends EventEmitter {
349379

350380
// cache client now that we've verified the connection header
351381
this._pubClients[client.nodeUri] = client;
382+
// remove client from pending map now that it's validated
383+
delete this._pendingPubClients[client.nodeUri];
352384

353385
// pipe all future messages to _handleMessage
354386
client.$deserializer.on('message', this._handleMessage.bind(this));
355387

356388
this.emit('connection', header, client.name);
357-
358-
// hook into close event to clean things up
359-
client.on('close', () => {
360-
this._log.warn('Pub %s closed on topic %s', client.name, this.getTopic());
361-
this._log.warn('Subscriber ' + this.getTopic() + ' client ' + client.name + ' disconnected!');
362-
this._disconnectClient(client.nodeUri);
363-
});
364389
}
365390

366391
/**

test/xmlrpcTest.js

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ describe('Protocol Test', () => {
566566
SOCKET_END_CACHED.call(this);
567567
};
568568

569-
sub._handleTopicRequestResponse([1, 'ok', ['TCPROS', 'junk_address', 1234]], 'http://junk_address:1234');
569+
subImpl._handleTopicRequestResponse([1, 'ok', ['TCPROS', 'junk_address', 1234]], 'http://junk_address:1234');
570570
sub.shutdown();
571571
});
572572
});
@@ -594,7 +594,7 @@ describe('Protocol Test', () => {
594594
});
595595
let pub = nh.advertise(topic, msgType);
596596

597-
pub.on('connected', () => {
597+
pub.on('connection', () => {
598598
pub.publish({data: 1});
599599
pub.shutdown();
600600
});
@@ -603,6 +603,37 @@ describe('Protocol Test', () => {
603603
setTimeout(done, 500);
604604
});
605605

606+
it('Shutdown Subscriber With Pending Publisher Client', function(done) {
607+
this.slow(1600);
608+
const nh = rosnodejs.nh;
609+
const sub = nh.subscribe(topic, msgType, () => {
610+
throwNext('Subscriber should never have gotten messages!');
611+
});
612+
let pub = nh.advertise(topic, msgType);
613+
614+
// when publisher emits 'connection', it has validated
615+
// the subscriber's connection header and sent a response back
616+
// the subscriber will not have validated the publisher's connection
617+
// header though so it should have a pending publisher entry.
618+
pub.on('connection', () => {
619+
const impl = sub._impl;
620+
621+
expect(Object.keys(impl._pendingPubClients)).to.have.lengthOf(1);
622+
expect(impl._pubClients).to.be.empty;
623+
624+
sub.shutdown();
625+
626+
expect(impl._pendingPubClients).to.be.empty;
627+
expect(impl._pubClients).to.be.empty;
628+
629+
setTimeout(() => {
630+
expect(impl._pendingPubClients).to.be.empty;
631+
expect(impl._pubClients).to.be.empty;
632+
done();
633+
}, 500);
634+
});
635+
});
636+
606637
it('2 Publishers on Same Topic', function(done) {
607638
this.slow(2000);
608639
const nh = rosnodejs.nh;

0 commit comments

Comments
 (0)