Skip to content

Commit 6a222d2

Browse files
authored
Merge pull request #175 from jachiang/master
zmq socket subscription filter methods added.
2 parents a147fb3 + 86141f4 commit 6a222d2

File tree

4 files changed

+91
-7
lines changed

4 files changed

+91
-7
lines changed

include/bitcoin/protocol/zmq/socket.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ class BCP_API socket
115115
/// Configure the socket to connect through the specified socks5 proxy.
116116
bool set_socks_proxy(const config::authority& socks_proxy);
117117

118-
/////// Configure subscriber socket to apply the message filter.
119-
////bool set_subscription(const data_chunk& filter);
118+
/// Configure subscriber socket to apply the message filter.
119+
bool set_subscription(const data_chunk& filter);
120120

121-
/////// Configure subscriber socket to remove the message filter.
122-
////bool set_unsubscription(const data_chunk& filter);
121+
/// Configure subscriber socket to remove the message filter.
122+
bool set_unsubscription(const data_chunk& filter);
123123

124124
/// Send a message on this socket.
125125
code send(message& packet);
@@ -133,6 +133,7 @@ class BCP_API socket
133133
bool set32(int32_t option, int32_t value);
134134
bool set64(int32_t option, int64_t value);
135135
bool set(int32_t option, const std::string& value);
136+
bool set(int32_t option, const data_chunk& value);
136137

137138
private:
138139
void* self_;

src/zmq/socket.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
namespace libbitcoin {
3434
namespace protocol {
3535
namespace zmq {
36-
36+
3737
static const auto subscribe_all = "";
3838
static constexpr int32_t zmq_true = 1;
3939
static constexpr int32_t zmq_false = 0;
@@ -42,7 +42,7 @@ static constexpr int32_t reconnect_interval = 100;
4242
static const bc::protocol::settings default_settings;
4343

4444
// Linger
45-
// The default value of -1 specifies an infinite linger period. Pending
45+
// The default value of -1 specifies an infinite linger period. Pending
4646
// messages shall not be discarded after a call to zmq_close(); attempting to
4747
// terminate the socket's context with zmq_term() shall block until all pending
4848
// messages have been sent to a peer. The value 0 specifies no linger period.
@@ -241,6 +241,13 @@ bool socket::set(int32_t option, const std::string& value)
241241
return zmq_setsockopt(self_, option, buffer, value.size()) != zmq_fail;
242242
}
243243

244+
// private
245+
bool socket::set(int32_t option, const data_chunk& value)
246+
{
247+
return zmq_setsockopt(self_, option, value.data(), value.size())
248+
!= zmq_fail;
249+
}
250+
244251
// For NULL security, ZAP calls are only made for non-empty domain.
245252
// For PLAIN/CURVE, calls are always made if ZAP handler is present.
246253
bool socket::set_authentication_domain(const std::string& domain)
@@ -288,6 +295,16 @@ bool socket::set_socks_proxy(const config::authority& socks_proxy)
288295
return socks_proxy && set(ZMQ_SOCKS_PROXY, socks_proxy.to_string());
289296
}
290297

298+
bool socket::set_subscription(const data_chunk& filter)
299+
{
300+
return set(ZMQ_SUBSCRIBE, filter);
301+
}
302+
303+
bool socket::set_unsubscription(const data_chunk& filter)
304+
{
305+
return set(ZMQ_UNSUBSCRIBE, filter);
306+
}
307+
291308
code socket::send(message& packet)
292309
{
293310
return packet.send(*this);

test/utility.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#define TEST_DOMAIN "testing"
2828
#define TEST_MESSAGE "hello world!"
29+
#define TEST_TOPIC "hello"
2930
#define TEST_HOST "127.0.0.1"
3031
#define TEST_HOST_BAD "127.0.0.42"
3132
#define TEST_PUBLIC_ENDPOINT "tcp://" TEST_HOST ":9000"
@@ -75,4 +76,4 @@ class simple_thread
7576
}
7677
};
7778

78-
#endif
79+
#endif

test/zmq/socket.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,71 @@ BOOST_AUTO_TEST_CASE(socket__pub_sub__grasslands_asynchronous_connect_first__rec
406406
SEND_MESSAGES_UNTIL(publisher, received);
407407
}
408408

409+
// Default receive-all filter is removed. No message received.
410+
BOOST_AUTO_TEST_CASE(socket__pub_sub__grasslands_asynchronous__no_subscription)
411+
{
412+
zmq::context context;
413+
BOOST_REQUIRE(context);
414+
415+
std::promise<bool> received;
416+
417+
simple_thread publisher_thread([&]()
418+
{
419+
zmq::socket publisher(context, role::publisher);
420+
BOOST_REQUIRE(publisher);
421+
BC_REQUIRE_SUCCESS(publisher.bind({ TEST_PUBLIC_ENDPOINT }));
422+
SEND_MESSAGES_UNTIL(publisher, received); //begin sending now, needs own thread.
423+
});
424+
425+
simple_thread subscriber_thread([&]()
426+
{
427+
zmq::socket subscriber(context, role::subscriber);
428+
BOOST_REQUIRE(subscriber);
429+
430+
data_chunk subscribe_all;
431+
BOOST_REQUIRE(subscriber.set_unsubscription(subscribe_all));
432+
433+
BC_REQUIRE_SUCCESS(subscriber.connect({ TEST_PUBLIC_ENDPOINT }));
434+
435+
RECEIVE_FAILURE(subscriber);
436+
received.set_value(true);
437+
});
438+
}
439+
440+
// Default receive-all filter is removed. "hello" filter is added.
441+
BOOST_AUTO_TEST_CASE(socket__pub_sub__grasslands_asynchronous__hello_subscription_only)
442+
{
443+
zmq::context context;
444+
BOOST_REQUIRE(context);
445+
446+
std::promise<bool> received;
447+
448+
simple_thread publisher_thread([&]()
449+
{
450+
zmq::socket publisher(context, role::publisher);
451+
BOOST_REQUIRE(publisher);
452+
BC_REQUIRE_SUCCESS(publisher.bind({ TEST_PUBLIC_ENDPOINT }));
453+
SEND_MESSAGES_UNTIL(publisher, received);
454+
});
455+
456+
simple_thread subscriber_thread([&]()
457+
{
458+
zmq::socket subscriber(context, role::subscriber);
459+
BOOST_REQUIRE(subscriber);
460+
461+
data_chunk subscribe_all;
462+
BOOST_REQUIRE(subscriber.set_unsubscription(subscribe_all));
463+
464+
std::string topic = TEST_TOPIC;
465+
BOOST_REQUIRE(subscriber.set_subscription(to_chunk(topic)));
466+
467+
BC_REQUIRE_SUCCESS(subscriber.connect({ TEST_PUBLIC_ENDPOINT }));
468+
469+
RECEIVE_MESSAGE(subscriber);
470+
received.set_value(true);
471+
});
472+
}
473+
409474
BOOST_AUTO_TEST_CASE(socket__xpub_xsub__grasslands_two_threads__subscribed)
410475
{
411476
zmq::context context;

0 commit comments

Comments
 (0)