Skip to content

Commit a34d2a3

Browse files
authored
Merge pull request #358 from gummif/gfa/send-recv-multipart
Problem: Handling multipart messages is complex
2 parents 829997d + d4d3ce3 commit a34d2a3

File tree

6 files changed

+308
-2
lines changed

6 files changed

+308
-2
lines changed

README.md

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ Supported platforms
4141

4242
Examples
4343
========
44-
This example requires at least C++11.
44+
These examples require at least C++11.
4545
```c++
46-
#include <string>
4746
#include <zmq.hpp>
4847

4948
int main()
@@ -54,6 +53,36 @@ int main()
5453
sock.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
5554
}
5655
```
56+
This a more complex example where we send and receive multi-part messages.
57+
```c++
58+
#include <iostream>
59+
#include <zmq_addon.hpp>
60+
61+
int main()
62+
{
63+
zmq::context_t ctx;
64+
zmq::socket_t sock1(ctx, zmq::socket_type::pair);
65+
zmq::socket_t sock2(ctx, zmq::socket_type::pair);
66+
sock1.bind("inproc://test");
67+
sock2.connect("inproc://test");
68+
69+
std::array<zmq::const_buffer, 2> send_msgs = {
70+
zmq::str_buffer("foo"),
71+
zmq::str_buffer("bar!")
72+
};
73+
if (!zmq::send_multipart(sock1, send_msgs))
74+
return 1;
75+
76+
std::vector<zmq::message_t> recv_msgs;
77+
const auto ret = zmq::recv_multipart(
78+
sock2, std::back_inserter(recv_msgs));
79+
if (!ret)
80+
return 1;
81+
std::cout << "Got " << *ret
82+
<< " messages" << std::endl;
83+
return 0;
84+
}
85+
```
5786

5887
Contribution policy
5988
===================

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ add_executable(
2525
poller.cpp
2626
active_poller.cpp
2727
multipart.cpp
28+
recv_multipart.cpp
29+
send_multipart.cpp
2830
monitor.cpp
2931
utilities.cpp
3032
)

tests/recv_multipart.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#ifdef ZMQ_CPP11
2+
3+
#include <catch.hpp>
4+
#include <zmq_addon.hpp>
5+
6+
TEST_CASE("recv_multipart test", "[recv_multipart]")
7+
{
8+
zmq::context_t context(1);
9+
zmq::socket_t output(context, ZMQ_PAIR);
10+
zmq::socket_t input(context, ZMQ_PAIR);
11+
output.bind("inproc://multipart.test");
12+
input.connect("inproc://multipart.test");
13+
14+
SECTION("send 1 message")
15+
{
16+
input.send(zmq::str_buffer("hello"));
17+
18+
std::vector<zmq::message_t> msgs;
19+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
20+
REQUIRE(ret);
21+
CHECK(*ret == 1);
22+
REQUIRE(msgs.size() == 1);
23+
CHECK(msgs[0].size() == 5);
24+
}
25+
SECTION("send 2 messages")
26+
{
27+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
28+
input.send(zmq::str_buffer("world!"));
29+
30+
std::vector<zmq::message_t> msgs;
31+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
32+
REQUIRE(ret);
33+
CHECK(*ret == 2);
34+
REQUIRE(msgs.size() == 2);
35+
CHECK(msgs[0].size() == 5);
36+
CHECK(msgs[1].size() == 6);
37+
}
38+
SECTION("send no messages, dontwait")
39+
{
40+
std::vector<zmq::message_t> msgs;
41+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
42+
CHECK_FALSE(ret);
43+
REQUIRE(msgs.size() == 0);
44+
}
45+
SECTION("send 1 partial message, dontwait")
46+
{
47+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
48+
49+
std::vector<zmq::message_t> msgs;
50+
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
51+
CHECK_FALSE(ret);
52+
REQUIRE(msgs.size() == 0);
53+
}
54+
SECTION("recv with invalid socket")
55+
{
56+
std::vector<zmq::message_t> msgs;
57+
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
58+
}
59+
}
60+
#endif

