Skip to content

Commit dac2caa

Browse files
committed
Merge #14060: ZMQ: add options to configure outbound message high water mark, aka SNDHWM
a4edb16 ZMQ: add options to configure outbound message high water mark, aka SNDHWM (mruddy) Pull request description: ZMQ: add options to configure outbound message high water mark, aka SNDHWM This is my attempt at bitcoin/bitcoin#13315 Tree-SHA512: a4cc3bcf179776899261a97c8c4f31f35d1d8950fd71a09a79c5c064879b38e600b26824c89c4091d941502ed5b0255390882f7d44baf9e6dc49d685a86e8edb
2 parents 45f5006 + a4edb16 commit dac2caa

File tree

11 files changed

+59
-14
lines changed

11 files changed

+59
-14
lines changed

contrib/zmq/zmq_sub.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def __init__(self):
4242
self.zmqContext = zmq.asyncio.Context()
4343

4444
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
45+
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
4546
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
4647
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
4748
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")

contrib/zmq/zmq_sub3.4.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(self):
4646
self.zmqContext = zmq.asyncio.Context()
4747

4848
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
49+
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
4950
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
5051
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
5152
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")

doc/zmq.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,21 @@ Currently, the following notifications are supported:
6666
The socket type is PUB and the address must be a valid ZeroMQ socket
6767
address. The same address can be used in more than one notification.
6868

69+
The option to set the PUB socket's outbound message high water mark
70+
(SNDHWM) may be set individually for each notification:
71+
72+
-zmqpubhashtxhwm=n
73+
-zmqpubhashblockhwm=n
74+
-zmqpubrawblockhwm=n
75+
-zmqpubrawtxhwm=n
76+
77+
The high water mark value must be an integer greater than or equal to 0.
78+
6979
For instance:
7080

7181
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
72-
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
82+
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
83+
-zmqpubhashtxhwm=10000
7384

7485
Each PUB notification has a topic and body, where the header
7586
corresponds to the notification type. For instance, for the

src/init.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include <openssl/crypto.h>
6363

6464
#if ENABLE_ZMQ
65+
#include <zmq/zmqabstractnotifier.h>
6566
#include <zmq/zmqnotificationinterface.h>
6667
#include <zmq/zmqrpc.h>
6768
#endif
@@ -418,11 +419,19 @@ void SetupServerArgs()
418419
gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ);
419420
gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ);
420421
gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ);
422+
gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
423+
gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
424+
gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
425+
gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
421426
#else
422427
hidden_args.emplace_back("-zmqpubhashblock=<address>");
423428
hidden_args.emplace_back("-zmqpubhashtx=<address>");
424429
hidden_args.emplace_back("-zmqpubrawblock=<address>");
425430
hidden_args.emplace_back("-zmqpubrawtx=<address>");
431+
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
432+
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
433+
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
434+
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
426435
#endif
427436

428437
gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST);

src/zmq/zmqabstractnotifier.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <zmq/zmqabstractnotifier.h>
66
#include <util/system.h>
77

8+
const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
89

910
CZMQAbstractNotifier::~CZMQAbstractNotifier()
1011
{

src/zmq/zmqabstractnotifier.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
1515
class CZMQAbstractNotifier
1616
{
1717
public:
18-
CZMQAbstractNotifier() : psocket(nullptr) { }
18+
static const int DEFAULT_ZMQ_SNDHWM {1000};
19+
20+
CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
1921
virtual ~CZMQAbstractNotifier();
2022

2123
template <typename T>
@@ -28,6 +30,12 @@ class CZMQAbstractNotifier
2830
void SetType(const std::string &t) { type = t; }
2931
std::string GetAddress() const { return address; }
3032
void SetAddress(const std::string &a) { address = a; }
33+
int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
34+
void SetOutboundMessageHighWaterMark(const int sndhwm) {
35+
if (sndhwm >= 0) {
36+
outbound_message_high_water_mark = sndhwm;
37+
}
38+
}
3139

3240
virtual bool Initialize(void *pcontext) = 0;
3341
virtual void Shutdown() = 0;
@@ -39,6 +47,7 @@ class CZMQAbstractNotifier
3947
void *psocket;
4048
std::string type;
4149
std::string address;
50+
int outbound_message_high_water_mark; // aka SNDHWM
4251
};
4352

4453
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H

src/zmq/zmqnotificationinterface.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
5959
CZMQAbstractNotifier *notifier = factory();
6060
notifier->SetType(entry.first);
6161
notifier->SetAddress(address);
62+
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
6263
notifiers.push_back(notifier);
6364
}
6465
}
@@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize()
102103
CZMQAbstractNotifier *notifier = *i;
103104
if (notifier->Initialize(pcontext))
104105
{
105-
LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
106+
LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
106107
}
107108
else
108109
{
109-
LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
110+
LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
110111
break;
111112
}
112113
}
@@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown()
128129
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
129130
{
130131
CZMQAbstractNotifier *notifier = *i;
131-
LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
132+
LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
132133
notifier->Shutdown();
133134
}
134135
zmq_ctx_term(pcontext);

src/zmq/zmqpublishnotifier.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
7676
return false;
7777
}
7878

79-
int rc = zmq_bind(psocket, address.c_str());
80-
if (rc!=0)
79+
LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
80+
81+
int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
82+
if (rc != 0)
83+
{
84+
zmqError("Failed to set outbound message high water mark");
85+
zmq_close(psocket);
86+
return false;
87+
}
88+
89+
rc = zmq_bind(psocket, address.c_str());
90+
if (rc != 0)
8191
{
8292
zmqError("Failed to bind address");
8393
zmq_close(psocket);
@@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
120130

121131
if (count == 1)
122132
{
123-
LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
133+
LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
124134
int linger = 0;
125135
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
126136
zmq_close(psocket);

src/zmq/zmqpublishnotifier.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class CBlockIndex;
1212
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
1313
{
1414
private:
15-
uint32_t nSequence; //!< upcounting per message sequence number
15+
uint32_t nSequence {0U}; //!< upcounting per message sequence number
1616

1717
public:
1818

src/zmq/zmqrpc.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
2222
"[\n"
2323
" { (json object)\n"
2424
" \"type\": \"pubhashtx\", (string) Type of notification\n"
25-
" \"address\": \"...\" (string) Address of the publisher\n"
25+
" \"address\": \"...\", (string) Address of the publisher\n"
26+
" \"hwm\": n (numeric) Outbound message high water mark\n"
2627
" },\n"
2728
" ...\n"
2829
"]\n"
@@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
3839
UniValue obj(UniValue::VOBJ);
3940
obj.pushKV("type", n->GetType());
4041
obj.pushKV("address", n->GetAddress());
42+
obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
4143
result.push_back(obj);
4244
}
4345
}

0 commit comments

Comments
 (0)