Skip to content

Commit 52e0706

Browse files
bpwilcoxskyegalaxy
authored andcommitted
Fix mutltiple client requests (#142)
* store map of unique request id to client id and callback info pair * fix map end check * fix undefined reference * remove unnecessary request id erase, remove/fix unique id comment * improve unique id comment (cherry picked from commit b865383)
1 parent b9328f8 commit 52e0706

File tree

3 files changed

+38
-8
lines changed

3 files changed

+38
-8
lines changed

rclcpp/include/rclcpp/experimental/service_intra_process.hpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,22 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
108108
return std::static_pointer_cast<void>(data);
109109
}
110110

111-
void send_response(uint64_t intra_process_client_id, SharedResponse & response)
111+
void send_response(int64_t client_request_id, SharedResponse & response)
112112
{
113113
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);
114114

115+
auto client_request_it = callback_info_.find(client_request_id);
116+
117+
if (client_request_it == callback_info_.end()) {
118+
RCLCPP_WARN(
119+
rclcpp::get_logger("rclcpp"),
120+
"Calling intra_process_service_send_response for invalid or no "
121+
"longer existing request id");
122+
123+
return;
124+
}
125+
126+
auto intra_process_client_id = client_request_it->second.first;
115127
auto client_it = clients_.find(intra_process_client_id);
116128

117129
if (client_it == clients_.end()) {
@@ -120,7 +132,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
120132
"Calling intra_process_service_send_response for invalid or no "
121133
"longer existing client id");
122134

123-
callback_info_.erase(intra_process_client_id);
135+
callback_info_.erase(client_request_id);
124136
return;
125137
}
126138

@@ -130,14 +142,14 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
130142
auto client = std::dynamic_pointer_cast<
131143
rclcpp::experimental::ClientIntraProcess<ServiceT>>(
132144
client_intra_process_base);
133-
CallbackInfoVariant & value = callback_info_[intra_process_client_id];
145+
CallbackInfoVariant & value = client_request_it->second.second;
134146
client->store_intra_process_response(
135147
std::make_pair(std::move(response), std::move(value)));
136148
} else {
137149
clients_.erase(client_it);
138150
}
139151

140-
callback_info_.erase(intra_process_client_id);
152+
callback_info_.erase(client_request_id);
141153
}
142154

143155
void execute(const std::shared_ptr<void> & data)
@@ -154,18 +166,20 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
154166
uint64_t intra_process_client_id = ptr->first;
155167
SharedRequest & typed_request = ptr->second.first;
156168
CallbackInfoVariant & value = ptr->second.second;
157-
callback_info_.emplace(std::make_pair(intra_process_client_id, std::move(value)));
158169

159170
// To allow for the user callback to handle deferred responses for IPC in an ambiguous way,
160171
// we are overloading the rmw_request_id semantics to provide the intra process client ID.
172+
uint64_t client_request_id = get_unique_request_id();
161173
auto req_id = std::make_shared<rmw_request_id_t>();
162-
req_id->sequence_number = intra_process_client_id;
174+
req_id->sequence_number = client_request_id;
163175
req_id->from_intra_process = true;
164176

177+
callback_info_.emplace(std::make_pair(req_id->sequence_number, std::make_pair(intra_process_client_id, std::move(value))));
178+
165179
SharedResponse response = any_callback_.dispatch(serv_handle, req_id, std::move(typed_request));
166180

167181
if (response) {
168-
send_response(intra_process_client_id, response);
182+
send_response(req_id->sequence_number, response);
169183
}
170184
}
171185

@@ -182,7 +196,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
182196

183197
// Store callback variants in a map to support deferred response
184198
// access by intra-process client id.
185-
std::unordered_map<uint64_t, CallbackInfoVariant> callback_info_;
199+
std::unordered_map<int64_t, std::pair<uint64_t, CallbackInfoVariant>> callback_info_;
186200
};
187201

188202
} // namespace experimental

rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
194194
}
195195
}
196196

197+
uint64_t get_unique_request_id();
198+
197199
using ClientMap =
198200
std::unordered_map<uint64_t, rclcpp::experimental::ClientIntraProcessBase::WeakPtr>;
199201

rclcpp/src/rclcpp/service_intra_process_base.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,17 @@ ServiceIntraProcessBase::add_intra_process_client(
4343
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);
4444
clients_[client_id] = client;
4545
}
46+
47+
uint64_t
48+
ServiceIntraProcessBase::get_unique_request_id()
49+
{
50+
static std::atomic<uint64_t> _next_unique_id {1};
51+
52+
auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
53+
if (0 == next_id) {
54+
throw std::overflow_error(
55+
"exhausted the unique ids for client requests in this process "
56+
"(congratulations your computer is either extremely fast or extremely old)");
57+
}
58+
return next_id;
59+
}

0 commit comments

Comments
 (0)