Skip to content

Commit b448b01

Browse files
committed
test: add a mocked Sock that allows inspecting what has been Send() to it
And also allows gradually providing the data to be returned by `Recv()` and sending and receiving net messages (`CNetMessage`).
1 parent f186414 commit b448b01

File tree

2 files changed

+322
-0
lines changed

2 files changed

+322
-0
lines changed

src/test/util/net.cpp

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
#include <random.h>
1515
#include <serialize.h>
1616
#include <span.h>
17+
#include <sync.h>
1718

19+
#include <chrono>
20+
#include <optional>
1821
#include <vector>
1922

2023
void ConnmanTestMsg::Handshake(CNode& node,
@@ -240,3 +243,168 @@ StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
240243
assert(false && "Move of Sock into StaticContentsSock not allowed.");
241244
return *this;
242245
}
246+
247+
ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
248+
{
249+
WAIT_LOCK(m_mutex, lock);
250+
251+
if (m_data.empty()) {
252+
if (m_eof) {
253+
return 0;
254+
}
255+
errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
256+
return -1;
257+
}
258+
259+
const size_t read_bytes{std::min(len, m_data.size())};
260+
261+
std::memcpy(buf, m_data.data(), read_bytes);
262+
if ((flags & MSG_PEEK) == 0) {
263+
m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
264+
}
265+
266+
return read_bytes;
267+
}
268+
269+
std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
270+
{
271+
V1Transport transport{NodeId{0}};
272+
273+
{
274+
WAIT_LOCK(m_mutex, lock);
275+
276+
WaitForDataOrEof(lock);
277+
if (m_eof && m_data.empty()) {
278+
return std::nullopt;
279+
}
280+
281+
for (;;) {
282+
Span<const uint8_t> s{m_data};
283+
if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
284+
return std::nullopt;
285+
}
286+
m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
287+
if (transport.ReceivedMessageComplete()) {
288+
break;
289+
}
290+
if (m_data.empty()) {
291+
WaitForDataOrEof(lock);
292+
if (m_eof && m_data.empty()) {
293+
return std::nullopt;
294+
}
295+
}
296+
}
297+
}
298+
299+
bool reject{false};
300+
CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
301+
if (reject) {
302+
return std::nullopt;
303+
}
304+
return std::make_optional<CNetMessage>(std::move(msg));
305+
}
306+
307+
void DynSock::Pipe::PushBytes(const void* buf, size_t len)
308+
{
309+
LOCK(m_mutex);
310+
const uint8_t* b = static_cast<const uint8_t*>(buf);
311+
m_data.insert(m_data.end(), b, b + len);
312+
m_cond.notify_all();
313+
}
314+
315+
void DynSock::Pipe::Eof()
316+
{
317+
LOCK(m_mutex);
318+
m_eof = true;
319+
m_cond.notify_all();
320+
}
321+
322+
void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
323+
{
324+
Assert(lock.mutex() == &m_mutex);
325+
326+
m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
327+
AssertLockHeld(m_mutex);
328+
return !m_data.empty() || m_eof;
329+
});
330+
}
331+
332+
DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
333+
: m_pipes{pipes}, m_accept_sockets{accept_sockets}
334+
{
335+
}
336+
337+
DynSock::~DynSock()
338+
{
339+
m_pipes->send.Eof();
340+
}
341+
342+
ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
343+
{
344+
return m_pipes->recv.GetBytes(buf, len, flags);
345+
}
346+
347+
ssize_t DynSock::Send(const void* buf, size_t len, int) const
348+
{
349+
m_pipes->send.PushBytes(buf, len);
350+
return len;
351+
}
352+
353+
std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
354+
{
355+
ZeroSock::Accept(addr, addr_len);
356+
return m_accept_sockets->Pop().value_or(nullptr);
357+
}
358+
359+
bool DynSock::Wait(std::chrono::milliseconds timeout,
360+
Event requested,
361+
Event* occurred) const
362+
{
363+
EventsPerSock ev;
364+
ev.emplace(this, Events{requested});
365+
const bool ret{WaitMany(timeout, ev)};
366+
if (occurred != nullptr) {
367+
*occurred = ev.begin()->second.occurred;
368+
}
369+
return ret;
370+
}
371+
372+
bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
373+
{
374+
const auto deadline = std::chrono::steady_clock::now() + timeout;
375+
bool at_least_one_event_occurred{false};
376+
377+
for (;;) {
378+
// Check all sockets for readiness without waiting.
379+
for (auto& [sock, events] : events_per_sock) {
380+
if ((events.requested & Sock::SEND) != 0) {
381+
// Always ready for Send().
382+
events.occurred |= Sock::SEND;
383+
at_least_one_event_occurred = true;
384+
}
385+
386+
if ((events.requested & Sock::RECV) != 0) {
387+
auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
388+
uint8_t b;
389+
if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
390+
events.occurred |= Sock::RECV;
391+
at_least_one_event_occurred = true;
392+
}
393+
}
394+
}
395+
396+
if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
397+
break;
398+
}
399+
400+
std::this_thread::sleep_for(10ms);
401+
}
402+
403+
return true;
404+
}
405+
406+
DynSock& DynSock::operator=(Sock&&)
407+
{
408+
assert(false && "Move of Sock into DynSock not allowed.");
409+
return *this;
410+
}

src/test/util/net.h

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define BITCOIN_TEST_UTIL_NET_H
77

