Skip to content

Commit ae26346

Browse files
committed
net: introduce Sock::WaitMany()
It allows waiting concurrently on more than one socket. Being a `virtual` `Sock` method it can be overriden by tests. Will be used to replace `CConnman::SocketEvents()`.
1 parent cc74459 commit ae26346

File tree

5 files changed

+147
-45
lines changed

5 files changed

+147
-45
lines changed

src/test/fuzz/util.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,15 @@ bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Event*
223223
return true;
224224
}
225225

226+
bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
227+
{
228+
for (auto& [sock, events] : events_per_sock) {
229+
(void)sock;
230+
events.occurred = m_fuzzed_data_provider.ConsumeBool() ? events.requested : 0;
231+
}
232+
return true;
233+
}
234+
226235
bool FuzzedSock::IsConnected(std::string& errmsg) const
227236
{
228237
if (m_fuzzed_data_provider.ConsumeBool()) {

src/test/fuzz/util.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class FuzzedSock : public Sock
7272

7373
bool Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const override;
7474

75+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override;
76+
7577
bool IsConnected(std::string& errmsg) const override;
7678
};
7779

src/test/util/net.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,15 @@ class StaticContentsSock : public Sock
162162
return true;
163163
}
164164

165+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override
166+
{
167+
for (auto& [sock, events] : events_per_sock) {
168+
(void)sock;
169+
events.occurred = events.requested;
170+
}
171+
return true;
172+
}
173+
165174
private:
166175
const std::string m_contents;
167176
mutable size_t m_consumed;

