diff --git a/src/core/Service.js b/src/core/Service.js index 9d9984468..e014f8fe4 100644 --- a/src/core/Service.js +++ b/src/core/Service.js @@ -18,6 +18,16 @@ export default class Service extends EventEmitter { */ _serviceCallback = null; isAdvertised = false; + /** + * Queue for serializing advertise/unadvertise operations to prevent race conditions + * @private + */ + _operationQueue = Promise.resolve(); + /** + * Track if an unadvertise operation is pending to prevent double operations + * @private + */ + _pendingUnadvertise = false; /** * @param {Object} options * @param {Ros} options.ros - The ROSLIB.Ros connection handle. @@ -93,51 +103,91 @@ export default class Service extends EventEmitter { * @param {advertiseCallback} callback - This works similarly to the callback for a C++ service and should take the following params */ advertise(callback) { - if (this.isAdvertised) { - throw new Error('Cannot advertise the same Service twice!'); - } + // Queue this operation to prevent race conditions + this._operationQueue = this._operationQueue.then(async () => { + // If already advertised, unadvertise first + if (this.isAdvertised) { + await this._doUnadvertise(); + } - // Store the new callback for removal during un-advertisement - this._serviceCallback = (rosbridgeRequest) => { - var response = {}; - var success = callback(rosbridgeRequest.args, response); + // Store the new callback for removal during un-advertisement + this._serviceCallback = (rosbridgeRequest) => { + var response = {}; + var success = callback(rosbridgeRequest.args, response); - var call = { - op: 'service_response', - service: this.name, - values: response, - result: success - }; + var call = { + op: 'service_response', + service: this.name, + values: response, + result: success + }; - if (rosbridgeRequest.id) { - call.id = rosbridgeRequest.id; - } + if (rosbridgeRequest.id) { + call.id = rosbridgeRequest.id; + } - this.ros.callOnConnection(call); - }; + this.ros.callOnConnection(call); + }; - this.ros.on(this.name, this._serviceCallback); - this.ros.callOnConnection({ - op: 'advertise_service', - type: this.serviceType, - service: this.name + this.ros.on(this.name, this._serviceCallback); + this.ros.callOnConnection({ + op: 'advertise_service', + type: this.serviceType, + service: this.name + }); + this.isAdvertised = true; + }).catch(err => { + this.emit('error', err); + throw err; }); - this.isAdvertised = true; + + return this._operationQueue; } - unadvertise() { - if (!this.isAdvertised) { - throw new Error(`Tried to un-advertise service ${this.name}, but it was not advertised!`); + /** + * Internal method to perform unadvertisement without queueing + * @private + */ + async _doUnadvertise() { + if (!this.isAdvertised || this._pendingUnadvertise) { + return; } - this.ros.callOnConnection({ - op: 'unadvertise_service', - service: this.name - }); - // Remove the registered callback - if (this._serviceCallback) { - this.ros.off(this.name, this._serviceCallback); + + this._pendingUnadvertise = true; + + try { + // Mark as not advertised first to prevent new service calls + // This ensures callService() will not be blocked while we're unadvertising + this.isAdvertised = false; + + // Remove the registered callback to stop processing new requests + if (this._serviceCallback) { + this.ros.off(this.name, this._serviceCallback); + this._serviceCallback = null; + } + + // Send the unadvertise message to the server + // Note: This is fire-and-forget, but the operation queue ensures + // no new advertise can start until this completes + this.ros.callOnConnection({ + op: 'unadvertise_service', + service: this.name + }); + } finally { + this._pendingUnadvertise = false; } - this.isAdvertised = false; + } + + unadvertise() { + // Queue this operation to prevent race conditions + this._operationQueue = this._operationQueue.then(async () => { + await this._doUnadvertise(); + }).catch(err => { + this.emit('error', err); + throw err; + }); + + return this._operationQueue; } /** @@ -145,32 +195,42 @@ export default class Service extends EventEmitter { * @param {(request: TRequest) => Promise} callback An asynchronous callback processing the request and returning a response. */ advertiseAsync(callback) { - if (this.isAdvertised) { - throw new Error('Cannot advertise the same Service twice!'); - } - this._serviceCallback = async (rosbridgeRequest) => { - /** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */ - let rosbridgeResponse = { - op: 'service_response', - service: this.name, - result: false + // Queue this operation to prevent race conditions + this._operationQueue = this._operationQueue.then(async () => { + // If already advertised, unadvertise first + if (this.isAdvertised) { + await this._doUnadvertise(); } - try { - rosbridgeResponse.values = await callback(rosbridgeRequest.args); - rosbridgeResponse.result = true; - } finally { - if (rosbridgeRequest.id) { - rosbridgeResponse.id = rosbridgeRequest.id; + + this._serviceCallback = async (rosbridgeRequest) => { + /** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */ + let rosbridgeResponse = { + op: 'service_response', + service: this.name, + result: false + } + try { + rosbridgeResponse.values = await callback(rosbridgeRequest.args); + rosbridgeResponse.result = true; + } finally { + if (rosbridgeRequest.id) { + rosbridgeResponse.id = rosbridgeRequest.id; + } + this.ros.callOnConnection(rosbridgeResponse); } - this.ros.callOnConnection(rosbridgeResponse); } - } - this.ros.on(this.name, this._serviceCallback); - this.ros.callOnConnection({ - op: 'advertise_service', - type: this.serviceType, - service: this.name + this.ros.on(this.name, this._serviceCallback); + this.ros.callOnConnection({ + op: 'advertise_service', + type: this.serviceType, + service: this.name + }); + this.isAdvertised = true; + }).catch(err => { + this.emit('error', err); + throw err; }); - this.isAdvertised = true; + + return this._operationQueue; } } diff --git a/test/service.test.js b/test/service.test.js index 645d5d0be..db5504a1a 100644 --- a/test/service.test.js +++ b/test/service.test.js @@ -5,13 +5,15 @@ describe('Service', () => { const ros = new Ros({ url: 'ws://localhost:9090' }); + + it('Successfully advertises a service with an async return', async () => { const server = new Service({ ros, serviceType: 'std_srvs/Trigger', name: '/test_service' }); - server.advertiseAsync(async () => { + await server.advertiseAsync(async () => { return { success: true, message: 'foo' @@ -26,7 +28,7 @@ describe('Service', () => { expect(response).toEqual({success: true, message: 'foo'}); // Make sure un-advertisement actually disposes of the event handler expect(ros.listenerCount(server.name)).toEqual(1); - server.unadvertise(); + await server.unadvertise(); expect(ros.listenerCount(server.name)).toEqual(0); }) it('Successfully advertises a service with a synchronous return', async () => { @@ -35,7 +37,7 @@ describe('Service', () => { serviceType: 'std_srvs/Trigger', name: '/test_service' }); - server.advertise((request, response) => { + await server.advertise((_request, response) => { response.success = true; response.message = 'bar'; return true; @@ -49,7 +51,132 @@ describe('Service', () => { expect(response).toEqual({success: true, message: 'bar'}); // Make sure un-advertisement actually disposes of the event handler expect(ros.listenerCount(server.name)).toEqual(1); - server.unadvertise(); + await server.unadvertise(); expect(ros.listenerCount(server.name)).toEqual(0); }) + + it('Handles re-advertisement gracefully without throwing errors', async () => { + const server = new Service({ + ros, + serviceType: 'std_srvs/Trigger', + name: '/test_readvertise' + }); + + // First advertisement + await server.advertise((_request, response) => { + response.success = true; + response.message = 'first'; + return true; + }); + + expect(server.isAdvertised).toBe(true); + expect(ros.listenerCount(server.name)).toEqual(1); + + // Re-advertise with different callback - should not throw + await server.advertise((_request, response) => { + response.success = true; + response.message = 'second'; + return true; + }); + + expect(server.isAdvertised).toBe(true); + expect(ros.listenerCount(server.name)).toEqual(1); + + await server.unadvertise(); + expect(server.isAdvertised).toBe(false); + expect(ros.listenerCount(server.name)).toEqual(0); + }) + + it('Handles multiple unadvertise calls gracefully', async () => { + const server = new Service({ + ros, + serviceType: 'std_srvs/Trigger', + name: '/test_multiple_unadvertise' + }); + + await server.advertise((_request, response) => { + response.success = true; + return true; + }); + + expect(server.isAdvertised).toBe(true); + + // First unadvertise + await server.unadvertise(); + expect(server.isAdvertised).toBe(false); + + // Second unadvertise - should not throw + await server.unadvertise(); + + expect(server.isAdvertised).toBe(false); + }) + + it('Handles re-advertisement with advertiseAsync gracefully', async () => { + const server = new Service({ + ros, + serviceType: 'std_srvs/Trigger', + name: '/test_readvertise_async' + }); + + // First advertisement + await server.advertiseAsync(async () => { + return { + success: true, + message: 'first' + } + }); + + expect(server.isAdvertised).toBe(true); + + // Re-advertise with different callback - should not throw + await server.advertiseAsync(async () => { + return { + success: true, + message: 'second' + } + }); + + expect(server.isAdvertised).toBe(true); + + await server.unadvertise(); + expect(server.isAdvertised).toBe(false); + }) + + it('Ensures operations are serialized through queue', async () => { + const server = new Service({ + ros, + serviceType: 'std_srvs/Trigger', + name: '/test_queue' + }); + + // Rapid advertise/unadvertise operations + const operations = [ + server.advertise((_request, response) => { + response.success = true; + response.message = 'first'; + return true; + }), + server.unadvertise(), + server.advertise((_request, response) => { + response.success = true; + response.message = 'second'; + return true; + }), + server.unadvertise(), + server.advertise((_request, response) => { + response.success = true; + response.message = 'third'; + return true; + }) + ]; + + // All operations should complete without errors + await Promise.all(operations); + + // Final state should be advertised + expect(server.isAdvertised).toBe(true); + + await server.unadvertise(); + expect(server.isAdvertised).toBe(false); + }) })