Skip to content

Commit 505edeb

Browse files
committed
Problem: Handling multipart messages is complex
Solution: Add generic algorithms for sending and receiving multipart messages.
1 parent fdb2f13 commit 505edeb

File tree

5 files changed

+268
-0
lines changed

5 files changed

+268
-0
lines changed

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ add_executable(
2525
poller.cpp
2626
active_poller.cpp
2727
multipart.cpp
28+
recv_multipart.cpp
29+
send_multipart.cpp
2830
monitor.cpp
2931
utilities.cpp
3032
)

tests/recv_multipart.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#include <catch.hpp>
2+
#include <zmq_addon.hpp>
3+
4+
#ifdef ZMQ_CPP11
5+
6+
TEST_CASE("recv_multipart test", "[recv_multipart]")
7+
{
8+
zmq::context_t context(1);
9+
zmq::socket_t output(context, ZMQ_PAIR);
10+
zmq::socket_t input(context, ZMQ_PAIR);
11+
output.bind("inproc://multipart.test");
12+
input.connect("inproc://multipart.test");
13+
14+
SECTION("send 1 message")
15+
{
16+
input.send(zmq::str_buffer("hello"));
17+
18+
std::vector<zmq::message_t> msgs;
19+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
20+
REQUIRE(ret);
21+
CHECK(*ret == 1);
22+
REQUIRE(msgs.size() == 1);
23+
CHECK(msgs[0].size() == 5);
24+
}
25+
SECTION("send 2 messages")
26+
{
27+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
28+
input.send(zmq::str_buffer("world!"));
29+
30+
std::vector<zmq::message_t> msgs;
31+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
32+
REQUIRE(ret);
33+
CHECK(*ret == 2);
34+
REQUIRE(msgs.size() == 2);
35+
CHECK(msgs[0].size() == 5);
36+
CHECK(msgs[1].size() == 6);
37+
}
38+
SECTION("send no messages, dontwait")
39+
{
40+
std::vector<zmq::message_t> msgs;
41+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
42+
CHECK_FALSE(ret);
43+
REQUIRE(msgs.size() == 0);
44+
}
45+
SECTION("send 1 partial message, dontwait")
46+
{
47+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
48+
49+
std::vector<zmq::message_t> msgs;
50+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
51+
CHECK_FALSE(ret);
52+
REQUIRE(msgs.size() == 0);
53+
}
54+
SECTION("recv with invalid socket")
55+
{
56+
std::vector<zmq::message_t> msgs;
57+
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
58+
}
59+
}
60+
#endif

tests/send_multipart.cpp

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include <forward_list>
2+
#include <catch.hpp>
3+
#include <zmq_addon.hpp>
4+
5+
#ifdef ZMQ_CPP11
6+
7+
TEST_CASE("send_multipart test", "[send_multipart]")
8+
{
9+
zmq::context_t context(1);
10+
zmq::socket_t output(context, ZMQ_PAIR);
11+
zmq::socket_t input(context, ZMQ_PAIR);
12+
output.bind("inproc://multipart.test");
13+
input.connect("inproc://multipart.test");
14+
15+
SECTION("send 0 messages")
16+
{
17+
std::vector<zmq::message_t> imsgs;
18+
auto iret = zmq::send_multipart(input, imsgs);
19+
REQUIRE(iret);
20+
CHECK(*iret == 0);
21+
}
22+
SECTION("send 1 message")
23+
{
24+
std::array<zmq::message_t, 1> imsgs = {zmq::message_t(3)};
25+
auto iret = zmq::send_multipart(input, imsgs);
26+
REQUIRE(iret);
27+
CHECK(*iret == 1);
28+
29+
std::vector<zmq::message_t> omsgs;
30+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
31+
REQUIRE(oret);
32+
CHECK(*oret == 1);
33+
REQUIRE(omsgs.size() == 1);
34+
CHECK(omsgs[0].size() == 3);
35+
}
36+
SECTION("send 2 messages")
37+
{
38+
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
39+
auto iret = zmq::send_multipart(input, imsgs);
40+
REQUIRE(iret);
41+
CHECK(*iret == 2);
42+
43+
std::vector<zmq::message_t> omsgs;
44+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
45+
REQUIRE(oret);
46+
CHECK(*oret == 2);
47+
REQUIRE(omsgs.size() == 2);
48+
CHECK(omsgs[0].size() == 3);
49+
CHECK(omsgs[1].size() == 4);
50+
}
51+
SECTION("send 2 messages, const_buffer")
52+
{
53+
std::array<zmq::const_buffer, 2> imsgs = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
54+
auto iret = zmq::send_multipart(input, imsgs);
55+
REQUIRE(iret);
56+
CHECK(*iret == 2);
57+
58+
std::vector<zmq::message_t> omsgs;
59+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
60+
REQUIRE(oret);
61+
CHECK(*oret == 2);
62+
REQUIRE(omsgs.size() == 2);
63+
CHECK(omsgs[0].size() == 3);
64+
CHECK(omsgs[1].size() == 4);
65+
}
66+
SECTION("send 2 messages, mutable_buffer")
67+
{
68+
char buf[4] = {};
69+
std::array<zmq::mutable_buffer, 2> imsgs = {zmq::buffer(buf, 3), zmq::buffer(buf)};
70+
auto iret = zmq::send_multipart(input, imsgs);
71+
REQUIRE(iret);
72+
CHECK(*iret == 2);
73+
74+
std::vector<zmq::message_t> omsgs;
75+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
76+
REQUIRE(oret);
77+
CHECK(*oret == 2);
78+
REQUIRE(omsgs.size() == 2);
79+
CHECK(omsgs[0].size() == 3);
80+
CHECK(omsgs[1].size() == 4);
81+
}
82+
SECTION("send 2 messages, dontwait")
83+
{
84+
zmq::socket_t push(context, ZMQ_PUSH);
85+
push.bind("inproc://multipart.test.push");
86+
87+
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
88+
auto iret = zmq::send_multipart(push, imsgs, zmq::send_flags::dontwait);
89+
REQUIRE_FALSE(iret);
90+
}
91+
// TODO send with EAGAIN
92+
SECTION("send, misc. containers")
93+
{
94+
std::vector<zmq::message_t> msgs_vec;
95+
msgs_vec.emplace_back(3);
96+
msgs_vec.emplace_back(4);
97+
auto iret = zmq::send_multipart(input, msgs_vec);
98+
REQUIRE(iret);
99+
CHECK(*iret == 2);
100+
101+
std::forward_list<zmq::message_t> msgs_list;
102+
msgs_list.emplace_front(4);
103+
msgs_list.emplace_front(3);
104+
iret = zmq::send_multipart(input, msgs_list);
105+
REQUIRE(iret);
106+
CHECK(*iret == 2);
107+
108+
// init. list
109+
const auto msgs_il = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
110+
iret = zmq::send_multipart(input, msgs_il);
111+
REQUIRE(iret);
112+
CHECK(*iret == 2);
113+
// rvalue
114+
iret = zmq::send_multipart(input,
115+
std::initializer_list<zmq::const_buffer>{zmq::str_buffer("foo"), zmq::str_buffer("bar!")});
116+
REQUIRE(iret);
117+
CHECK(*iret == 2);
118+
}
119+
SECTION("send with invalid socket")
120+
{
121+
std::vector<zmq::message_t> msgs(1);
122+
CHECK_THROWS_AS(zmq::send_multipart(zmq::socket_ref(), msgs), const zmq::error_t &);
123+
}
124+
}
125+
#endif