tests/send_multipart.cpp

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#ifdef ZMQ_CPP11
2+
3+
#include <forward_list>
4+
#include <catch.hpp>
5+
#include <zmq_addon.hpp>
6+
7+
TEST_CASE("send_multipart test", "[send_multipart]")
8+
{
9+
zmq::context_t context(1);
10+
zmq::socket_t output(context, ZMQ_PAIR);
11+
zmq::socket_t input(context, ZMQ_PAIR);
12+
output.bind("inproc://multipart.test");
13+
input.connect("inproc://multipart.test");
14+
15+
SECTION("send 0 messages")
16+
{
17+
std::vector<zmq::message_t> imsgs;
18+
auto iret = zmq::send_multipart(input, imsgs);
19+
REQUIRE(iret);
20+
CHECK(*iret == 0);
21+
}
22+
SECTION("send 1 message")
23+
{
24+
std::array<zmq::message_t, 1> imsgs = {zmq::message_t(3)};
25+
auto iret = zmq::send_multipart(input, imsgs);
26+
REQUIRE(iret);
27+
CHECK(*iret == 1);
28+
29+
std::vector<zmq::message_t> omsgs;
30+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
31+
REQUIRE(oret);
32+
CHECK(*oret == 1);
33+
REQUIRE(omsgs.size() == 1);
34+
CHECK(omsgs[0].size() == 3);
35+
}
36+
SECTION("send 2 messages")
37+
{
38+
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
39+
auto iret = zmq::send_multipart(input, imsgs);
40+
REQUIRE(iret);
41+
CHECK(*iret == 2);
42+
43+
std::vector<zmq::message_t> omsgs;
44+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
45+
REQUIRE(oret);
46+
CHECK(*oret == 2);
47+
REQUIRE(omsgs.size() == 2);
48+
CHECK(omsgs[0].size() == 3);
49+
CHECK(omsgs[1].size() == 4);
50+
}
51+
SECTION("send 2 messages, const_buffer")
52+
{
53+
std::array<zmq::const_buffer, 2> imsgs = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
54+
auto iret = zmq::send_multipart(input, imsgs);
55+
REQUIRE(iret);
56+
CHECK(*iret == 2);
57+
58+
std::vector<zmq::message_t> omsgs;
59+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
60+
REQUIRE(oret);
61+
CHECK(*oret == 2);
62+
REQUIRE(omsgs.size() == 2);
63+
CHECK(omsgs[0].size() == 3);
64+
CHECK(omsgs[1].size() == 4);
65+
}
66+
SECTION("send 2 messages, mutable_buffer")
67+
{
68+
char buf[4] = {};
69+
std::array<zmq::mutable_buffer, 2> imsgs = {zmq::buffer(buf, 3), zmq::buffer(buf)};
70+
auto iret = zmq::send_multipart(input, imsgs);
71+
REQUIRE(iret);
72+
CHECK(*iret == 2);
73+
74+
std::vector<zmq::message_t> omsgs;
75+
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
76+
REQUIRE(oret);
77+
CHECK(*oret == 2);
78+
REQUIRE(omsgs.size() == 2);
79+
CHECK(omsgs[0].size() == 3);
80+
CHECK(omsgs[1].size() == 4);
81+
}
82+
SECTION("send 2 messages, dontwait")
83+
{
84+
zmq::socket_t push(context, ZMQ_PUSH);
85+
push.bind("inproc://multipart.test.push");
86+
87+
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
88+
auto iret = zmq::send_multipart(push, imsgs, zmq::send_flags::dontwait);
89+
REQUIRE_FALSE(iret);
90+
}
91+
SECTION("send, misc. containers")
92+
{
93+
std::vector<zmq::message_t> msgs_vec;
94+
msgs_vec.emplace_back(3);
95+
msgs_vec.emplace_back(4);
96+
auto iret = zmq::send_multipart(input, msgs_vec);
97+
REQUIRE(iret);
98+
CHECK(*iret == 2);
99+
100+
std::forward_list<zmq::message_t> msgs_list;
101+
msgs_list.emplace_front(4);
102+
msgs_list.emplace_front(3);
103+
iret = zmq::send_multipart(input, msgs_list);
104+
REQUIRE(iret);
105+
CHECK(*iret == 2);
106+
107+
// init. list
108+
const auto msgs_il = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
109+
iret = zmq::send_multipart(input, msgs_il);
110+
REQUIRE(iret);
111+
CHECK(*iret == 2);
112+
// rvalue
113+
iret = zmq::send_multipart(input,
114+
std::initializer_list<zmq::const_buffer>{zmq::str_buffer("foo"), zmq::str_buffer("bar!")});
115+
REQUIRE(iret);
116+
CHECK(*iret == 2);
117+
}
118+
SECTION("send with invalid socket")
119+
{
120+
std::vector<zmq::message_t> msgs(1);
121+
CHECK_THROWS_AS(zmq::send_multipart(zmq::socket_ref(), msgs), const zmq::error_t &);
122+
}
123+
}
124+
#endif

