Skip to content

Commit 9624442

Browse files
committed
zmq: deduplicate 'sequence' publisher message creation/sending
1 parent 7ae86b3 commit 9624442

File tree

1 file changed

+17
-23
lines changed

1 file changed

+17
-23
lines changed

src/zmq/zmqpublishnotifier.cpp

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <cstdarg>
1818
#include <cstddef>
1919
#include <map>
20+
#include <optional>
2021
#include <string>
2122
#include <utility>
2223

@@ -227,50 +228,43 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
227228
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
228229
}
229230

231+
// Helper function to send a 'sequence' topic message with the following structure:
232+
// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
233+
static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
234+
{
235+
unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
236+
for (unsigned int i = 0; i < sizeof(hash); ++i) {
237+
data[sizeof(hash) - 1 - i] = hash.begin()[i];
238+
}
239+
data[sizeof(hash)] = label;
240+
if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
241+
return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
242+
}
230243

231-
// TODO: Dedup this code to take label char, log string
232244
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
233245
{
234246
uint256 hash = pindex->GetBlockHash();
235247
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
236-
char data[sizeof(uint256)+1];
237-
for (unsigned int i = 0; i < sizeof(uint256); i++)
238-
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
239-
data[sizeof(data) - 1] = 'C'; // Block (C)onnect
240-
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
248+
return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
241249
}
242250

243251
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
244252
{
245253
uint256 hash = pindex->GetBlockHash();
246254
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
247-
char data[sizeof(uint256)+1];
248-
for (unsigned int i = 0; i < sizeof(uint256); i++)
249-
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
250-
data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
251-
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
255+
return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
252256
}
253257

254258
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
255259
{
256260
uint256 hash = transaction.GetHash();
257261
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
258-
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
259-
for (unsigned int i = 0; i < sizeof(uint256); i++)
260-
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
261-
data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
262-
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
263-
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
262+
return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
264263
}
265264

266265
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
267266
{
268267
uint256 hash = transaction.GetHash();
269268
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
270-
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
271-
for (unsigned int i = 0; i < sizeof(uint256); i++)
272-
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
273-
data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
274-
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
275-
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
269+
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
276270
}

0 commit comments

Comments
 (0)