zmq.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,15 @@ inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept
971971

972972
namespace detail
973973
{
974+
975+
template<class T>
976+
struct is_buffer
977+
{
978+
static constexpr bool value =
979+
std::is_same<T, const_buffer>::value ||
980+
std::is_same<T, mutable_buffer>::value;
981+
};
982+
974983
template<class T> struct is_pod_like
975984
{
976985
// NOTE: The networking draft N4771 section 16.11 requires

zmq_addon.hpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,78 @@
3737

3838
namespace zmq
3939
{
40+
41+
#ifdef ZMQ_CPP11
42+
43+
/* Receive a multipart message.
44+
45+
Writes the zmq::message_t objects to OutputIterator out.
46+
The out iterator must handle an unspecified amount of write,
47+
e.g. using std::back_inserter.
48+
49+
Returns: the number of messages received or nullopt (on EAGAIN).
50+
Throws: if recv throws.
51+
*/
52+
template<class OutputIt>
53+
ZMQ_NODISCARD detail::recv_result_t recv_multipart(socket_ref s, OutputIt out,
54+
recv_flags flags = recv_flags::none)
55+
{
56+
size_t msg_count = 0;
57+
message_t msg;
58+
while (true)
59+
{
60+
if (!s.recv(msg, flags))
61+
{
62+
// zmq ensures atomic delivery of messages
63+
assert(msg_count == 0);
64+
return {};
65+
}
66+
++msg_count;
67+
const bool more = msg.more();
68+
*out++ = std::move(msg);
69+
if (!more)
70+
break;
71+
}
72+
return msg_count;
73+
}
74+
75+
/* Send a multipart message.
76+
77+
The range must be a ForwardRange of zmq::message_t,
78+
zmq::const_buffer or zmq::mutable_buffer.
79+
The flags may be zmq::send_flags::sndmore if there are
80+
more message parts to be sent after the call to this function.
81+
82+
Returns: the number of messages sent or nullopt (on EAGAIN).
83+
Throws: if send throws.
84+
*/
85+
template<class Range,
86+
typename = typename std::enable_if<
87+
detail::is_range<Range>::value
88+
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
89+
|| detail::is_buffer<detail::range_value_t<Range>>::value)
90+
>::type>
91+
detail::send_result_t send_multipart(socket_ref s, Range&& msgs,
92+
send_flags flags = send_flags::none)
93+
{
94+
auto it = msgs.begin();
95+
auto last = msgs.end();
96+
const size_t msg_count = static_cast<size_t>(std::distance(it, last));
97+
for (; it != last; ++it)
98+
{
99+
const auto mf = flags | (std::next(it) == last ? send_flags::none : send_flags::sndmore);
100+
if (!s.send(*it, mf))
101+
{
102+
// zmq ensures atomic delivery of messages
103+
assert(it == msgs.begin());
104+
return {};
105+
}
106+
}
107+
return msg_count;
108+
}
109+
110+
#endif
111+
40112
#ifdef ZMQ_HAS_RVALUE_REFS
41113

42114
/*

0 commit comments

Comments
 (0)