Skip to content

Commit cc119cb

Browse files
committed
corrected send flow for client/server routers
1 parent 3ff6325 commit cc119cb

File tree

5 files changed

+92
-42
lines changed

5 files changed

+92
-42
lines changed

src/common/ipc/client_router.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ class ClientRouterImpl final : public ClientRouter
262262
///
263263
/// Called on the gateway disposal (correspondingly on its channel destruction).
264264
/// The "dying" gateway might wish to notify the remote router about its disposal.
265-
/// The router fulfills the wish if the gateway was registered and the router is connected.
265+
/// This local router fulfills the wish if the gateway was registered and the router is connected.
266266
///
267267
void onGatewayDisposal(const Endpoint& endpoint, const bool send_ch_end)
268268
{

src/common/ipc/server_router.cpp

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,9 @@ class ServerRouterImpl final : public ServerRouter
147147

148148
~GatewayImpl()
149149
{
150-
router_.unregisterGateway(endpoint_, true);
151150
::syslog(LOG_DEBUG, "~Gateway(cl=%zu, tag=%zu).", endpoint_.getClientId(), endpoint_.getTag());
151+
152+
router_.onGatewayDisposal(endpoint_);
152153
}
153154

154155
// detail::Gateway
@@ -159,6 +160,10 @@ class ServerRouterImpl final : public ServerRouter
159160
{
160161
return static_cast<int>(ErrorCode::NotConnected);
161162
}
163+
if (!router_.isRegisteredGateway(endpoint_))
164+
{
165+
return static_cast<int>(ErrorCode::Disconnected);
166+
}
162167

163168
Route_1_0 route{&router_.memory_};
164169

@@ -183,16 +188,8 @@ class ServerRouterImpl final : public ServerRouter
183188

184189
void subscribe(EventHandler event_handler) override
185190
{
186-
if (event_handler)
187-
{
188-
event_handler_ = std::move(event_handler);
189-
router_.registerGateway(endpoint_, *this);
190-
}
191-
else
192-
{
193-
event_handler_ = nullptr;
194-
router_.unregisterGateway(endpoint_, false);
195-
}
191+
event_handler_ = std::move(event_handler);
192+
router_.onGatewaySubscription(endpoint_);
196193
}
197194

198195
private:
@@ -211,24 +208,62 @@ class ServerRouterImpl final : public ServerRouter
211208
return connected_client_ids_.find(endpoint.getClientId()) != connected_client_ids_.end();
212209
}
213210

214-
void registerGateway(const Endpoint& endpoint, GatewayImpl& gateway)
211+
CETL_NODISCARD bool isRegisteredGateway(const Endpoint& endpoint) const noexcept
212+
{
213+
return endpoint_to_gateway_.find(endpoint) != endpoint_to_gateway_.end();
214+
}
215+
216+
template <typename Action>
217+
void findAndActOnRegisteredGateway(const Endpoint endpoint, Action&& action)
218+
{
219+
const auto ep_to_gw = endpoint_to_gateway_.find(endpoint);
220+
if (ep_to_gw != endpoint_to_gateway_.end())
221+
{
222+
const auto gateway = ep_to_gw->second.lock();
223+
if (gateway)
224+
{
225+
std::forward<Action>(action)(*gateway, ep_to_gw);
226+
}
227+
}
228+
}
229+
230+
void onGatewaySubscription(const Endpoint endpoint)
215231
{
216-
endpoint_to_gateway_[endpoint] = gateway.shared_from_this();
217232
if (isConnected(endpoint))
218233
{
219-
gateway.event(detail::Gateway::Event::Connected{});
234+
findAndActOnRegisteredGateway(endpoint, [](auto& gateway, auto) {
235+
//
236+
gateway.event(detail::Gateway::Event::Connected{});
237+
});
220238
}
221239
}
222240

223-
void unregisterGateway(const Endpoint& endpoint, const bool is_disposed = false)
241+
/// Unregisters the gateway associated with the given endpoint.
242+
///
243+
/// Called on the gateway disposal (correspondingly on its channel destruction).
244+
/// The "dying" gateway wishes to notify the remote client router about its disposal.
245+
/// This local router fulfills the wish if the gateway was registered and the client router is connected.
246+
///
247+
void onGatewayDisposal(const Endpoint& endpoint)
224248
{
225-
endpoint_to_gateway_.erase(endpoint);
249+
const bool was_registered = (endpoint_to_gateway_.erase(endpoint) > 0);
226250

227-
// Notify "remote" router about the gateway disposal.
228-
// The router will deliver "disconnected" event to the counterpart gateway (if it exists).
251+
// Notify remote client router about the gateway disposal (aka channel completion).
252+
// The router will propagate "ChEnd" event to the counterpart gateway (if it's registered).
229253
//
230-
if (is_disposed && isConnected(endpoint))
254+
if (was_registered && isConnected(endpoint))
231255
{
256+
Route_1_0 route{&memory_};
257+
auto& channel_end = route.set_channel_end();
258+
channel_end.tag = endpoint.getTag();
259+
channel_end.error_code = 0; // No error b/c it's a normal channel completion.
260+
261+
const int result = tryPerformOnSerialized(route, [this, &endpoint](const auto payload) {
262+
//
263+
return server_pipe_->send(endpoint.getClientId(), {{payload}});
264+
});
265+
// Best efforts strategy - gateway anyway is gone, so nowhere to report.
266+
(void) result;
232267
}
233268
}
234269

