Skip to content

Commit 9a75626

Browse files
committed
introduce Endpoint entity
1 parent 4d33413 commit 9a75626

File tree

7 files changed

+181
-101
lines changed

7 files changed

+181
-101
lines changed

CMakePresets.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
"CMAKE_CROSS_CONFIGS": "all",
2323
"CMAKE_DEFAULT_BUILD_TYPE": "Release",
2424
"CMAKE_DEFAULT_CONFIGS": "Release",
25-
"CMAKE_PREFIX_PATH": "${sourceDir}/submodules/nunavut"
25+
"CMAKE_PREFIX_PATH": "${sourceDir}/submodules/nunavut",
26+
"CMAKE_CXX_FLAGS": "-DCETL_ENABLE_DEBUG_ASSERT=1"
2627
}
2728
},
2829
{

src/common/ipc/channel.hpp

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,12 @@ class Channel final : public AnyChannel
7171
setupEventHandler();
7272
}
7373

74+
~Channel() = default;
75+
7476
Channel(const Channel&) = delete;
7577
Channel& operator=(const Channel&) = delete;
7678
Channel& operator=(Channel&& other) noexcept = delete;
7779

78-
~Channel()
79-
{
80-
if (gateway_)
81-
{
82-
gateway_->setEventHandler(nullptr);
83-
}
84-
if (event_handler_)
85-
{
86-
event_handler_ = nullptr;
87-
}
88-
}
89-
9080
using SendFailure = nunavut::support::Error;
9181
using SendResult = cetl::optional<SendFailure>;
9282

@@ -107,22 +97,19 @@ class Channel final : public AnyChannel
10797
void setEventHandler(EventHandler event_handler)
10898
{
10999
event_handler_ = std::move(event_handler);
100+
setupEventHandler();
110101
}
111102

112103
private:
113104
friend class ClientRouter;
114105
friend class ServerRouter;
115106

116-
Channel(cetl::pmr::memory_resource& memory, detail::Gateway::Ptr gateway, EventHandler event_handler)
107+
Channel(cetl::pmr::memory_resource& memory, detail::Gateway::Ptr gateway)
117108
: memory_{memory}
118109
, gateway_{std::move(gateway)}
119-
, event_handler_{std::move(event_handler)}
120110
, output_type_id_{getTypeId<Output>()}
121111
{
122112
CETL_DEBUG_ASSERT(gateway_, "");
123-
CETL_DEBUG_ASSERT(event_handler_, "");
124-
125-
setupEventHandler();
126113
}
127114

128115
void setupEventHandler()

src/common/ipc/client_router.cpp

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <array>
2323
#include <cerrno>
24+
#include <cstddef>
2425
#include <cstdint>
2526
#include <memory>
2627
#include <unordered_map>
@@ -42,7 +43,7 @@ class ClientRouterImpl final : public ClientRouter
4243
ClientRouterImpl(cetl::pmr::memory_resource& memory, pipe::ClientPipe::Ptr client_pipe)
4344
: memory_{memory}
4445
, client_pipe_{std::move(client_pipe)}
45-
, next_tag_{0}
46+
, next_unique_tag_{0}
4647
{
4748
CETL_DEBUG_ASSERT(client_pipe_, "");
4849
}
@@ -69,14 +70,45 @@ class ClientRouterImpl final : public ClientRouter
6970

7071
CETL_NODISCARD detail::Gateway::Ptr makeGateway() override
7172
{
72-
const Tag new_tag = ++next_tag_;
73-
auto gateway = GatewayImpl::create(new_tag, *this);
74-
tag_to_gateway_[new_tag] = gateway;
75-
return gateway;
73+
const Endpoint endpoint{++next_unique_tag_};
74+
return GatewayImpl::create(*this, endpoint);
7675
}
7776

7877
private:
79-
using Tag = std::uint64_t;
78+
struct Endpoint final
79+
{
80+
using Tag = std::uint64_t;
81+
82+
explicit Endpoint(const Tag tag) noexcept
83+
: tag_{tag}
84+
{
85+
}
86+
87+
Tag getTag() const noexcept
88+
{
89+
return tag_;
90+
}
91+
92+
// Hasher
93+
94+
bool operator==(const Endpoint& other) const noexcept
95+
{
96+
return tag_ == other.tag_;
97+
}
98+
99+
struct Hasher final
100+
{
101+
std::size_t operator()(const Endpoint& endpoint) const noexcept
102+
{
103+
return std::hash<Tag>{}(endpoint.tag_);
104+
}
105+
106+
}; // Hasher
107+
108+
private:
109+
const Tag tag_;
110+
111+
}; // Endpoint
80112

