Skip to content

Commit 7311901

Browse files
authored
Fix Service advertisement race conditions (#919)
* fix advertising services race conditions - Implement operation queue: All advertise/unadvertise operations are now serialized through a promise queue to prevent race conditions - Allow safe re-advertisement: advertise() and advertiseAsync() automatically unadvertise before re-advertising when a service is already advertised - Make operations async: Both methods now return promises that resolve when the operation completes - Prevent double operations: Added internal state tracking to prevent duplicate unadvertise operations - Fix cleanup order: Event callbacks are removed before sending unadvertise messages to stop processing new requests immediately * passing tests, use event emitter for errors
1 parent e7abd23 commit 7311901

File tree

2 files changed

+249
-62
lines changed

2 files changed

+249
-62
lines changed

src/core/Service.js

Lines changed: 118 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ export default class Service extends EventEmitter {
1818
*/
1919
_serviceCallback = null;
2020
isAdvertised = false;
21+
/**
22+
* Queue for serializing advertise/unadvertise operations to prevent race conditions
23+
* @private
24+
*/
25+
_operationQueue = Promise.resolve();
26+
/**
27+
* Track if an unadvertise operation is pending to prevent double operations
28+
* @private
29+
*/
30+
_pendingUnadvertise = false;
2131
/**
2232
* @param {Object} options
2333
* @param {Ros} options.ros - The ROSLIB.Ros connection handle.
@@ -93,84 +103,134 @@ export default class Service extends EventEmitter {
93103
* @param {advertiseCallback} callback - This works similarly to the callback for a C++ service and should take the following params
94104
*/
95105
advertise(callback) {
96-
if (this.isAdvertised) {
97-
throw new Error('Cannot advertise the same Service twice!');
98-
}
106+
// Queue this operation to prevent race conditions
107+
this._operationQueue = this._operationQueue.then(async () => {
108+
// If already advertised, unadvertise first
109+
if (this.isAdvertised) {
110+
await this._doUnadvertise();
111+
}
99112

100-
// Store the new callback for removal during un-advertisement
101-
this._serviceCallback = (rosbridgeRequest) => {
102-
var response = {};
103-
var success = callback(rosbridgeRequest.args, response);
113+
// Store the new callback for removal during un-advertisement
114+
this._serviceCallback = (rosbridgeRequest) => {
115+
var response = {};
116+
var success = callback(rosbridgeRequest.args, response);
104117

105-
var call = {
106-
op: 'service_response',
107-
service: this.name,
108-
values: response,
109-
result: success
110-
};
118+
var call = {
119+
op: 'service_response',
120+
service: this.name,
121+
values: response,
122+
result: success
123+
};
111124

112-
if (rosbridgeRequest.id) {
113-
call.id = rosbridgeRequest.id;
114-
}
125+
if (rosbridgeRequest.id) {
126+
call.id = rosbridgeRequest.id;
127+
}
115128

116-
this.ros.callOnConnection(call);
117-
};
129+
this.ros.callOnConnection(call);
130+
};
118131

119-
this.ros.on(this.name, this._serviceCallback);
120-
this.ros.callOnConnection({
121-
op: 'advertise_service',
122-
type: this.serviceType,
123-
service: this.name
132+
this.ros.on(this.name, this._serviceCallback);
133+
this.ros.callOnConnection({
134+
op: 'advertise_service',
135+
type: this.serviceType,
136+
service: this.name
137+
});
138+
this.isAdvertised = true;
139+
}).catch(err => {
140+
this.emit('error', err);
141+
throw err;
124142
});
125-
this.isAdvertised = true;
143+
144+
return this._operationQueue;
126145
}
127146

128-
unadvertise() {
129-
if (!this.isAdvertised) {
130-
throw new Error(`Tried to un-advertise service ${this.name}, but it was not advertised!`);
147+
/**
148+
* Internal method to perform unadvertisement without queueing
149+
* @private
150+
*/
151+
async _doUnadvertise() {
152+
if (!this.isAdvertised || this._pendingUnadvertise) {
153+
return;
131154
}
132-
this.ros.callOnConnection({
133-
op: 'unadvertise_service',
134-
service: this.name
135-
});
136-
// Remove the registered callback
137-
if (this._serviceCallback) {
138-
this.ros.off(this.name, this._serviceCallback);
155+
156+
this._pendingUnadvertise = true;
157+
158+
try {
159+
// Mark as not advertised first to prevent new service calls
160+
// This ensures callService() will not be blocked while we're unadvertising
161+
this.isAdvertised = false;
162+
163+
// Remove the registered callback to stop processing new requests
164+
if (this._serviceCallback) {
165+
this.ros.off(this.name, this._serviceCallback);
166+
this._serviceCallback = null;
167+
}
168+
169+
// Send the unadvertise message to the server
170+
// Note: This is fire-and-forget, but the operation queue ensures
171+
// no new advertise can start until this completes
172+
this.ros.callOnConnection({
173+
op: 'unadvertise_service',
174+
service: this.name
175+
});
176+
} finally {
177+
this._pendingUnadvertise = false;
139178
}
140-
this.isAdvertised = false;
179+
}
180+
181+
unadvertise() {
182+
// Queue this operation to prevent race conditions
183+
this._operationQueue = this._operationQueue.then(async () => {
184+
await this._doUnadvertise();
185+
}).catch(err => {
186+
this.emit('error', err);
187+
throw err;
188+
});
189+
190+
return this._operationQueue;
141191
}
142192

143193
/**
144194
* An alternate form of Service advertisement that supports a modern Promise-based interface for use with async/await.
145195
* @param {(request: TRequest) => Promise<TResponse>} callback An asynchronous callback processing the request and returning a response.
146196
*/
147197
advertiseAsync(callback) {
148-
if (this.isAdvertised) {
149-
throw new Error('Cannot advertise the same Service twice!');
150-
}
151-
this._serviceCallback = async (rosbridgeRequest) => {
152-
/** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */
153-
let rosbridgeResponse = {
154-
op: 'service_response',
155-
service: this.name,
156-
result: false
198+
// Queue this operation to prevent race conditions
199+
this._operationQueue = this._operationQueue.then(async () => {
200+
// If already advertised, unadvertise first
201+
if (this.isAdvertised) {
202+
await this._doUnadvertise();
157203
}
158-
try {
159-
rosbridgeResponse.values = await callback(rosbridgeRequest.args);
160-
rosbridgeResponse.result = true;
161-
} finally {
162-
if (rosbridgeRequest.id) {
163-
rosbridgeResponse.id = rosbridgeRequest.id;
204+
205+
this._serviceCallback = async (rosbridgeRequest) => {
206+
/** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */
207+
let rosbridgeResponse = {
208+
op: 'service_response',
209+
service: this.name,
210+
result: false
211+
}
212+
try {
213+
rosbridgeResponse.values = await callback(rosbridgeRequest.args);
214+
rosbridgeResponse.result = true;
215+
} finally {
216+
if (rosbridgeRequest.id) {
217+
rosbridgeResponse.id = rosbridgeRequest.id;
218+
}
219+
this.ros.callOnConnection(rosbridgeResponse);
164220
}
165-
this.ros.callOnConnection(rosbridgeResponse);
166221
}
167-
}
168-
this.ros.on(this.name, this._serviceCallback);
169-
this.ros.callOnConnection({
170-
op: 'advertise_service',
171-
type: this.serviceType,
172-
service: this.name
222+
this.ros.on(this.name, this._serviceCallback);
223+
this.ros.callOnConnection({
224+
op: 'advertise_service',
225+
type: this.serviceType,
226+
service: this.name
227+
});
228+
this.isAdvertised = true;
229+
}).catch(err => {
230+
this.emit('error', err);
231+
throw err;
173232
});
174-
this.isAdvertised = true;
233+
234+
return this._operationQueue;
175235
}
176236
}

test/service.test.js

Lines changed: 131 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ describe('Service', () => {
55
const ros = new Ros({
66
url: 'ws://localhost:9090'
77
});
8+
9+
810
it('Successfully advertises a service with an async return', async () => {
911
const server = new Service({
1012
ros,
1113
serviceType: 'std_srvs/Trigger',
1214
name: '/test_service'
1315
});
14-
server.advertiseAsync(async () => {
16+
await server.advertiseAsync(async () => {
1517
return {
1618
success: true,
1719
message: 'foo'
@@ -26,7 +28,7 @@ describe('Service', () => {
2628
expect(response).toEqual({success: true, message: 'foo'});
2729
// Make sure un-advertisement actually disposes of the event handler
2830
expect(ros.listenerCount(server.name)).toEqual(1);
29-
server.unadvertise();
31+
await server.unadvertise();
3032
expect(ros.listenerCount(server.name)).toEqual(0);
3133
})
3234
it('Successfully advertises a service with a synchronous return', async () => {
@@ -35,7 +37,7 @@ describe('Service', () => {
3537
serviceType: 'std_srvs/Trigger',
3638
name: '/test_service'
3739
});
38-
server.advertise((request, response) => {
40+
await server.advertise((_request, response) => {
3941
response.success = true;
4042
response.message = 'bar';
4143
return true;
@@ -49,7 +51,132 @@ describe('Service', () => {
4951
expect(response).toEqual({success: true, message: 'bar'});
5052
// Make sure un-advertisement actually disposes of the event handler
5153
expect(ros.listenerCount(server.name)).toEqual(1);
52-
server.unadvertise();
54+
await server.unadvertise();
5355
expect(ros.listenerCount(server.name)).toEqual(0);
5456
})
57+
58+
it('Handles re-advertisement gracefully without throwing errors', async () => {
59+
const server = new Service({
60+
ros,
61+
serviceType: 'std_srvs/Trigger',
62+
name: '/test_readvertise'
63+
});
64+
65+
// First advertisement
66+
await server.advertise((_request, response) => {
67+
response.success = true;
68+
response.message = 'first';
69+
return true;
70+
});
71+
72+
expect(server.isAdvertised).toBe(true);
73+
expect(ros.listenerCount(server.name)).toEqual(1);
74+
75+
// Re-advertise with different callback - should not throw
76+
await server.advertise((_request, response) => {
77+
response.success = true;
78+
response.message = 'second';
79+
return true;
80+
});
81+
82+
expect(server.isAdvertised).toBe(true);
83+
expect(ros.listenerCount(server.name)).toEqual(1);
84+
85+
await server.unadvertise();
86+
expect(server.isAdvertised).toBe(false);
87+
expect(ros.listenerCount(server.name)).toEqual(0);
88+
})
89+
90+
it('Handles multiple unadvertise calls gracefully', async () => {
91+
const server = new Service({
92+
ros,
93+
serviceType: 'std_srvs/Trigger',
94+
name: '/test_multiple_unadvertise'
95+
});
96+
97+
await server.advertise((_request, response) => {
98+
response.success = true;
99+
return true;
100+
});
101+
102+
expect(server.isAdvertised).toBe(true);
103+
104+
// First unadvertise
105+
await server.unadvertise();
106+
expect(server.isAdvertised).toBe(false);
107+
108+
// Second unadvertise - should not throw
109+
await server.unadvertise();
110+
111+
expect(server.isAdvertised).toBe(false);
112+
})
113+
114+
it('Handles re-advertisement with advertiseAsync gracefully', async () => {
115+
const server = new Service({
116+
ros,
117+
serviceType: 'std_srvs/Trigger',
118+
name: '/test_readvertise_async'
119+
});
120+
121+
// First advertisement
122+
await server.advertiseAsync(async () => {
123+
return {
124+
success: true,
125+
message: 'first'
126+
}
127+
});
128+
129+
expect(server.isAdvertised).toBe(true);
130+
131+
// Re-advertise with different callback - should not throw
132+
await server.advertiseAsync(async () => {
133+
return {
134+
success: true,
135+
message: 'second'
136+
}
137+
});
138+
139+
expect(server.isAdvertised).toBe(true);
140+
141+
await server.unadvertise();
142+
expect(server.isAdvertised).toBe(false);
143+
})
144+
145+
it('Ensures operations are serialized through queue', async () => {
146+
const server = new Service({
147+
ros,
148+
serviceType: 'std_srvs/Trigger',
149+
name: '/test_queue'
150+
});
151+
152+
// Rapid advertise/unadvertise operations
153+
const operations = [
154+
server.advertise((_request, response) => {
155+
response.success = true;
156+
response.message = 'first';
157+
return true;
158+
}),
159+
server.unadvertise(),
160+
server.advertise((_request, response) => {
161+
response.success = true;
162+
response.message = 'second';
163+
return true;
164+
}),
165+
server.unadvertise(),
166+
server.advertise((_request, response) => {
167+
response.success = true;
168+
response.message = 'third';
169+
return true;
170+
})
171+
];
172+
173+
// All operations should complete without errors
174+
await Promise.all(operations);
175+
176+
// Final state should be advertised
177+
expect(server.isAdvertised).toBe(true);
178+
179+
await server.unadvertise();
180+
expect(server.isAdvertised).toBe(false);
181+
})
55182
})

0 commit comments

Comments
 (0)