|
#include <random.h>
#include <serialize.h>
#include <span.h>
#include <sync.h>

#include <chrono>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
|
19 | 22 |
|
20 | 23 | void ConnmanTestMsg::Handshake(CNode& node,
|
@@ -137,3 +140,271 @@ std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candida
|
137 | 140 | }
|
138 | 141 | return candidates;
|
139 | 142 | }
|
| 143 | + |
| 144 | +// Have different ZeroSock (or others that inherit from it) objects have different |
| 145 | +// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two |
| 146 | +// different objects comparing as equal. |
| 147 | +static std::atomic<SOCKET> g_mocked_sock_fd{0}; |
| 148 | + |
| 149 | +ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {} |
| 150 | + |
| 151 | +// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that. |
| 152 | +ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; } |
| 153 | + |
| 154 | +ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; } |
| 155 | + |
| 156 | +ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const |
| 157 | +{ |
| 158 | + memset(buf, 0x0, len); |
| 159 | + return len; |
| 160 | +} |
| 161 | + |
| 162 | +int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; } |
| 163 | + |
| 164 | +int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; } |
| 165 | + |
| 166 | +int ZeroSock::Listen(int) const { return 0; } |
| 167 | + |
| 168 | +std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const |
| 169 | +{ |
| 170 | + if (addr != nullptr) { |
| 171 | + // Pretend all connections come from 5.5.5.5:6789 |
| 172 | + memset(addr, 0x00, *addr_len); |
| 173 | + const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in)); |
| 174 | + if (*addr_len >= write_len) { |
| 175 | + *addr_len = write_len; |
| 176 | + sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr); |
| 177 | + addr_in->sin_family = AF_INET; |
| 178 | + memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); |
| 179 | + addr_in->sin_port = htons(6789); |
| 180 | + } |
| 181 | + } |
| 182 | + return std::make_unique<ZeroSock>(); |
| 183 | +} |
| 184 | + |
| 185 | +int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const |
| 186 | +{ |
| 187 | + std::memset(opt_val, 0x0, *opt_len); |
| 188 | + return 0; |
| 189 | +} |
| 190 | + |
| 191 | +int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } |
| 192 | + |
| 193 | +int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const |
| 194 | +{ |
| 195 | + std::memset(name, 0x0, *name_len); |
| 196 | + return 0; |
| 197 | +} |
| 198 | + |
| 199 | +bool ZeroSock::SetNonBlocking() const { return true; } |
| 200 | + |
| 201 | +bool ZeroSock::IsSelectable() const { return true; } |
| 202 | + |
| 203 | +bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const |
| 204 | +{ |
| 205 | + if (occurred != nullptr) { |
| 206 | + *occurred = requested; |
| 207 | + } |
| 208 | + return true; |
| 209 | +} |
| 210 | + |
| 211 | +bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const |
| 212 | +{ |
| 213 | + for (auto& [sock, events] : events_per_sock) { |
| 214 | + (void)sock; |
| 215 | + events.occurred = events.requested; |
| 216 | + } |
| 217 | + return true; |
| 218 | +} |
| 219 | + |
| 220 | +ZeroSock& ZeroSock::operator=(Sock&& other) |
| 221 | +{ |
| 222 | + assert(false && "Move of Sock into ZeroSock not allowed."); |
| 223 | + return *this; |
| 224 | +} |
| 225 | + |
| 226 | +StaticContentsSock::StaticContentsSock(const std::string& contents) |
| 227 | + : m_contents{contents} |
| 228 | +{ |
| 229 | +} |
| 230 | + |
| 231 | +ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const |
| 232 | +{ |
| 233 | + const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; |
| 234 | + std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); |
| 235 | + if ((flags & MSG_PEEK) == 0) { |
| 236 | + m_consumed += consume_bytes; |
| 237 | + } |
| 238 | + return consume_bytes; |
| 239 | +} |
| 240 | + |
| 241 | +StaticContentsSock& StaticContentsSock::operator=(Sock&& other) |
| 242 | +{ |
| 243 | + assert(false && "Move of Sock into StaticContentsSock not allowed."); |
| 244 | + return *this; |
| 245 | +} |
| 246 | + |
// Non-blocking read of up to `len` bytes from the pipe's internal buffer,
// mirroring recv(2) on a non-blocking socket: returns 0 on EOF once the
// buffer is drained, -1 with errno=EAGAIN when no data is available yet,
// otherwise the number of bytes copied into `buf`. If `flags` contains
// MSG_PEEK, the copied bytes are left in the buffer.
ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
{
    WAIT_LOCK(m_mutex, lock);

    if (m_data.empty()) {
        if (m_eof) {
            return 0;
        }
        errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
        return -1;
    }

    const size_t read_bytes{std::min(len, m_data.size())};

    std::memcpy(buf, m_data.data(), read_bytes);
    if ((flags & MSG_PEEK) == 0) {
        // Consume: drop the bytes just delivered to the caller.
        m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
    }

    return read_bytes;
}
| 268 | + |
// Block until one complete P2P message (v1 transport framing) can be read
// from the pipe. Returns std::nullopt if EOF arrives before a full message,
// if the transport refuses the received bytes, or if the decoded message is
// flagged for rejection.
std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
{
    V1Transport transport{NodeId{0}};

    {
        WAIT_LOCK(m_mutex, lock);

        WaitForDataOrEof(lock);
        if (m_eof && m_data.empty()) {
            return std::nullopt;
        }

        for (;;) {
            Span<const uint8_t> s{m_data};
            if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
                return std::nullopt;
            }
            // Erase from m_data exactly the bytes the transport consumed.
            m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
            if (transport.ReceivedMessageComplete()) {
                break;
            }
            if (m_data.empty()) {
                // Only a partial message so far; wait for more bytes (or EOF).
                WaitForDataOrEof(lock);
                if (m_eof && m_data.empty()) {
                    return std::nullopt;
                }
            }
        }
    }

    bool reject{false};
    CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
    if (reject) {
        return std::nullopt;
    }
    return std::make_optional<CNetMessage>(std::move(msg));
}
| 306 | + |
| 307 | +void DynSock::Pipe::PushBytes(const void* buf, size_t len) |
| 308 | +{ |
| 309 | + LOCK(m_mutex); |
| 310 | + const uint8_t* b = static_cast<const uint8_t*>(buf); |
| 311 | + m_data.insert(m_data.end(), b, b + len); |
| 312 | + m_cond.notify_all(); |
| 313 | +} |
| 314 | + |
// Signal end-of-data: readers drain whatever is still buffered, then see EOF.
void DynSock::Pipe::Eof()
{
    LOCK(m_mutex);
    m_eof = true;
    m_cond.notify_all(); // Wake readers blocked in WaitForDataOrEof().
}
| 321 | + |
// Sleep on the condition variable until the pipe has data or EOF has been
// signalled. `lock` must be holding m_mutex (asserted below); the mutex is
// released while waiting and re-acquired before returning.
void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
{
    Assert(lock.mutex() == &m_mutex);

    m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
        AssertLockHeld(m_mutex);
        return !m_data.empty() || m_eof;
    });
}
| 331 | + |
| 332 | +DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets) |
| 333 | + : m_pipes{pipes}, m_accept_sockets{accept_sockets} |
| 334 | +{ |
| 335 | +} |
| 336 | + |
| 337 | +DynSock::~DynSock() |
| 338 | +{ |
| 339 | + m_pipes->send.Eof(); |
| 340 | +} |
| 341 | + |
| 342 | +ssize_t DynSock::Recv(void* buf, size_t len, int flags) const |
| 343 | +{ |
| 344 | + return m_pipes->recv.GetBytes(buf, len, flags); |
| 345 | +} |
| 346 | + |
| 347 | +ssize_t DynSock::Send(const void* buf, size_t len, int) const |
| 348 | +{ |
| 349 | + m_pipes->send.PushBytes(buf, len); |
| 350 | + return len; |
| 351 | +} |
| 352 | + |
// Hand out the next pre-arranged socket from the accept queue, or nullptr if
// the queue is empty. The base-class call fills `addr`/`addr_len` with the
// canned 5.5.5.5:6789 peer address.
std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
{
    ZeroSock::Accept(addr, addr_len);
    return m_accept_sockets->Pop().value_or(nullptr);
}
| 358 | + |
| 359 | +bool DynSock::Wait(std::chrono::milliseconds timeout, |
| 360 | + Event requested, |
| 361 | + Event* occurred) const |
| 362 | +{ |
| 363 | + EventsPerSock ev; |
| 364 | + ev.emplace(this, Events{requested}); |
| 365 | + const bool ret{WaitMany(timeout, ev)}; |
| 366 | + if (occurred != nullptr) { |
| 367 | + *occurred = ev.begin()->second.occurred; |
| 368 | + } |
| 369 | + return ret; |
| 370 | +} |
| 371 | + |
| 372 | +bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const |
| 373 | +{ |
| 374 | + const auto deadline = std::chrono::steady_clock::now() + timeout; |
| 375 | + bool at_least_one_event_occurred{false}; |
| 376 | + |
| 377 | + for (;;) { |
| 378 | + // Check all sockets for readiness without waiting. |
| 379 | + for (auto& [sock, events] : events_per_sock) { |
| 380 | + if ((events.requested & Sock::SEND) != 0) { |
| 381 | + // Always ready for Send(). |
| 382 | + events.occurred |= Sock::SEND; |
| 383 | + at_least_one_event_occurred = true; |
| 384 | + } |
| 385 | + |
| 386 | + if ((events.requested & Sock::RECV) != 0) { |
| 387 | + auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get()); |
| 388 | + uint8_t b; |
| 389 | + if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) { |
| 390 | + events.occurred |= Sock::RECV; |
| 391 | + at_least_one_event_occurred = true; |
| 392 | + } |
| 393 | + } |
| 394 | + } |
| 395 | + |
| 396 | + if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) { |
| 397 | + break; |
| 398 | + } |
| 399 | + |
| 400 | + std::this_thread::sleep_for(10ms); |
| 401 | + } |
| 402 | + |
| 403 | + return true; |
| 404 | +} |
| 405 | + |
| 406 | +DynSock& DynSock::operator=(Sock&&) |
| 407 | +{ |
| 408 | + assert(false && "Move of Sock into DynSock not allowed."); |
| 409 | + return *this; |
| 410 | +} |
0 commit comments