88
#include <compat/compat.h>
9+
#include <netmessagemaker.h>
910
#include <net.h>
1011
#include <net_permissions.h>
1112
#include <net_processing.h>
@@ -19,9 +20,11 @@
1920
#include <array>
2021
#include <cassert>
2122
#include <chrono>
23+
#include <condition_variable>
2224
#include <cstdint>
2325
#include <cstring>
2426
#include <memory>
27+
#include <optional>
2528
#include <string>
2629
#include <unordered_map>
2730
#include <vector>
@@ -204,6 +207,157 @@ class StaticContentsSock : public ZeroSock
204207
mutable size_t m_consumed{0};
205208
};
206209

210+
/**
211+
* A mocked Sock alternative that allows providing the data to be returned by Recv()
212+
* and inspecting the data that has been supplied to Send().
213+
*/
214+
class DynSock : public ZeroSock
215+
{
216+
public:
217+
/**
218+
* Unidirectional bytes or CNetMessage queue (FIFO).
219+
*/
220+
class Pipe
221+
{
222+
public:
223+
/**
224+
* Get bytes and remove them from the pipe.
225+
* @param[in] buf Destination to write bytes to.
226+
* @param[in] len Write up to this number of bytes.
227+
* @param[in] flags Same as the flags of `recv(2)`. Just `MSG_PEEK` is honored.
228+
* @return The number of bytes written to `buf`. `0` if `Eof()` has been called.
229+
* If no bytes are available then `-1` is returned and `errno` is set to `EAGAIN`.
230+
*/
231+
ssize_t GetBytes(void* buf, size_t len, int flags = 0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
232+
233+
/**
234+
* Deserialize a `CNetMessage` and remove it from the pipe.
235+
* If not enough bytes are available then the function will wait. If parsing fails
236+
* or EOF is signaled to the pipe, then `std::nullopt` is returned.
237+
*/
238+
std::optional<CNetMessage> GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
239+
240+
/**
241+
* Push bytes to the pipe.
242+
*/
243+
void PushBytes(const void* buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
244+
245+
/**
246+
* Construct and push CNetMessage to the pipe.
247+
*/
248+
template <typename... Args>
249+
void PushNetMsg(const std::string& type, Args&&... payload) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
250+
251+
/**
252+
* Signal end-of-file on the receiving end (`GetBytes()` or `GetNetMsg()`).
253+
*/
254+
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
255+
256+
private:
257+
/**
258+
* Return when there is some data to read or EOF has been signaled.
259+
* @param[in,out] lock Unique lock that must have been derived from `m_mutex` by `WAIT_LOCK(m_mutex, lock)`.
260+
*/
261+
void WaitForDataOrEof(UniqueLock<Mutex>& lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex);
262+
263+
Mutex m_mutex;
264+
std::condition_variable m_cond;
265+
std::vector<uint8_t> m_data GUARDED_BY(m_mutex);
266+
bool m_eof GUARDED_BY(m_mutex){false};
267+
};
268+
269+
struct Pipes {
270+
Pipe recv;
271+
Pipe send;
272+
};
273+
274+
/**
275+
* A basic thread-safe queue, used for queuing sockets to be returned by Accept().
276+
*/
277+
class Queue
278+
{
279+
public:
280+
using S = std::unique_ptr<DynSock>;
281+
282+
void Push(S s) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
283+
{
284+
LOCK(m_mutex);
285+
m_queue.push(std::move(s));
286+
}
287+
288+
std::optional<S> Pop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
289+
{
290+
LOCK(m_mutex);
291+
if (m_queue.empty()) {
292+
return std::nullopt;
293+
}
294+
S front{std::move(m_queue.front())};
295+
m_queue.pop();
296+
return front;
297+
}
298+
299+
bool Empty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
300+
{
301+
LOCK(m_mutex);
302+
return m_queue.empty();
303+
}
304+
305+
private:
306+
mutable Mutex m_mutex;
307+
std::queue<S> m_queue GUARDED_BY(m_mutex);
308+
};
309+
310+
/**
311+
* Create a new mocked sock.
312+
* @param[in] pipes Send/recv pipes used by the Send() and Recv() methods.
313+
* @param[in] accept_sockets Sockets to return by the Accept() method.
314+
*/
315+
explicit DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets);
316+
317+
~DynSock();
318+
319+
ssize_t Recv(void* buf, size_t len, int flags) const override;
320+
321+
ssize_t Send(const void* buf, size_t len, int) const override;
322+
323+
std::unique_ptr<Sock> Accept(sockaddr* addr, socklen_t* addr_len) const override;
324+
325+
bool Wait(std::chrono::milliseconds timeout,
326+
Event requested,
327+
Event* occurred = nullptr) const override;
328+
329+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override;
330+
331+
private:
332+
DynSock& operator=(Sock&&) override;
333+
334+
std::shared_ptr<Pipes> m_pipes;
335+
std::shared_ptr<Queue> m_accept_sockets;
336+
};
337+
338+
template <typename... Args>
339+
void DynSock::Pipe::PushNetMsg(const std::string& type, Args&&... payload)
340+
{
341+
auto msg = NetMsg::Make(type, std::forward<Args>(payload)...);
342+
V1Transport transport{NodeId{0}};
343+
344+
const bool queued{transport.SetMessageToSend(msg)};
345+
assert(queued);
346+
347+
LOCK(m_mutex);
348+
349+
for (;;) {
350+
const auto& [bytes, _more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/true);
351+
if (bytes.empty()) {
352+
break;
353+
}
354+
m_data.insert(m_data.end(), bytes.begin(), bytes.end());
355+
transport.MarkBytesSent(bytes.size());
356+
}
357+
358+
m_cond.notify_all();
359+
}
360+
207361
std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context);
208362

209363
#endif // BITCOIN_TEST_UTIL_NET_H

0 commit comments

Comments
 (0)