Skip to content

Fix Service advertisement race conditions #919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 118 additions & 58 deletions src/core/Service.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ export default class Service extends EventEmitter {
*/
_serviceCallback = null;
isAdvertised = false;
/**
* Queue for serializing advertise/unadvertise operations to prevent race conditions
* @private
*/
_operationQueue = Promise.resolve();
/**
* Track if an unadvertise operation is pending to prevent double operations
* @private
*/
_pendingUnadvertise = false;
/**
* @param {Object} options
* @param {Ros} options.ros - The ROSLIB.Ros connection handle.
Expand Down Expand Up @@ -93,84 +103,134 @@ export default class Service extends EventEmitter {
* @param {advertiseCallback} callback - This works similarly to the callback for a C++ service and should take the following params
*/
advertise(callback) {
if (this.isAdvertised) {
throw new Error('Cannot advertise the same Service twice!');
}
// Queue this operation to prevent race conditions
this._operationQueue = this._operationQueue.then(async () => {
// If already advertised, unadvertise first
if (this.isAdvertised) {
await this._doUnadvertise();
}

// Store the new callback for removal during un-advertisement
this._serviceCallback = (rosbridgeRequest) => {
var response = {};
var success = callback(rosbridgeRequest.args, response);
// Store the new callback for removal during un-advertisement
this._serviceCallback = (rosbridgeRequest) => {
var response = {};
var success = callback(rosbridgeRequest.args, response);

var call = {
op: 'service_response',
service: this.name,
values: response,
result: success
};
var call = {
op: 'service_response',
service: this.name,
values: response,
result: success
};

if (rosbridgeRequest.id) {
call.id = rosbridgeRequest.id;
}
if (rosbridgeRequest.id) {
call.id = rosbridgeRequest.id;
}

this.ros.callOnConnection(call);
};
this.ros.callOnConnection(call);
};

this.ros.on(this.name, this._serviceCallback);
this.ros.callOnConnection({
op: 'advertise_service',
type: this.serviceType,
service: this.name
this.ros.on(this.name, this._serviceCallback);
this.ros.callOnConnection({
op: 'advertise_service',
type: this.serviceType,
service: this.name
});
this.isAdvertised = true;
}).catch(err => {
this.emit('error', err);
throw err;
});
this.isAdvertised = true;

return this._operationQueue;
}

unadvertise() {
if (!this.isAdvertised) {
throw new Error(`Tried to un-advertise service ${this.name}, but it was not advertised!`);
/**
* Internal method to perform unadvertisement without queueing
* @private
*/
async _doUnadvertise() {
if (!this.isAdvertised || this._pendingUnadvertise) {
return;
}
this.ros.callOnConnection({
op: 'unadvertise_service',
service: this.name
});
// Remove the registered callback
if (this._serviceCallback) {
this.ros.off(this.name, this._serviceCallback);

this._pendingUnadvertise = true;

try {
// Mark as not advertised first to prevent new service calls
// This ensures callService() will not be blocked while we're unadvertising
this.isAdvertised = false;

// Remove the registered callback to stop processing new requests
if (this._serviceCallback) {
this.ros.off(this.name, this._serviceCallback);
this._serviceCallback = null;
}

// Send the unadvertise message to the server
// Note: This is fire-and-forget, but the operation queue ensures
// no new advertise can start until this completes
this.ros.callOnConnection({
op: 'unadvertise_service',
service: this.name
});
} finally {
this._pendingUnadvertise = false;
}
this.isAdvertised = false;
}

unadvertise() {
// Queue this operation to prevent race conditions
this._operationQueue = this._operationQueue.then(async () => {
await this._doUnadvertise();
}).catch(err => {
this.emit('error', err);
throw err;
});

return this._operationQueue;
}

/**
* An alternate form of Service advertisement that supports a modern Promise-based interface for use with async/await.
* @param {(request: TRequest) => Promise<TResponse>} callback An asynchronous callback processing the request and returning a response.
*/
advertiseAsync(callback) {
if (this.isAdvertised) {
throw new Error('Cannot advertise the same Service twice!');
}
this._serviceCallback = async (rosbridgeRequest) => {
/** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */
let rosbridgeResponse = {
op: 'service_response',
service: this.name,
result: false
// Queue this operation to prevent race conditions
this._operationQueue = this._operationQueue.then(async () => {
// If already advertised, unadvertise first
if (this.isAdvertised) {
await this._doUnadvertise();
}
try {
rosbridgeResponse.values = await callback(rosbridgeRequest.args);
rosbridgeResponse.result = true;
} finally {
if (rosbridgeRequest.id) {
rosbridgeResponse.id = rosbridgeRequest.id;

this._serviceCallback = async (rosbridgeRequest) => {
/** @type {{op: string, service: string, values?: TResponse, result: boolean, id?: string}} */
let rosbridgeResponse = {
op: 'service_response',
service: this.name,
result: false
}
try {
rosbridgeResponse.values = await callback(rosbridgeRequest.args);
rosbridgeResponse.result = true;
} finally {
if (rosbridgeRequest.id) {
rosbridgeResponse.id = rosbridgeRequest.id;
}
this.ros.callOnConnection(rosbridgeResponse);
}
this.ros.callOnConnection(rosbridgeResponse);
}
}
this.ros.on(this.name, this._serviceCallback);
this.ros.callOnConnection({
op: 'advertise_service',
type: this.serviceType,
service: this.name
this.ros.on(this.name, this._serviceCallback);
this.ros.callOnConnection({
op: 'advertise_service',
type: this.serviceType,
service: this.name
});
this.isAdvertised = true;
}).catch(err => {
this.emit('error', err);
throw err;
});
this.isAdvertised = true;

return this._operationQueue;
}
}
135 changes: 131 additions & 4 deletions test/service.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ describe('Service', () => {
const ros = new Ros({
url: 'ws://localhost:9090'
});


it('Successfully advertises a service with an async return', async () => {
const server = new Service({
ros,
serviceType: 'std_srvs/Trigger',
name: '/test_service'
});
server.advertiseAsync(async () => {
await server.advertiseAsync(async () => {
return {
success: true,
message: 'foo'
Expand All @@ -26,7 +28,7 @@ describe('Service', () => {
expect(response).toEqual({success: true, message: 'foo'});
// Make sure un-advertisement actually disposes of the event handler
expect(ros.listenerCount(server.name)).toEqual(1);
server.unadvertise();
await server.unadvertise();
expect(ros.listenerCount(server.name)).toEqual(0);
})
it('Successfully advertises a service with a synchronous return', async () => {
Expand All @@ -35,7 +37,7 @@ describe('Service', () => {
serviceType: 'std_srvs/Trigger',
name: '/test_service'
});
server.advertise((request, response) => {
await server.advertise((_request, response) => {
response.success = true;
response.message = 'bar';
return true;
Expand All @@ -49,7 +51,132 @@ describe('Service', () => {
expect(response).toEqual({success: true, message: 'bar'});
// Make sure un-advertisement actually disposes of the event handler
expect(ros.listenerCount(server.name)).toEqual(1);
server.unadvertise();
await server.unadvertise();
expect(ros.listenerCount(server.name)).toEqual(0);
})

it('Handles re-advertisement gracefully without throwing errors', async () => {
const server = new Service({
ros,
serviceType: 'std_srvs/Trigger',
name: '/test_readvertise'
});

// First advertisement
await server.advertise((_request, response) => {
response.success = true;
response.message = 'first';
return true;
});

expect(server.isAdvertised).toBe(true);
expect(ros.listenerCount(server.name)).toEqual(1);

// Re-advertise with different callback - should not throw
await server.advertise((_request, response) => {
response.success = true;
response.message = 'second';
return true;
});

expect(server.isAdvertised).toBe(true);
expect(ros.listenerCount(server.name)).toEqual(1);

await server.unadvertise();
expect(server.isAdvertised).toBe(false);
expect(ros.listenerCount(server.name)).toEqual(0);
})

it('Handles multiple unadvertise calls gracefully', async () => {
const server = new Service({
ros,
serviceType: 'std_srvs/Trigger',
name: '/test_multiple_unadvertise'
});

await server.advertise((_request, response) => {
response.success = true;
return true;
});

expect(server.isAdvertised).toBe(true);

// First unadvertise
await server.unadvertise();
expect(server.isAdvertised).toBe(false);

// Second unadvertise - should not throw
await server.unadvertise();

expect(server.isAdvertised).toBe(false);
})

it('Handles re-advertisement with advertiseAsync gracefully', async () => {
const server = new Service({
ros,
serviceType: 'std_srvs/Trigger',
name: '/test_readvertise_async'
});

// First advertisement
await server.advertiseAsync(async () => {
return {
success: true,
message: 'first'
}
});

expect(server.isAdvertised).toBe(true);

// Re-advertise with different callback - should not throw
await server.advertiseAsync(async () => {
return {
success: true,
message: 'second'
}
});

expect(server.isAdvertised).toBe(true);

await server.unadvertise();
expect(server.isAdvertised).toBe(false);
})

it('Ensures operations are serialized through queue', async () => {
const server = new Service({
ros,
serviceType: 'std_srvs/Trigger',
name: '/test_queue'
});

// Rapid advertise/unadvertise operations
const operations = [
server.advertise((_request, response) => {
response.success = true;
response.message = 'first';
return true;
}),
server.unadvertise(),
server.advertise((_request, response) => {
response.success = true;
response.message = 'second';
return true;
}),
server.unadvertise(),
server.advertise((_request, response) => {
response.success = true;
response.message = 'third';
return true;
})
];

// All operations should complete without errors
await Promise.all(operations);

// Final state should be advertised
expect(server.isAdvertised).toBe(true);

await server.unadvertise();
expect(server.isAdvertised).toBe(false);
})
})