Skip to content

Commit a0185d9

Browse files
committed
Merge #18309: zmq: Add support to listen on multiple interfaces
e66870c zmq: Append address to notify log output (nthumann) 241803d test: Add zmq test to support multiple interfaces (nthumann) a0b2e5c doc: Add release notes to support multiple interfaces (nthumann) b1c3f18 doc: Adjust ZMQ usage to support multiple interfaces (nthumann) 347c94f zmq: Add support to listen on multiple interfaces (Nicolas Thumann) Pull request description: This PR adds support for ZeroMQ to listen on multiple interfaces, just like the RPC server. Currently, if you specify more than one e.g. `zmqpubhashblock` paramter, only the first one will be used. Therefore a user may be forced to listen on all interfaces (e.g. `zmqpubhashblock=0.0.0.0:28332`), which can result in an increased attack surface. With this PR a user can specify multiple interfaces to listen on, e.g. `-zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://192.168.1.123:28332`. ACKs for top commit: laanwj: Code review ACK e66870c instagibbs: reACK bitcoin/bitcoin@e66870c Tree-SHA512: f38ab4a6ff00dc821e5f4842508cefadb701e70bb3893992c1b32049be20247c8aa9476a1f886050c5f17fe7f2ce99ee30193ce2c81a7482a5a51f8fc22300c7
2 parents 9fc2f01 + e66870c commit a0185d9

File tree

5 files changed

+40
-12
lines changed

5 files changed

+40
-12
lines changed

doc/release-notes-18309.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Command-line options
2+
-----------------------------
3+
4+
The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets.

doc/zmq.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ Currently, the following notifications are supported:
6767

6868
The socket type is PUB and the address must be a valid ZeroMQ socket
6969
address. The same address can be used in more than one notification.
70+
The same notification can be specified more than once.
7071

7172
The option to set the PUB socket's outbound message high water mark
7273
(SNDHWM) may be set individually for each notification:
@@ -82,6 +83,7 @@ The high water mark value must be an integer greater than or equal to 0.
8283
For instance:
8384

8485
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
86+
-zmqpubhashtx=tcp://192.168.1.2:28332 \
8587
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
8688
-zmqpubhashtxhwm=10000
8789

src/zmq/zmqnotificationinterface.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
4242
for (const auto& entry : factories)
4343
{
4444
std::string arg("-zmq" + entry.first);
45-
if (gArgs.IsArgSet(arg))
46-
{
47-
const auto& factory = entry.second;
48-
const std::string address = gArgs.GetArg(arg, "");
45+
const auto& factory = entry.second;
46+
for (const std::string& address : gArgs.GetArgs(arg)) {
4947
std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
5048
notifier->SetType(entry.first);
5149
notifier->SetAddress(address);

src/zmq/zmqpublishnotifier.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
180180
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
181181
{
182182
uint256 hash = pindex->GetBlockHash();
183-
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
183+
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
184184
char data[32];
185185
for (unsigned int i = 0; i < 32; i++)
186186
data[31 - i] = hash.begin()[i];
@@ -190,7 +190,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
190190
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
191191
{
192192
uint256 hash = transaction.GetHash();
193-
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
193+
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
194194
char data[32];
195195
for (unsigned int i = 0; i < 32; i++)
196196
data[31 - i] = hash.begin()[i];
@@ -199,7 +199,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
199199

200200
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
201201
{
202-
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
202+
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
203203

204204
const Consensus::Params& consensusParams = Params().GetConsensus();
205205
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
@@ -221,7 +221,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
221221
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
222222
{
223223
uint256 hash = transaction.GetHash();
224-
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
224+
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
225225
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
226226
ss << transaction;
227227
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
@@ -232,7 +232,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
232232
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
233233
{
234234
uint256 hash = pindex->GetBlockHash();
235-
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex());
235+
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
236236
char data[sizeof(uint256)+1];
237237
for (unsigned int i = 0; i < sizeof(uint256); i++)
238238
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@@ -243,7 +243,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
243243
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
244244
{
245245
uint256 hash = pindex->GetBlockHash();
246-
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex());
246+
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
247247
char data[sizeof(uint256)+1];
248248
for (unsigned int i = 0; i < sizeof(uint256); i++)
249249
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@@ -254,7 +254,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pinde
254254
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
255255
{
256256
uint256 hash = transaction.GetHash();
257-
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex());
257+
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
258258
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
259259
for (unsigned int i = 0; i < sizeof(uint256); i++)
260260
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@@ -266,7 +266,7 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction
266266
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
267267
{
268268
uint256 hash = transaction.GetHash();
269-
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex());
269+
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
270270
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
271271
for (unsigned int i = 0; i < sizeof(uint256); i++)
272272
data[sizeof(uint256) - 1 - i] = hash.begin()[i];

test/functional/interface_zmq.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def run_test(self):
7575
self.test_sequence()
7676
self.test_mempool_sync()
7777
self.test_reorg()
78+
self.test_multiple_interfaces()
7879
finally:
7980
# Destroy the ZMQ context.
8081
self.log.debug("Destroying ZMQ context")
@@ -506,5 +507,28 @@ def test_mempool_sync(self):
506507

507508
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
508509

510+
def test_multiple_interfaces(self):
511+
# Set up two subscribers with different addresses
512+
subscribers = []
513+
for i in range(2):
514+
address = 'tcp://127.0.0.1:%d' % (28334 + i)
515+
socket = self.ctx.socket(zmq.SUB)
516+
socket.set(zmq.RCVTIMEO, 60000)
517+
hashblock = ZMQSubscriber(socket, b"hashblock")
518+
socket.connect(address)
519+
subscribers.append({'address': address, 'hashblock': hashblock})
520+
521+
self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
522+
523+
# Relax so that the subscriber is ready before publishing zmq messages
524+
sleep(0.2)
525+
526+
# Generate 1 block in nodes[0] and receive all notifications
527+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
528+
529+
# Should receive the same block hash on both subscribers
530+
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
531+
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
532+
509533
if __name__ == '__main__':
510534
ZMQTest().main()

0 commit comments

Comments
 (0)