@@ -318,7 +353,8 @@ class ServerRouterImpl final : public ServerRouter
318353
const auto si_to_cf = service_id_to_channel_factory_.find(route_ch_msg.service_id);
319354
if (si_to_cf != service_id_to_channel_factory_.end())
320355
{
321-
auto gateway = GatewayImpl::create(*this, endpoint);
356+
auto gateway = GatewayImpl::create(*this, endpoint);
357+
endpoint_to_gateway_[endpoint] = gateway;
322358
si_to_cf->second(gateway, msg_payload);
323359
}
324360
}

test/common/ipc/ipc_gtest_helpers.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ testing::PolymorphicMatcher<PayloadMatcher<T>> PayloadWith(
164164
return testing::MakePolymorphicMatcher(PayloadMatcher<T>(matcher, memory));
165165
}
166166

167-
inline auto PayloadRouteConnectEq(cetl::pmr::memory_resource& mr,
167+
inline auto PayloadOfRouteConnect(cetl::pmr::memory_resource& mr,
168168
const std::uint8_t ver_major = VERSION_MAJOR,
169169
const std::uint8_t ver_minor = VERSION_MINOR)
170170
{
@@ -182,6 +182,14 @@ auto PayloadOfRouteChannel(cetl::pmr::memory_resource& mr,
182182
return PayloadWith<Route_1_0>(testing::VariantWith<RouteChannelMsg_1_0>(msg), mr);
183183
}
184184

185+
inline auto PayloadOfRouteChannelEnd(cetl::pmr::memory_resource& mr, //
186+
const std::uint64_t tag,
187+
const ErrorCode error_code)
188+
{
189+
const RouteChannelEnd_1_0 ch_end{{tag, static_cast<std::int32_t>(error_code), &mr}, &mr};
190+
return PayloadWith<Route_1_0>(testing::VariantWith<RouteChannelEnd_1_0>(ch_end), mr);
191+
}
192+
185193
} // namespace ipc
186194
} // namespace common
187195
} // namespace ocvsmd

test/common/ipc/test_client_router.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include "pipe/client_pipe_mock.hpp"
1414
#include "tracking_memory_resource.hpp"
1515

16-
#include "ocvsmd/common/ipc/RouteChannelEnd_1_0.hpp"
1716
#include "ocvsmd/common/ipc/RouteChannelMsg_1_0.hpp"
1817
#include "ocvsmd/common/ipc/RouteConnect_1_0.hpp"
1918
#include "ocvsmd/common/ipc/Route_1_0.hpp"
@@ -82,7 +81,8 @@ class TestClientRouter : public testing::Test
8281
void emulateRouteConnect(pipe::ClientPipeMock& client_pipe_mock)
8382
{
8483
// client RouteConnect -> server
85-
EXPECT_CALL(client_pipe_mock, send(PayloadRouteConnectEq(mr_))).WillOnce(Return(0));
84+
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteConnect(mr_))) //
85+
.WillOnce(Return(0));
8686
client_pipe_mock.event_handler_(pipe::ClientPipe::Event::Connected{});
8787

8888
// Server -> client RouteConnect
@@ -205,13 +205,15 @@ TEST_F(TestClientRouter, makeChannel_send)
205205

206206
const std::uint64_t tag = 0;
207207
std::uint64_t seq = 0;
208-
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteChannel<Msg>(mr_, tag, seq++))).WillOnce(Return(0));
208+
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteChannel<Msg>(mr_, tag, seq++))) //
209+
.WillOnce(Return(0));
209210
EXPECT_THAT(channel.send(msg), 0);
210211

211-
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteChannel<Msg>(mr_, tag, seq++))).WillOnce(Return(0));
212+
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteChannel<Msg>(mr_, tag, seq++))) //
213+
.WillOnce(Return(0));
212214
EXPECT_THAT(channel.send(msg), 0);
213215

214-
EXPECT_CALL(client_pipe_mock, send(PayloadWith<Route_1_0>(VariantWith<RouteChannelEnd_1_0>(_), mr_)))
216+
EXPECT_CALL(client_pipe_mock, send(PayloadOfRouteChannelEnd(mr_, tag, ErrorCode::Success))) //
215217
.WillOnce(Return(0));
216218
}
217219

