77
88#include " dsdl_helpers.hpp"
99#include " gateway.hpp"
10+ #include " ipc_types.hpp"
1011#include " pipe/client_pipe.hpp"
11- #include " pipe/pipe_types.hpp"
1212
1313#include " ocvsmd/common/ipc/RouteChannelEnd_1_0.hpp"
1414#include " ocvsmd/common/ipc/RouteChannelMsg_1_0.hpp"
@@ -44,7 +44,7 @@ class ClientRouterImpl final : public ClientRouter
4444 ClientRouterImpl (cetl::pmr::memory_resource& memory, pipe::ClientPipe::Ptr client_pipe)
4545 : memory_{memory}
4646 , client_pipe_{std::move (client_pipe)}
47- , last_unique_tag_ {0 }
47+ , next_tag_ {0 }
4848 , is_connected_{false }
4949 {
5050 CETL_DEBUG_ASSERT (client_pipe_, " " );
@@ -72,8 +72,11 @@ class ClientRouterImpl final : public ClientRouter
7272
7373 CETL_NODISCARD detail::Gateway::Ptr makeGateway () override
7474 {
75- const Endpoint endpoint{++last_unique_tag_};
76- return GatewayImpl::create (*this , endpoint);
75+ const Endpoint endpoint{next_tag_++};
76+
77+ auto gateway = GatewayImpl::create (*this , endpoint);
78+ endpoint_to_gateway_[endpoint] = gateway;
79+ return gateway;
7780 }
7881
7982private:
@@ -128,7 +131,7 @@ class ClientRouterImpl final : public ClientRouter
128131 GatewayImpl (Private, ClientRouterImpl& router, const Endpoint& endpoint)
129132 : router_{router}
130133 , endpoint_{endpoint}
131- , sequence_ {0 }
134+ , next_sequence_ {0 }
132135 {
133136 ::syslog (LOG_DEBUG, " Gateway(tag=%zu)." , endpoint.getTag());
134137 }
@@ -140,24 +143,27 @@ class ClientRouterImpl final : public ClientRouter
140143
141144 ~GatewayImpl ()
142145 {
143- router_.unregisterGateway (endpoint_, true );
144- ::syslog (LOG_DEBUG, " ~Gateway(tag=%zu)." , endpoint_.getTag());
146+ ::syslog (LOG_DEBUG, " ~Gateway(tag=%zu, seq=%zu)." , endpoint_.getTag(), next_sequence_);
147+
148+ // `next_sequence_ == 0` means that this gateway was never used for sending messages,
149+ // and so remote router never knew about it (its tag) - no need to post "ChEnd" event.
150+ router_.onGatewayDisposal (endpoint_, next_sequence_ > 0 );
145151 }
146152
147153 // detail::Gateway
148154
149- CETL_NODISCARD int send (const detail::ServiceId service_id, const pipe:: Payload payload) override
155+ CETL_NODISCARD int send (const detail::ServiceId service_id, const Payload payload) override
150156 {
151157 if (!router_.is_connected_ )
152158 {
153- return ENOTCONN ;
159+ return static_cast < int >(ErrorCode::NotConnected) ;
154160 }
155161
156162 Route_1_0 route{&router_.memory_ };
157163
158164 auto & channel_msg = route.set_channel_msg ();
159165 channel_msg.tag = endpoint_.getTag ();
160- channel_msg.sequence = sequence_ ++;
166+ channel_msg.sequence = next_sequence_ ++;
161167 channel_msg.service_id = service_id;
162168
163169 return tryPerformOnSerialized (route, [this , payload](const auto prefix) {
@@ -176,22 +182,14 @@ class ClientRouterImpl final : public ClientRouter
176182
177183 void subscribe (EventHandler event_handler) override
178184 {
179- if (event_handler)
180- {
181- event_handler_ = std::move (event_handler);
182- router_.registerGateway (endpoint_, *this );
183- }
184- else
185- {
186- event_handler_ = nullptr ;
187- router_.unregisterGateway (endpoint_);
188- }
185+ event_handler_ = std::move (event_handler);
186+ router_.onGatewaySubscription (endpoint_);
189187 }
190188
191189 private:
192190 ClientRouterImpl& router_;
193191 const Endpoint endpoint_;
194- std::uint64_t sequence_ ;
192+ std::uint64_t next_sequence_ ;
195193 EventHandler event_handler_;
196194
197195 }; // GatewayImpl
@@ -203,60 +201,83 @@ class ClientRouterImpl final : public ClientRouter
203201 return is_connected_;
204202 }
205203
206- void registerGateway (const Endpoint& endpoint, GatewayImpl& gateway)
204+ template <typename Action>
205+ void findRegisteredGateway (const Endpoint endpoint, Action&& action)
206+ {
207+ const auto ep_to_gw = endpoint_to_gateway_.find (endpoint);
208+ if (ep_to_gw != endpoint_to_gateway_.end ())
209+ {
210+ const auto gateway = ep_to_gw->second .lock ();
211+ if (gateway)
212+ {
213+ std::forward<Action>(action)(*gateway, ep_to_gw);
214+ }
215+ }
216+ }
217+
218+ template <typename Action>
219+ void forEachRegisteredGateway (Action action)
220+ {
221+ // Calling an action might indirectly modify the map, so we first
222+ // collect strong pointers to gateways into a local collection.
223+ //
224+ std::vector<detail::Gateway::Ptr> gateway_ptrs;
225+ gateway_ptrs.reserve (endpoint_to_gateway_.size ());
226+ for (const auto & ep_to_gw : endpoint_to_gateway_)
227+ {
228+ const auto gateway_ptr = ep_to_gw.second .lock ();
229+ if (gateway_ptr)
230+ {
231+ gateway_ptrs.push_back (gateway_ptr);
232+ }
233+ }
234+
235+ for (const auto & gateway_ptr : gateway_ptrs)
236+ {
237+ action (*gateway_ptr);
238+ }
239+ }
240+
241+ void onGatewaySubscription (const Endpoint endpoint)
207242 {
208- endpoint_to_gateway_[endpoint] = gateway.shared_from_this ();
209243 if (is_connected_)
210244 {
211- gateway.event (detail::Gateway::Event::Connected{});
245+ findRegisteredGateway (endpoint, [](auto & gateway, auto ) {
246+ //
247+ gateway.event (detail::Gateway::Event::Connected{});
248+ });
212249 }
213250 }
214251
215- void unregisterGateway (const Endpoint& endpoint, const bool is_disposed = false )
252+ // / Unregisters the gateway associated with the given endpoint.
253+ // /
254+ // / Called on the gateway disposal (correspondingly on its channel destruction).
255+ // / The "dying" gateway might wish to notify the remote router about its disposal.
256+ // / The router fulfills the wish if the gateway was registered and the router is connected.
257+ // /
258+ void onGatewayDisposal (const Endpoint& endpoint, const bool send_ch_end)
216259 {
217- endpoint_to_gateway_.erase (endpoint);
260+ const bool was_registered = ( endpoint_to_gateway_.erase (endpoint) > 0 );
218261
219- // Notify "remote" router about the gateway disposal.
220- // The router will deliver "disconnected " event to the counterpart gateway (if it exists ).
262+ // Notify "remote" router about the gateway disposal (aka channel completion) .
263+ // The router will propagate "ChEnd " event to the counterpart gateway (if it's registered ).
221264 //
222- if (is_disposed && isConnected (endpoint))
265+ if (was_registered && send_ch_end && isConnected (endpoint))
223266 {
224267 Route_1_0 route{&memory_};
225268 auto & channel_end = route.set_channel_end ();
226269 channel_end.tag = endpoint.getTag ();
227- channel_end.error_code = 0 ;
270+ channel_end.error_code = 0 ; // No error b/c it's a normal channel completion.
228271
229272 const int result = tryPerformOnSerialized (route, [this ](const auto payload) {
230273 //
231274 return client_pipe_->send ({{payload}});
232275 });
276+ // Best efforts strategy - gateway anyway is gone, so nowhere to report.
233277 (void ) result;
234278 }
235279 }
236280
237- template <typename Action>
238- void forEachGateway (Action action) const
239- {
240- // Calling an action might indirectly modify the map, so we first
241- // collect strong pointers to gateways into a local collection.
242- //
243- std::vector<detail::Gateway::Ptr> gateways;
244- gateways.reserve (endpoint_to_gateway_.size ());
245- for (const auto & ep_to_gw : endpoint_to_gateway_)
246- {
247- const auto gateway = ep_to_gw.second .lock ();
248- if (gateway)
249- {
250- gateways.push_back (gateway);
251- }
252- }
253-
254- for (const auto & gateway : gateways)
255- {
256- action (gateway);
257- }
258- }
259-
260281 CETL_NODISCARD int handlePipeEvent (const pipe::ClientPipe::Event::Connected) const
261282 {
262283 // TODO: log client pipe connection
@@ -295,7 +316,7 @@ class ClientRouterImpl final : public ClientRouter
295316 //
296317 handleRouteChannelMsg (route_ch_msg, msg_payload);
297318 },
298- [this , msg_payload ](const RouteChannelEnd_1_0& route_ch_end) {
319+ [this ](const RouteChannelEnd_1_0& route_ch_end) {
299320 //
300321 handleRouteChannelEnd (route_ch_end);
301322 }),
@@ -306,40 +327,42 @@ class ClientRouterImpl final : public ClientRouter
306327
307328 CETL_NODISCARD int handlePipeEvent (const pipe::ClientPipe::Event::Disconnected)
308329 {
309- // TODO: log client pipe disconnection
310-
311330 if (is_connected_)
312331 {
313332 is_connected_ = false ;
314333
315- // The whole router is disconnected, so we need to notify all local gateways.
334+ // The whole router is disconnected, so we need to unregister and notify all gateways.
316335 //
317- forEachGateway ([](const auto & gateway) {
318- //
319- gateway->event (detail::Gateway::Event::Disconnected{});
320- });
336+ EndpointToWeakGateway local_gateways;
337+ std::swap (local_gateways, endpoint_to_gateway_);
338+ for (const auto & ep_to_gw : local_gateways)
339+ {
340+ const auto gateway = ep_to_gw.second .lock ();
341+ if (gateway)
342+ {
343+ gateway->event (detail::Gateway::Event::Completed{ErrorCode::Disconnected});
344+ }
345+ }
321346 }
322347 return 0 ;
323348 }
324349
325350 void handleRouteConnect (const RouteConnect_1_0&)
326351 {
327- // TODO: log server route connection
328-
329352 if (!is_connected_)
330353 {
331354 is_connected_ = true ;
332355
333356 // We've got connection response from the server, so we need to notify all local gateways.
334357 //
335- forEachGateway ([](const auto & gateway) {
358+ forEachRegisteredGateway ([](auto & gateway) {
336359 //
337- gateway-> event (detail::Gateway::Event::Connected{});
360+ gateway. event (detail::Gateway::Event::Connected{});
338361 });
339362 }
340363 }
341364
342- void handleRouteChannelMsg (const RouteChannelMsg_1_0& route_ch_msg, pipe:: Payload payload)
365+ void handleRouteChannelMsg (const RouteChannelMsg_1_0& route_ch_msg, const Payload payload)
343366 {
344367 const Endpoint endpoint{route_ch_msg.tag };
345368
@@ -357,11 +380,21 @@ class ClientRouterImpl final : public ClientRouter
357380 // TODO: log unsolicited message
358381 }
359382
360- void handleRouteChannelEnd (const RouteChannelEnd_1_0& route_ch_end) {}
383+ void handleRouteChannelEnd (const RouteChannelEnd_1_0& route_ch_end)
384+ {
385+ const Endpoint endpoint{route_ch_end.tag };
386+ const auto error_code = static_cast <ErrorCode>(route_ch_end.error_code );
387+
388+ findRegisteredGateway (endpoint, [this , error_code](auto & gateway, auto it) {
389+ //
390+ endpoint_to_gateway_.erase (it);
391+ gateway.event (detail::Gateway::Event::Completed{error_code});
392+ });
393+ }
361394
362395 cetl::pmr::memory_resource& memory_;
363396 pipe::ClientPipe::Ptr client_pipe_;
364- Endpoint::Tag last_unique_tag_ ;
397+ Endpoint::Tag next_tag_ ;
365398 EndpointToWeakGateway endpoint_to_gateway_;
366399 bool is_connected_;
367400
0 commit comments