src/util/sock.cpp

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -113,73 +113,103 @@ int Sock::SetSockOpt(int level, int opt_name, const void* opt_val, socklen_t opt
113113

114114
bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
115115
{
116-
#ifdef USE_POLL
117-
pollfd fd;
118-
fd.fd = m_socket;
119-
fd.events = 0;
120-
if (requested & RECV) {
121-
fd.events |= POLLIN;
122-
}
123-
if (requested & SEND) {
124-
fd.events |= POLLOUT;
125-
}
116+
// We need a `shared_ptr` owning `this` for `WaitMany()`, but don't want
117+
// `this` to be destroyed when the `shared_ptr` goes out of scope at the
118+
// end of this function. Create it with a custom noop deleter.
119+
std::shared_ptr<const Sock> shared{this, [](const Sock*) {}};
120+
121+
EventsPerSock events_per_sock{std::make_pair(shared, Events{requested})};
126122

127-
if (poll(&fd, 1, count_milliseconds(timeout)) == SOCKET_ERROR) {
123+
if (!WaitMany(timeout, events_per_sock)) {
128124
return false;
129125
}
130126

131127
if (occurred != nullptr) {
132-
*occurred = 0;
133-
if (fd.revents & POLLIN) {
134-
*occurred |= RECV;
135-
}
136-
if (fd.revents & POLLOUT) {
137-
*occurred |= SEND;
128+
*occurred = events_per_sock.begin()->second.occurred;
129+
}
130+
131+
return true;
132+
}
133+
134+
bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
135+
{
136+
#ifdef USE_POLL
137+
std::vector<pollfd> pfds;
138+
for (const auto& [sock, events] : events_per_sock) {
139+
pfds.emplace_back();
140+
auto& pfd = pfds.back();
141+
pfd.fd = sock->m_socket;
142+
if (events.requested & RECV) {
143+
pfd.events |= POLLIN;
138144
}
139-
if (fd.revents & (POLLERR | POLLHUP)) {
140-
*occurred |= ERR;
145+
if (events.requested & SEND) {
146+
pfd.events |= POLLOUT;
141147
}
142148
}
143149

144-
return true;
145-
#else
146-
if (!IsSelectableSocket(m_socket)) {
150+
if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) {
147151
return false;
148152
}
149153

150-
fd_set fdset_recv;
151-
fd_set fdset_send;
152-
fd_set fdset_err;
153-
FD_ZERO(&fdset_recv);
154-
FD_ZERO(&fdset_send);
155-
FD_ZERO(&fdset_err);
156-
157-
if (requested & RECV) {
158-
FD_SET(m_socket, &fdset_recv);
154+
assert(pfds.size() == events_per_sock.size());
155+
size_t i{0};
156+
for (auto& [sock, events] : events_per_sock) {
157+
assert(sock->m_socket == static_cast<SOCKET>(pfds[i].fd));
158+
events.occurred = 0;
159+
if (pfds[i].revents & POLLIN) {
160+
events.occurred |= RECV;
161+
}
162+
if (pfds[i].revents & POLLOUT) {
163+
events.occurred |= SEND;
164+
}
165+
if (pfds[i].revents & (POLLERR | POLLHUP)) {
166+
events.occurred |= ERR;
167+
}
168+
++i;
159169
}
160170

161-
if (requested & SEND) {
162-
FD_SET(m_socket, &fdset_send);
171+
return true;
172+
#else
173+
fd_set recv;
174+
fd_set send;
175+
fd_set err;
176+
FD_ZERO(&recv);
177+
FD_ZERO(&send);
178+
FD_ZERO(&err);
179+
SOCKET socket_max{0};
180+
181+
for (const auto& [sock, events] : events_per_sock) {
182+
const auto& s = sock->m_socket;
183+
if (!IsSelectableSocket(s)) {
184+
return false;
185+
}
186+
if (events.requested & RECV) {
187+
FD_SET(s, &recv);
188+
}
189+
if (events.requested & SEND) {
190+
FD_SET(s, &send);
191+
}
192+
FD_SET(s, &err);
193+
socket_max = std::max(socket_max, s);
163194
}
164195

165-
FD_SET(m_socket, &fdset_err);
166-
167-
timeval timeout_struct = MillisToTimeval(timeout);
196+
timeval tv = MillisToTimeval(timeout);
168197

169-
if (select(m_socket + 1, &fdset_recv, &fdset_send, &fdset_err, &timeout_struct) == SOCKET_ERROR) {
198+
if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) {
170199
return false;
171200
}
172201

173-
if (occurred != nullptr) {
174-
*occurred = 0;
175-
if (FD_ISSET(m_socket, &fdset_recv)) {
176-
*occurred |= RECV;
202+
for (auto& [sock, events] : events_per_sock) {
203+
const auto& s = sock->m_socket;
204+
events.occurred = 0;
205+
if (FD_ISSET(s, &recv)) {
206+
events.occurred |= RECV;
177207
}
178-
if (FD_ISSET(m_socket, &fdset_send)) {
179-
*occurred |= SEND;
208+
if (FD_ISSET(s, &send)) {
209+
events.occurred |= SEND;
180210
}
181-
if (FD_ISSET(m_socket, &fdset_err)) {
182-
*occurred |= ERR;
211+
if (FD_ISSET(s, &err)) {
212+
events.occurred |= ERR;
183213
}
184214
}
185215

src/util/sock.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <chrono>
1313
#include <memory>
1414
#include <string>
15+
#include <unordered_map>
1516

1617
/**
1718
* Maximum time to wait for I/O readiness.
@@ -157,6 +158,57 @@ class Sock
157158
Event requested,
158159
Event* occurred = nullptr) const;
159160

161+
/**
162+
* Auxiliary requested/occurred events to wait for in `WaitMany()`.
163+
*/
164+
struct Events {
165+
explicit Events(Event req) : requested{req}, occurred{0} {}
166+
Event requested;
167+
Event occurred;
168+
};
169+
170+
struct HashSharedPtrSock {
171+
size_t operator()(const std::shared_ptr<const Sock>& s) const
172+
{
173+
return s ? s->m_socket : std::numeric_limits<SOCKET>::max();
174+
}
175+
};
176+
177+
struct EqualSharedPtrSock {
178+
bool operator()(const std::shared_ptr<const Sock>& lhs,
179+
const std::shared_ptr<const Sock>& rhs) const
180+
{
181+
if (lhs && rhs) {
182+
return lhs->m_socket == rhs->m_socket;
183+
}
184+
if (!lhs && !rhs) {
185+
return true;
186+
}
187+
return false;
188+
}
189+
};
190+
191+
/**
192+
* On which socket to wait for what events in `WaitMany()`.
193+
* The `shared_ptr` is copied into the map to ensure that the `Sock` object
194+
* is not destroyed (its destructor would close the underlying socket).
195+
* If this happens shortly before or after we call `poll(2)` and a new
196+
* socket gets created under the same file descriptor number then the report
197+
* from `WaitMany()` will be bogus.
198+
*/
199+
using EventsPerSock = std::unordered_map<std::shared_ptr<const Sock>, Events, HashSharedPtrSock, EqualSharedPtrSock>;
200+
201+
/**
202+
* Same as `Wait()`, but wait on many sockets within the same timeout.
203+
* @param[in] timeout Wait this long for at least one of the requested events to occur.
204+
* @param[in,out] events_per_sock Wait for the requested events on these sockets and set
205+
* `occurred` for the events that actually occurred.
206+
* @return true on success (or timeout, if all `what[].occurred` are returned as 0),
207+
* false otherwise
208+
*/
209+
[[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout,
210+
EventsPerSock& events_per_sock) const;
211+
160212
/* Higher level, convenience, methods. These may throw. */
161213

162214
/**

0 commit comments

Comments
 (0)