test/common/ipc/test_server_router.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include "pipe/server_pipe_mock.hpp"
1313
#include "tracking_memory_resource.hpp"
1414

15-
#include "ocvsmd/common/ipc/RouteConnect_1_0.hpp"
1615
#include "ocvsmd/common/ipc/Route_1_0.hpp"
1716
#include "ocvsmd/common/node_command/ExecCmd_1_0.hpp"
1817

@@ -169,40 +168,45 @@ TEST_F(TestServerRouter, channel_send)
169168
EXPECT_THAT(server_router->start(), 0);
170169
EXPECT_THAT(server_pipe_mock.event_handler_, IsTrue());
171170

172-
StrictMock<MockFunction<void(const Channel::EventVar&)>> ch1_event_mock;
171+
StrictMock<MockFunction<void(const Channel::EventVar&)>> ch_event_mock;
173172

174173
cetl::optional<Channel> maybe_channel;
175174
server_router->registerChannel<Channel>("", [&](Channel&& ch, const auto& input) {
176175
//
177-
ch.subscribe(ch1_event_mock.AsStdFunction());
176+
ch.subscribe(ch_event_mock.AsStdFunction());
178177
maybe_channel = std::move(ch);
179-
ch1_event_mock.Call(input);
178+
ch_event_mock.Call(input);
180179
});
181180
EXPECT_THAT(maybe_channel.has_value(), IsFalse());
182181

183182
// Emulate that client #42 is connected.
184183
//
185-
EXPECT_CALL(ch1_event_mock, Call(VariantWith<Channel::Connected>(_))).Times(1);
186-
emulatePipeConnect(42, server_pipe_mock);
184+
constexpr std::uint64_t cl_id = 42;
185+
EXPECT_CALL(ch_event_mock, Call(VariantWith<Channel::Connected>(_))).Times(1);
186+
emulatePipeConnect(cl_id, server_pipe_mock);
187187

188-
// Emulate that client posted initial `RouteChannelMsg` on 42/1 client/tag pair.
188+
// Emulate that client posted initial `RouteChannelMsg` on 42/7 client/tag pair.
189189
//
190-
const std::uint64_t tag = 1;
190+
const std::uint64_t tag = 7;
191191
std::uint64_t seq = 0;
192-
EXPECT_CALL(ch1_event_mock, Call(VariantWith<Channel::Input>(_))).Times(1);
193-
emulateRouteChannelMsg(42, server_pipe_mock, tag, Channel::Input{&mr_}, seq);
192+
EXPECT_CALL(ch_event_mock, Call(VariantWith<Channel::Input>(_))).Times(1);
193+
emulateRouteChannelMsg(cl_id, server_pipe_mock, tag, Channel::Input{&mr_}, seq);
194194
ASSERT_THAT(maybe_channel.has_value(), IsTrue());
195+
EXPECT_CALL(server_pipe_mock, send(cl_id, PayloadOfRouteChannelEnd(mr_, tag, ErrorCode::Success))) //
196+
.WillOnce(Return(0));
195197

196-
// Emulate that client posted one more `RouteChannelMsg` on the same 42/1 client/tag pair.
198+
// Emulate that client posted one more `RouteChannelMsg` on the same 42/7 client/tag pair.
197199
//
198-
EXPECT_CALL(ch1_event_mock, Call(VariantWith<Channel::Input>(_))).Times(1);
199-
emulateRouteChannelMsg(42, server_pipe_mock, tag, Channel::Input{&mr_}, seq);
200+
EXPECT_CALL(ch_event_mock, Call(VariantWith<Channel::Input>(_))).Times(1);
201+
emulateRouteChannelMsg(cl_id, server_pipe_mock, tag, Channel::Input{&mr_}, seq);
200202

201203
seq = 0;
202-
EXPECT_CALL(server_pipe_mock, send(42, PayloadOfRouteChannel<Msg>(mr_, tag, seq++))).WillOnce(Return(0));
204+
EXPECT_CALL(server_pipe_mock, send(cl_id, PayloadOfRouteChannel<Msg>(mr_, tag, seq++))) //
205+
.WillOnce(Return(0));
203206
EXPECT_THAT(maybe_channel->send(Channel::Output{&mr_}), 0);
204207

205-
EXPECT_CALL(server_pipe_mock, send(42, PayloadOfRouteChannel<Msg>(mr_, tag, seq++))).WillOnce(Return(0));
208+
EXPECT_CALL(server_pipe_mock, send(cl_id, PayloadOfRouteChannel<Msg>(mr_, tag, seq++))) //
209+
.WillOnce(Return(0));
206210
EXPECT_THAT(maybe_channel->send(Channel::Output{&mr_}), 0);
207211
}
208212

0 commit comments

Comments
 (0)