Skip to content

Commit 97d808d

Browse files
Merge pull request #62 from RethinkRobotics-opensource/subShutdown
Subscriber shutdown bug
2 parents c803478 + d27ec65 commit 97d808d

File tree

3 files changed

+80
-23
lines changed

3 files changed

+80
-23
lines changed

src/lib/impl/PublisherImpl.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ class PublisherImpl extends EventEmitter {
154154
Object.keys(this._subClients).forEach((clientId) => {
155155
const client = this._subClients[clientId];
156156
client.end();
157-
client.destroy();
158157
});
159158

160159
// disconnect from the spinner in case we have any pending callbacks
@@ -274,19 +273,21 @@ class PublisherImpl extends EventEmitter {
274273
}
275274

276275
subscriber.on('close', () => {
277-
this._log.info('Publisher %s client %s disconnected!',
278-
this.getTopic(), subscriber.name);
276+
this._log.info('Publisher client socket %s on topic %s disconnected',
277+
subscriber.name, this.getTopic());
279278
subscriber.removeAllListeners();
280279
delete this._subClients[subscriber.name];
281280
this.emit('disconnect');
282281
});
283282

284283
subscriber.on('end', () => {
285-
this._log.info('Sub %s sent END', subscriber.name);
284+
this._log.info('Publisher client socket %s on topic %s ended the connection',
285+
subscriber.name, this.getTopic());
286286
});
287287

288-
subscriber.on('error', () => {
289-
this._log.warn('Sub %s had error', subscriber.name);
288+
subscriber.on('error', (err) => {
289+
this._log.warn('Publisher client socket %s on topic %s had error: %s',
290+
subscriber.name, this.getTopic(), err);
290291
});
291292

292293
// if we've cached a message from latching, send it now

src/lib/impl/SubscriberImpl.js

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class SubscriberImpl extends EventEmitter {
7878

7979
this._pubClients = {};
8080

81+
this._pendingPubClients = {};
82+
8183
this._state = REGISTERING;
8284
this._register();
8385
}
@@ -131,10 +133,12 @@ class SubscriberImpl extends EventEmitter {
131133
this._log.debug('Shutting down subscriber %s', this.getTopic());
132134

133135
Object.keys(this._pubClients).forEach(this._disconnectClient.bind(this));
136+
Object.keys(this._pendingPubClients).forEach(this._disconnectClient.bind(this));
134137

135138
// disconnect from the spinner in case we have any pending callbacks
136139
this._nodeHandle.getSpinner().disconnect(this._getSpinnerId());
137140
this._pubClients = {};
141+
this._pendingPubClients = {};
138142
}
139143

140144
/**
@@ -210,11 +214,16 @@ class SubscriberImpl extends EventEmitter {
210214
* @param clientId {string}
211215
*/
212216
_disconnectClient(clientId) {
213-
const client = this._pubClients[clientId];
217+
let client = this._pubClients[clientId];
218+
219+
const hasValidatedClient = !!client;
220+
if (!hasValidatedClient) {
221+
client = this._pendingPubClients[clientId];
222+
}
223+
214224
if (client) {
215-
this._log.debug('Disconnecting client %s', clientId);
225+
this._log.debug('Subscriber %s disconnecting client %s', this.getTopic(), clientId);
216226
client.end();
217-
client.destroy();
218227

219228
client.removeAllListeners();
220229
client.$deserializer.removeAllListeners();
@@ -223,11 +232,13 @@ class SubscriberImpl extends EventEmitter {
223232
client.unpipe(client.$deserializer);
224233

225234
delete client.$deserializer;
226-
delete client.$boundMessageHandler;
227235

228236
delete this._pubClients[clientId];
237+
delete this._pendingPubClients[clientId];
229238

230-
this.emit('disconnect');
239+
if (hasValidatedClient) {
240+
this.emit('disconnect');
241+
}
231242
}
232243
}
233244

@@ -285,11 +296,20 @@ class SubscriberImpl extends EventEmitter {
285296
client.nodeUri = nodeUri;
286297

287298
client.on('end', () => {
288-
this._log.info('Pub %s sent END', client.name, this.getTopic());
299+
this._log.info('Subscriber client socket %s on topic %s ended the connection',
300+
client.name, this.getTopic());
289301
});
290302

291-
client.on('error', () => {
292-
this._log.warn('Pub %s error on topic %s', client.name, this.getTopic());
303+
client.on('error', (err) => {
304+
this._log.warn('Subscriber client socket %s on topic %s had error: %s',
305+
client.name, this.getTopic(), err);
306+
});
307+
308+
// hook into close event to clean things up
309+
client.on('close', () => {
310+
this._log.info('Subscriber client socket %s on topic %s disconnected',
311+
client.name, this.getTopic());
312+
this._disconnectClient(client.nodeUri);
293313
});
294314

295315
// open the socket at the provided address, port
@@ -308,6 +328,11 @@ class SubscriberImpl extends EventEmitter {
308328
client.$deserializer = deserializer;
309329
client.pipe(deserializer);
310330

331+
// cache client in "pending" map.
332+
// It's not validated yet so we don't want it to show up as a client.
333+
// Need to keep track of it in case we're shutdown before it can be validated.
334+
this._pendingPubClients[client.nodeUri] = client;
335+
311336
// create a one-time handler for the connection header
312337
// if the connection is validated, we'll listen for more events
313338
deserializer.once('message', this._handleConnectionHeader.bind(this, client));
@@ -329,6 +354,11 @@ class SubscriberImpl extends EventEmitter {
329354
* @param msg {string} message received from the publisher
330355
*/
331356
_handleConnectionHeader(client, msg) {
357+
if (this.isShutdown()) {
358+
this._disconnectClient(client.nodeUri);
359+
return;
360+
}
361+
332362
let header = TcprosUtils.parseTcpRosHeader(msg);
333363
// check if the publisher had a problem with our connection header
334364
if (header.error) {
@@ -349,18 +379,13 @@ class SubscriberImpl extends EventEmitter {
349379

350380
// cache client now that we've verified the connection header
351381
this._pubClients[client.nodeUri] = client;
382+
// remove client from pending map now that it's validated
383+
delete this._pendingPubClients[client.nodeUri];
352384

353385
// pipe all future messages to _handleMessage
354386
client.$deserializer.on('message', this._handleMessage.bind(this));
355387

356388
this.emit('connection', header, client.name);
357-
358-
// hook into close event to clean things up
359-
client.on('close', () => {
360-
this._log.warn('Pub %s closed on topic %s', client.name, this.getTopic());
361-
this._log.warn('Subscriber ' + this.getTopic() + ' client ' + client.name + ' disconnected!');
362-
this._disconnectClient(client.nodeUri);
363-
});
364389
}
365390

366391
/**

test/xmlrpcTest.js

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ describe('Protocol Test', () => {
566566
SOCKET_END_CACHED.call(this);
567567
};
568568

569-
sub._handleTopicRequestResponse([1, 'ok', ['TCPROS', 'junk_address', 1234]], 'http://junk_address:1234');
569+
subImpl._handleTopicRequestResponse([1, 'ok', ['TCPROS', 'junk_address', 1234]], 'http://junk_address:1234');
570570
sub.shutdown();
571571
});
572572
});
@@ -594,7 +594,7 @@ describe('Protocol Test', () => {
594594
});
595595
let pub = nh.advertise(topic, msgType);
596596

597-
pub.on('connected', () => {
597+
pub.on('connection', () => {
598598
pub.publish({data: 1});
599599
pub.shutdown();
600600
});
@@ -603,6 +603,37 @@ describe('Protocol Test', () => {
603603
setTimeout(done, 500);
604604
});
605605

606+
it('Shutdown Subscriber With Pending Publisher Client', function(done) {
607+
this.slow(1600);
608+
const nh = rosnodejs.nh;
609+
const sub = nh.subscribe(topic, msgType, () => {
610+
throwNext('Subscriber should never have gotten messages!');
611+
});
612+
let pub = nh.advertise(topic, msgType);
613+
614+
// when publisher emits 'connection', it has validated
615+
// the subscriber's connection header and sent a response back
616+
// the subscriber will not have validated the publisher's connection
617+
// header though so it should have a pending publisher entry.
618+
pub.on('connection', () => {
619+
const impl = sub._impl;
620+
621+
expect(Object.keys(impl._pendingPubClients)).to.have.lengthOf(1);
622+
expect(impl._pubClients).to.be.empty;
623+
624+
sub.shutdown();
625+
626+
expect(impl._pendingPubClients).to.be.empty;
627+
expect(impl._pubClients).to.be.empty;
628+
629+
setTimeout(() => {
630+
expect(impl._pendingPubClients).to.be.empty;
631+
expect(impl._pubClients).to.be.empty;
632+
done();
633+
}, 500);
634+
});
635+
});
636+
606637
it('2 Publishers on Same Topic', function(done) {
607638
this.slow(2000);
608639
const nh = rosnodejs.nh;

0 commit comments

Comments
 (0)