Skip to content

Commit a541e69

Browse files
authored
fix: Merge pull request #25 from 6RiverSystems/feat/latch-subs
fix: latched subs
2 parents e74b949 + 7b5d08c commit a541e69

File tree

3 files changed

+60
-0
lines changed

3 files changed

+60
-0
lines changed

src/lib/RosNode.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,22 @@ class RosNode extends EventEmitter {
111111
subscribe(options, callback) {
112112
let topic = options.topic;
113113
let subImpl = this._subscribers[topic];
114+
let firstSubscriber = false;
114115
if (!subImpl) {
115116
subImpl = new SubscriberImpl(options, this);
116117
this._subscribers[topic] = subImpl;
118+
firstSubscriber = true;
117119
}
118120

119121
const sub = new Subscriber(subImpl);
120122
if (callback && typeof callback === 'function') {
121123
sub.on('message', callback);
122124
}
125+
// Duplicate subscribers wont get a latched 'message' emit from the existing subscriber impl.
126+
// This will forcibly send an emit if the topic is latched and has already seen an incoming message
127+
if (!firstSubscriber && subImpl.getLatching() && subImpl.getLastMessage()) {
128+
sub.emit('message', subImpl.getLastMessage())
129+
}
123130

124131
return sub;
125132
}

src/lib/impl/SubscriberImpl.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ class SubscriberImpl extends EventEmitter {
8585
this._pendingPubClients = {};
8686

8787
this._state = REGISTERING;
88+
89+
this._latching = undefined;
90+
91+
this._lastMessage = undefined;
92+
8893
this._register();
8994
}
9095

@@ -129,6 +134,23 @@ class SubscriberImpl extends EventEmitter {
129134
return this._nodeHandle;
130135
}
131136

137+
/**
138+
* Check if this subscriber is connected to a latched topic
139+
* @returns {boolean}
140+
*/
141+
getLatching() {
142+
return this._latching;
143+
}
144+
145+
/**
146+
* Return the last message received
147+
* @returns {any}
148+
*/
149+
getLastMessage() {
150+
return this._lastMessage;
151+
}
152+
153+
132154
/**
133155
* Clears and closes all client connections for this subscriber.
134156
*/
@@ -389,6 +411,10 @@ class SubscriberImpl extends EventEmitter {
389411
// remove client from pending map now that it's validated
390412
delete this._pendingPubClients[client.nodeUri];
391413

414+
if (header.latching) {
415+
this._latching = true;
416+
}
417+
392418
// pipe all future messages to _handleMessage
393419
client.$deserializer.on('message', this._handleMessage.bind(this));
394420

@@ -416,6 +442,9 @@ class SubscriberImpl extends EventEmitter {
416442
_handleMsgQueue(msgQueue) {
417443
try {
418444
msgQueue.forEach((msg) => {
445+
if (this.getLatching()) {
446+
this._lastMessage = this._messageHandler.deserialize(msg);
447+
}
419448
this.emit('message', this._messageHandler.deserialize(msg));
420449
});
421450
}

test/xmlrpcTest.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,30 @@ describe('Protocol Test', () => {
739739
});
740740
});
741741
});
742+
743+
it('2 Subscribers on Same Latched Topic subscribing at different times ', function(done) {
744+
this.slow(1000);
745+
const nh = rosnodejs.nh;
746+
747+
let msg1;
748+
const sub1 = nh.subscribe(topic, msgType, (msg) => {
749+
msg1 = msg.data;
750+
let msg2;
751+
const sub2 = nh.subscribe(topic, msgType, (msg) => {
752+
expect(sub1._impl.listenerCount('connection')).to.equal(2);
753+
msg2 = msg.data;
754+
expect(msg1).to.equal(1);
755+
expect(msg1).to.equal(msg2);
756+
done()
757+
});
758+
});
759+
760+
expect(sub1._impl.listenerCount('connection')).to.equal(1);
761+
762+
const pub = nh.advertise(topic, msgType, {latching: true});
763+
764+
pub.publish({data: 1});
765+
});
742766
});
743767

744768
describe('Service', () => {

0 commit comments

Comments
 (0)