zmq.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,15 @@ inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept
971971

972972
namespace detail
973973
{
974+
975+
template<class T>
976+
struct is_buffer
977+
{
978+
static constexpr bool value =
979+
std::is_same<T, const_buffer>::value ||
980+
std::is_same<T, mutable_buffer>::value;
981+
};
982+
974983
template<class T> struct is_pod_like
975984
{
976985
// NOTE: The networking draft N4771 section 16.11 requires

zmq_addon.hpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,88 @@
3737

3838
namespace zmq
3939
{
40+
41+
#ifdef ZMQ_CPP11
42+
43+
/* Receive a multipart message.
44+
45+
Writes the zmq::message_t objects to OutputIterator out.
46+
The out iterator must handle an unspecified number of writes,
47+
e.g. by using std::back_inserter.
48+
49+
Returns: the number of messages received or nullopt (on EAGAIN).
50+
Throws: if recv throws. Any exceptions thrown
51+
by the out iterator will be propagated and the message
52+
may have been only partially received with pending
53+
message parts. It is adviced to close this socket in that event.
54+
*/
55+
template<class OutputIt>
56+
ZMQ_NODISCARD detail::recv_result_t recv_multipart(socket_ref s, OutputIt out,
57+
recv_flags flags = recv_flags::none)
58+
{
59+
size_t msg_count = 0;
60+
message_t msg;
61+
while (true)
62+
{
63+
if (!s.recv(msg, flags))
64+
{
65+
// zmq ensures atomic delivery of messages
66+
assert(msg_count == 0);
67+
return {};
68+
}
69+
++msg_count;
70+
const bool more = msg.more();
71+
*out++ = std::move(msg);
72+
if (!more)
73+
break;
74+
}
75+
return msg_count;
76+
}
77+
78+
/* Send a multipart message.
79+
80+
The range must be a ForwardRange of zmq::message_t,
81+
zmq::const_buffer or zmq::mutable_buffer.
82+
The flags may be zmq::send_flags::sndmore if there are
83+
more message parts to be sent after the call to this function.
84+
85+
Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
86+
Throws: if send throws. Any exceptions thrown
87+
by the msgs range will be propagated and the message
88+
may have been only partially sent. It is adviced to close this socket in that event.
89+
*/
90+
template<class Range,
91+
typename = typename std::enable_if<
92+
detail::is_range<Range>::value
93+
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
94+
|| detail::is_buffer<detail::range_value_t<Range>>::value)
95+
>::type>
96+
detail::send_result_t send_multipart(socket_ref s, Range&& msgs,
97+
send_flags flags = send_flags::none)
98+
{
99+
using std::begin;
100+
using std::end;
101+
auto it = begin(msgs);
102+
const auto end_it = end(msgs);
103+
size_t msg_count = 0;
104+
while (it != end_it)
105+
{
106+
const auto next = std::next(it);
107+
const auto msg_flags = flags | (next == end_it ? send_flags::none : send_flags::sndmore);
108+
if (!s.send(*it, msg_flags))
109+
{
110+
// zmq ensures atomic delivery of messages
111+
assert(it == begin(msgs));
112+
return {};
113+
}
114+
++msg_count;
115+
it = next;
116+
}
117+
return msg_count;
118+
}
119+
120+
#endif
121+
40122
#ifdef ZMQ_HAS_RVALUE_REFS
41123

42124
/*

0 commit comments

Comments
 (0)