81113
class GatewayImpl final : public std::enable_shared_from_this<GatewayImpl>, public detail::Gateway
82114
{
@@ -86,14 +118,14 @@ class ClientRouterImpl final : public ClientRouter
86118
};
87119

88120
public:
89-
static std::shared_ptr<GatewayImpl> create(const Tag tag, ClientRouterImpl& router)
121+
static std::shared_ptr<GatewayImpl> create(ClientRouterImpl& router, const Endpoint& endpoint)
90122
{
91-
return std::make_shared<GatewayImpl>(Private(), tag, router);
123+
return std::make_shared<GatewayImpl>(Private(), router, endpoint);
92124
}
93125

94-
GatewayImpl(Private, const Tag tag, ClientRouterImpl& router)
95-
: tag_{tag}
96-
, router_{router}
126+
GatewayImpl(Private, ClientRouterImpl& router, const Endpoint& endpoint)
127+
: router_{router}
128+
, endpoint_{endpoint}
97129
{
98130
}
99131

@@ -104,14 +136,14 @@ class ClientRouterImpl final : public ClientRouter
104136

105137
~GatewayImpl()
106138
{
107-
setEventHandler(nullptr);
139+
router_.unregisterGateway(endpoint_);
108140
}
109141

110142
void send(const detail::MsgTypeId type_id, const pipe::Payload payload) override
111143
{
112144
Route_1_0 route{&router_.memory_};
113145
auto& channel_msg = route.set_channel_msg();
114-
channel_msg.tag = tag_;
146+
channel_msg.tag = endpoint_.getTag();
115147
channel_msg.type_id = type_id;
116148

117149
tryPerformOnSerialized(route, [this, payload](const auto prefix) {
@@ -131,35 +163,40 @@ class ClientRouterImpl final : public ClientRouter
131163

132164
void setEventHandler(EventHandler event_handler) override
133165
{
134-
if (event_handler)
135-
{
136-
event_handler_ = std::move(event_handler);
137-
router_.tag_to_gateway_[tag_] = shared_from_this();
138-
}
139-
else
140-
{
141-
router_.tag_to_gateway_.erase(tag_);
142-
}
166+
router_.registerGateway(endpoint_, shared_from_this());
167+
event_handler_ = std::move(event_handler);
143168
}
144169

145170
private:
146-
const Tag tag_;
147171
ClientRouterImpl& router_;
172+
const Endpoint endpoint_;
148173
EventHandler event_handler_;
149174

150175
}; // GatewayImpl
151176

177+
using EndpointToWeakGateway = std::unordered_map<Endpoint, detail::Gateway::WeakPtr, Endpoint::Hasher>;
178+
179+
void registerGateway(const Endpoint& endpoint, detail::Gateway::WeakPtr gateway)
180+
{
181+
endpoint_to_gateway_[endpoint] = std::move(gateway);
182+
}
183+
184+
void unregisterGateway(const Endpoint& endpoint)
185+
{
186+
endpoint_to_gateway_.erase(endpoint);
187+
}
188+
152189
template <typename Action>
153190
void forEachGateway(Action action) const
154191
{
155192
// Calling an action might indirectly modify the map, so we first
156193
// collect strong pointers to gateways into a local collection.
157194
//
158195
std::vector<detail::Gateway::Ptr> gateways;
159-
gateways.reserve(tag_to_gateway_.size());
160-
for (const auto& pair : tag_to_gateway_)
196+
gateways.reserve(endpoint_to_gateway_.size());
197+
for (const auto& ep_to_gw : endpoint_to_gateway_)
161198
{
162-
const auto gateway = pair.second.lock();
199+
const auto gateway = ep_to_gw.second.lock();
163200
if (gateway)
164201
{
165202
gateways.push_back(gateway);
@@ -197,7 +234,8 @@ class ClientRouterImpl final : public ClientRouter
197234
return EINVAL;
198235
}
199236

200-
const auto remaining_payload = msg.payload.subspan(result_size.value());
237+
// Cut routing stuff from the payload - remaining is the actual message payload.
238+
const auto msg_payload = msg.payload.subspan(result_size.value());
201239

202240
cetl::visit(cetl::make_overloaded(
203241
//
@@ -206,9 +244,9 @@ class ClientRouterImpl final : public ClientRouter
206244
//
207245
handleRouteConnect(route_conn);
208246
},
209-
[this, remaining_payload](const RouteChannelMsg_1_0& route_channel) {
247+
[this, msg_payload](const RouteChannelMsg_1_0& route_channel) {
210248
//
211-
handleRouteChannelMsg(route_channel, remaining_payload);
249+
handleRouteChannelMsg(route_channel, msg_payload);
212250
}),
213251
route_msg.union_value);
214252

@@ -238,10 +276,12 @@ class ClientRouterImpl final : public ClientRouter
238276

239277
void handleRouteChannelMsg(const RouteChannelMsg_1_0& route_channel_msg, pipe::Payload payload)
240278
{
241-
const auto tag_it = tag_to_gateway_.find(route_channel_msg.tag);
242-
if (tag_it != tag_to_gateway_.end())
279+
const Endpoint endpoint{route_channel_msg.tag};
280+
281+
const auto ep_to_gw = endpoint_to_gateway_.find(endpoint);
282+
if (ep_to_gw != endpoint_to_gateway_.end())
243283
{
244-
const auto gateway = tag_it->second.lock();
284+
const auto gateway = ep_to_gw->second.lock();
245285
if (gateway)
246286
{
247287
gateway->event(detail::Gateway::Event::Message{payload});
@@ -251,10 +291,10 @@ class ClientRouterImpl final : public ClientRouter
251291
// TODO: log unsolicited message
252292
}
253293

254-
cetl::pmr::memory_resource& memory_;
255-
pipe::ClientPipe::Ptr client_pipe_;
256-
Tag next_tag_;
257-
std::unordered_map<Tag, detail::Gateway::WeakPtr> tag_to_gateway_;
294+
cetl::pmr::memory_resource& memory_;
295+
pipe::ClientPipe::Ptr client_pipe_;
296+
Endpoint::Tag next_unique_tag_;
297+
EndpointToWeakGateway endpoint_to_gateway_;
258298

259299
}; // ClientRouterImpl
260300

src/common/ipc/client_router.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ class ClientRouter
4242
template <typename Input, typename Output>
4343
CETL_NODISCARD Channel<Input, Output> makeChannel(AnyChannel::EventHandler<Input> event_handler)
4444
{
45-
return Channel<Input, Output>{memory(), makeGateway(), event_handler};
45+
auto channel = Channel<Input, Output>{memory(), makeGateway()};
46+
channel.setEventHandler(event_handler);
47+
return channel;
4648
}
4749

4850
protected:

src/common/ipc/pipe/unix_socket_server.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ClientContextImpl final : public detail::ClientContext
4343
: id_{id}
4444
, fd_{fd}
4545
{
46-
CETL_DEBUG_ASSERT(client_fd != -1, "");
46+
CETL_DEBUG_ASSERT(fd_ != -1, "");
4747

4848
// NOLINTNEXTLINE *-vararg
4949
::syslog(LOG_NOTICE, "New client connection on fd=%d (id=%zu).", fd, id);
@@ -102,7 +102,7 @@ UnixSocketServer::~UnixSocketServer()
102102
int UnixSocketServer::start(EventHandler event_handler)
103103
{
104104
CETL_DEBUG_ASSERT(server_fd_ == -1, "");
105-
CETL_DEBUG_ASSERT(client_event_handler, "");
105+
CETL_DEBUG_ASSERT(event_handler, "");
106106

107107
event_handler_ = std::move(event_handler);
108108

@@ -174,7 +174,7 @@ void UnixSocketServer::handle_accept()
174174
}
175175

176176
CETL_DEBUG_ASSERT(client_fd != -1, "");
177-
CETL_DEBUG_ASSERT(client_contexts_.find(client_fd) == client_contexts_.end(), "");
177+
CETL_DEBUG_ASSERT(client_fd_to_context_.find(client_fd) == client_fd_to_context_.end(), "");
178178

179179
const ClientId new_client_id = ++unique_client_id_counter_;
180180
auto client_context = std::make_unique<ClientContextImpl>(new_client_id, client_fd);

0 commit comments

Comments
 (0)