4
4
5
5
#include < zmq/zmqnotificationinterface.h>
6
6
#include < zmq/zmqpublishnotifier.h>
7
+ #include < zmq/zmqutil.h>
8
+
9
+ #include < zmq.h>
7
10
8
11
#include < validation.h>
9
12
#include < util/system.h>
10
13
11
- void zmqError (const char *str)
12
- {
13
- LogPrint (BCLog::ZMQ, " zmq: Error: %s, errno=%s\n " , str, zmq_strerror (errno));
14
- }
15
-
16
14
CZMQNotificationInterface::CZMQNotificationInterface () : pcontext(nullptr )
17
15
{
18
16
}
19
17
20
18
CZMQNotificationInterface::~CZMQNotificationInterface ()
21
19
{
22
20
Shutdown ();
23
-
24
- for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin (); i!=notifiers.end (); ++i)
25
- {
26
- delete *i;
27
- }
28
21
}
29
22
30
23
std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers () const
31
24
{
32
25
std::list<const CZMQAbstractNotifier*> result;
33
- for (const auto * n : notifiers) {
34
- result.push_back (n);
26
+ for (const auto & n : notifiers) {
27
+ result.push_back (n. get () );
35
28
}
36
29
return result;
37
30
}
38
31
39
32
CZMQNotificationInterface* CZMQNotificationInterface::Create ()
40
33
{
41
- CZMQNotificationInterface* notificationInterface = nullptr ;
42
34
std::map<std::string, CZMQNotifierFactory> factories;
43
- std::list<CZMQAbstractNotifier*> notifiers;
44
-
45
35
factories[" pubhashblock" ] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
46
36
factories[" pubhashtx" ] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
47
37
factories[" pubrawblock" ] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
48
38
factories[" pubrawtx" ] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
49
39
40
+ std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
50
41
for (const auto & entry : factories)
51
42
{
52
43
std::string arg (" -zmq" + entry.first );
53
44
if (gArgs .IsArgSet (arg))
54
45
{
55
- CZMQNotifierFactory factory = entry.second ;
56
- std::string address = gArgs .GetArg (arg, " " );
57
- CZMQAbstractNotifier * notifier = factory ();
46
+ const auto & factory = entry.second ;
47
+ const std::string address = gArgs .GetArg (arg, " " );
48
+ std::unique_ptr< CZMQAbstractNotifier> notifier = factory ();
58
49
notifier->SetType (entry.first );
59
50
notifier->SetAddress (address);
60
51
notifier->SetOutboundMessageHighWaterMark (static_cast <int >(gArgs .GetArg (arg + " hwm" , CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
61
- notifiers.push_back (notifier);
52
+ notifiers.push_back (std::move ( notifier) );
62
53
}
63
54
}
64
55
65
56
if (!notifiers.empty ())
66
57
{
67
- notificationInterface = new CZMQNotificationInterface ();
68
- notificationInterface->notifiers = notifiers;
58
+ std::unique_ptr<CZMQNotificationInterface> notificationInterface ( new CZMQNotificationInterface () );
59
+ notificationInterface->notifiers = std::move ( notifiers) ;
69
60
70
- if (!notificationInterface->Initialize ())
71
- {
72
- delete notificationInterface;
73
- notificationInterface = nullptr ;
61
+ if (notificationInterface->Initialize ()) {
62
+ return notificationInterface.release ();
74
63
}
75
64
}
76
65
77
- return notificationInterface ;
66
+ return nullptr ;
78
67
}
79
68
80
69
// Called at startup to conditionally set up ZMQ socket(s)
@@ -95,26 +84,15 @@ bool CZMQNotificationInterface::Initialize()
95
84
return false ;
96
85
}
97
86
98
- std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin ();
99
- for (; i!=notifiers.end (); ++i)
100
- {
101
- CZMQAbstractNotifier *notifier = *i;
102
- if (notifier->Initialize (pcontext))
103
- {
87
+ for (auto & notifier : notifiers) {
88
+ if (notifier->Initialize (pcontext)) {
104
89
LogPrint (BCLog::ZMQ, " zmq: Notifier %s ready (address = %s)\n " , notifier->GetType (), notifier->GetAddress ());
105
- }
106
- else
107
- {
90
+ } else {
108
91
LogPrint (BCLog::ZMQ, " zmq: Notifier %s failed (address = %s)\n " , notifier->GetType (), notifier->GetAddress ());
109
- break ;
92
+ return false ;
110
93
}
111
94
}
112
95
113
- if (i!=notifiers.end ())
114
- {
115
- return false ;
116
- }
117
-
118
96
return true ;
119
97
}
120
98
@@ -124,9 +102,7 @@ void CZMQNotificationInterface::Shutdown()
124
102
LogPrint (BCLog::ZMQ, " zmq: Shutdown notification interface\n " );
125
103
if (pcontext)
126
104
{
127
- for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin (); i!=notifiers.end (); ++i)
128
- {
129
- CZMQAbstractNotifier *notifier = *i;
105
+ for (auto & notifier : notifiers) {
130
106
LogPrint (BCLog::ZMQ, " zmq: Shutdown notifier %s at %s\n " , notifier->GetType (), notifier->GetAddress ());
131
107
notifier->Shutdown ();
132
108
}
@@ -136,45 +112,43 @@ void CZMQNotificationInterface::Shutdown()
136
112
}
137
113
}
138
114
139
- void CZMQNotificationInterface::UpdatedBlockTip (const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload )
140
- {
141
- if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
142
- return ;
115
+ namespace {
143
116
144
- for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin (); i!=notifiers.end (); )
145
- {
146
- CZMQAbstractNotifier *notifier = *i;
147
- if (notifier->NotifyBlock (pindexNew))
148
- {
149
- i++;
150
- }
151
- else
152
- {
117
+ template <typename Function>
118
+ void TryForEachAndRemoveFailed (std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
119
+ {
120
+ for (auto i = notifiers.begin (); i != notifiers.end (); ) {
121
+ CZMQAbstractNotifier* notifier = i->get ();
122
+ if (func (notifier)) {
123
+ ++i;
124
+ } else {
153
125
notifier->Shutdown ();
154
126
i = notifiers.erase (i);
155
127
}
156
128
}
157
129
}
158
130
131
+ } // anonymous namespace
132
+
133
+ void CZMQNotificationInterface::UpdatedBlockTip (const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload )
134
+ {
135
+ if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
136
+ return ;
137
+
138
+ TryForEachAndRemoveFailed (notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
139
+ return notifier->NotifyBlock (pindexNew);
140
+ });
141
+ }
142
+
159
143
void CZMQNotificationInterface::TransactionAddedToMempool (const CTransactionRef& ptx)
160
144
{
161
145
// Used by BlockConnected and BlockDisconnected as well, because they're
162
146
// all the same external callback.
163
147
const CTransaction& tx = *ptx;
164
148
165
- for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin (); i!=notifiers.end (); )
166
- {
167
- CZMQAbstractNotifier *notifier = *i;
168
- if (notifier->NotifyTransaction (tx))
169
- {
170
- i++;
171
- }
172
- else
173
- {
174
- notifier->Shutdown ();
175
- i = notifiers.erase (i);
176
- }
177
- }
149
+ TryForEachAndRemoveFailed (notifiers, [&tx](CZMQAbstractNotifier* notifier) {
150
+ return notifier->NotifyTransaction (tx);
151
+ });
178
152
}
179
153
180
154
void CZMQNotificationInterface::BlockConnected (const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
0 commit comments