Skip to content

Commit 27c1a99

Browse files
committed
Remove all unnecessary callbacks
1 parent 1289dcb commit 27c1a99

24 files changed

+171
-157
lines changed

include/pulsar/Client.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class PULSAR_PUBLIC Client {
101101
* @param callback the callback that is triggered when the producer is created successfully or not
102102
* @param callback Callback function that is invoked when the operation is completed
103103
*/
104-
void createProducerAsync(const std::string& topic, CreateProducerCallback callback);
104+
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
105105

106106
/**
107107
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
@@ -111,7 +111,7 @@ class PULSAR_PUBLIC Client {
111111
* @param conf the customized ProducerConfiguration
112112
*/
113113
void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
114-
CreateProducerCallback callback);
114+
const CreateProducerCallback& callback);
115115

116116
/**
117117
* Subscribe to a given topic and subscription combination with the default ConsumerConfiguration
@@ -144,7 +144,7 @@ class PULSAR_PUBLIC Client {
144144
* default ConsumerConfiguration are asynchronously subscribed successfully or not
145145
*/
146146
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
147-
SubscribeCallback callback);
147+
const SubscribeCallback& callback);
148148

149149
/**
150150
* Asynchronously subscribe to a given topic and subscription combination with the customized
@@ -157,7 +157,7 @@ class PULSAR_PUBLIC Client {
157157
* customized ConsumerConfiguration are asynchronously subscribed successfully or not
158158
*/
159159
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
160-
const ConsumerConfiguration& conf, SubscribeCallback callback);
160+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
161161

162162
/**
163163
* Subscribe to multiple topics under the same namespace.
@@ -191,7 +191,7 @@ class PULSAR_PUBLIC Client {
191191
192192
*/
193193
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
194-
SubscribeCallback callback);
194+
const SubscribeCallback& callback);
195195

196196
/**
197197
* Asynchronously subscribe to a list of topics and subscription combination using the customized
@@ -204,7 +204,7 @@ class PULSAR_PUBLIC Client {
204204
* the customized ConsumerConfiguration are asynchronously subscribed successfully or not
205205
*/
206206
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
207-
const ConsumerConfiguration& conf, SubscribeCallback callback);
207+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
208208

209209
/**
210210
* Subscribe to multiple topics, which match given regexPattern, under the same namespace.
@@ -227,7 +227,7 @@ class PULSAR_PUBLIC Client {
227227
* SubscribeCallback)
228228
*/
229229
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
230-
SubscribeCallback callback);
230+
const SubscribeCallback& callback);
231231

232232
/**
233233
* Asynchronously subscribe to multiple topics (which match given regexPatterns) with the customized
@@ -240,7 +240,7 @@ class PULSAR_PUBLIC Client {
240240
* ConsumerConfiguration under the same namespace are asynchronously subscribed successfully or not
241241
*/
242242
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
243-
const ConsumerConfiguration& conf, SubscribeCallback callback);
243+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
244244

245245
/**
246246
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
@@ -301,7 +301,7 @@ class PULSAR_PUBLIC Client {
301301
* @return the Reader object
302302
*/
303303
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
304-
const ReaderConfiguration& conf, ReaderCallback callback);
304+
const ReaderConfiguration& conf, const ReaderCallback& callback);
305305

306306
/**
307307
* Create a table view with given {@code TableViewConfiguration} for specified topic.
@@ -363,7 +363,7 @@ class PULSAR_PUBLIC Client {
363363
* the callback that will be invoked when the list of partitions is available
364364
* @since 2.3.0
365365
*/
366-
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
366+
void getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback);
367367

368368
/**
369369
*
@@ -380,7 +380,7 @@ class PULSAR_PUBLIC Client {
380380
* @param callback the callback that is triggered when the Pulsar client is asynchronously closed
381381
* successfully or not
382382
*/
383-
void closeAsync(CloseCallback callback);
383+
void closeAsync(const CloseCallback& callback);
384384

385385
/**
386386
* Perform immediate shutdown of Pulsar client.

lib/AckGroupingTracker.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace pulsar {
3434

3535
DECLARE_LOG_OBJECT();
3636

37-
void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback callback,
37+
void AckGroupingTracker::doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
3838
CommandAck_AckType ackType) const {
3939
const auto cnx = connectionSupplier_();
4040
if (!cnx) {
@@ -87,7 +87,8 @@ static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msg
8787
return os;
8888
}
8989

90-
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
90+
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds,
91+
const ResultCallback& callback) const {
9192
const auto cnx = connectionSupplier_();
9293
if (!cnx) {
9394
LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);

lib/AckGroupingTracker.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
100100
virtual void flushAndClean() {}
101101

102102
protected:
103-
void doImmediateAck(const MessageId& msgId, ResultCallback callback, CommandAck_AckType ackType) const;
104-
void doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const;
103+
void doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
104+
CommandAck_AckType ackType) const;
105+
void doImmediateAck(const std::set<MessageId>& msgIds, const ResultCallback& callback) const;
105106

106107
private:
107108
const std::function<ClientConnectionPtr()> connectionSupplier_;

lib/Client.cc

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ Result Client::createProducer(const std::string& topic, const ProducerConfigurat
5454
return future.get(producer);
5555
}
5656

57-
void Client::createProducerAsync(const std::string& topic, CreateProducerCallback callback) {
58-
createProducerAsync(topic, ProducerConfiguration(), std::move(callback));
57+
void Client::createProducerAsync(const std::string& topic, const CreateProducerCallback& callback) {
58+
createProducerAsync(topic, ProducerConfiguration(), callback);
5959
}
6060

6161
void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
62-
CreateProducerCallback callback) {
63-
impl_->createProducerAsync(topic, conf, std::move(callback));
62+
const CreateProducerCallback& callback) {
63+
impl_->createProducerAsync(topic, conf, callback);
6464
}
6565

6666
Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) {
@@ -77,14 +77,14 @@ Result Client::subscribe(const std::string& topic, const std::string& subscripti
7777
}
7878

7979
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
80-
SubscribeCallback callback) {
81-
subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), std::move(callback));
80+
const SubscribeCallback& callback) {
81+
subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback);
8282
}
8383

8484
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
85-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
85+
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
8686
LOG_INFO("Subscribing on Topic :" << topic);
87-
impl_->subscribeAsync(topic, subscriptionName, conf, std::move(callback));
87+
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
8888
}
8989

9090
Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
@@ -102,13 +102,13 @@ Result Client::subscribe(const std::vector<std::string>& topics, const std::stri
102102
}
103103

104104
void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
105-
SubscribeCallback callback) {
106-
subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), std::move(callback));
105+
const SubscribeCallback& callback) {
106+
subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback);
107107
}
108108

109109
void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
110-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
111-
impl_->subscribeAsync(topics, subscriptionName, conf, std::move(callback));
110+
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
111+
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
112112
}
113113

114114
Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
@@ -126,13 +126,13 @@ Result Client::subscribeWithRegex(const std::string& regexPattern, const std::st
126126
}
127127

128128
void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
129-
SubscribeCallback callback) {
130-
subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), std::move(callback));
129+
const SubscribeCallback& callback) {
130+
subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), callback);
131131
}
132132

133133
void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
134-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
135-
impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, std::move(callback));
134+
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
135+
impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback);
136136
}
137137

138138
Result Client::createReader(const std::string& topic, const MessageId& startMessageId,
@@ -145,8 +145,8 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess
145145
}
146146

147147
void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
148-
const ReaderConfiguration& conf, ReaderCallback callback) {
149-
impl_->createReaderAsync(topic, startMessageId, conf, std::move(callback));
148+
const ReaderConfiguration& conf, const ReaderCallback& callback) {
149+
impl_->createReaderAsync(topic, startMessageId, conf, callback);
150150
}
151151

152152
Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
@@ -171,8 +171,8 @@ Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::
171171
return future.get(partitions);
172172
}
173173

174-
void Client::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback) {
175-
impl_->getPartitionsForTopicAsync(topic, std::move(callback));
174+
void Client::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) {
175+
impl_->getPartitionsForTopicAsync(topic, callback);
176176
}
177177

178178
Result Client::close() {
@@ -184,7 +184,7 @@ Result Client::close() {
184184
return result;
185185
}
186186

187-
void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(std::move(callback)); }
187+
void Client::closeAsync(const CloseCallback& callback) { impl_->closeAsync(callback); }
188188

189189
void Client::shutdown() { impl_->shutdown(); }
190190

lib/ClientConnection.cc

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ static bool file_exists(const std::string& path) {
160160
std::atomic<int32_t> ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};
161161

162162
ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
163-
ExecutorServicePtr executor,
163+
const ExecutorServicePtr& executor,
164164
const ClientConfiguration& clientConfiguration,
165165
const AuthenticationPtr& authentication, const std::string& clientVersion,
166166
ConnectionPool& pool, size_t poolIndex)
@@ -223,7 +223,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
223223
} else {
224224
ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
225225

226-
std::string trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath();
226+
const auto& trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath();
227227
if (!trustCertFilePath.empty()) {
228228
if (file_exists(trustCertFilePath)) {
229229
ctx.load_verify_file(trustCertFilePath);
@@ -605,15 +605,16 @@ void ClientConnection::tcpConnectAsync() {
605605
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
606606
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
607607
auto weakSelf = weak_from_this();
608-
resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
609-
auto self = weakSelf.lock();
610-
if (self) {
611-
self->handleResolve(err, iterator);
612-
}
613-
});
608+
resolver_->async_resolve(query,
609+
[weakSelf](const ASIO_ERROR& err, const tcp::resolver::iterator& iterator) {
610+
auto self = weakSelf.lock();
611+
if (self) {
612+
self->handleResolve(err, iterator);
613+
}
614+
});
614615
}
615616

616-
void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
617+
void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver::iterator& endpointIterator) {
617618
if (err) {
618619
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
619620
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
@@ -1033,18 +1034,18 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
10331034
}
10341035

10351036
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
1036-
const std::string& listenerName, const uint64_t requestId,
1037-
LookupDataResultPromisePtr promise) {
1037+
const std::string& listenerName, uint64_t requestId,
1038+
const LookupDataResultPromisePtr& promise) {
10381039
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
10391040
}
10401041

1041-
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
1042-
LookupDataResultPromisePtr promise) {
1042+
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
1043+
const LookupDataResultPromisePtr& promise) {
10431044
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise);
10441045
}
10451046

1046-
void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t requestId,
1047-
LookupDataResultPromisePtr promise) {
1047+
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
1048+
const LookupDataResultPromisePtr& promise) {
10481049
Lock lock(mutex_);
10491050
std::shared_ptr<LookupDataResultPtr> lookupDataResult;
10501051
lookupDataResult = std::make_shared<LookupDataResultPtr>();
@@ -1216,20 +1217,22 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
12161217
return requestData.promise.getFuture();
12171218
}
12181219

1219-
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData pendingRequestData) {
1220+
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
1221+
const PendingRequestData& pendingRequestData) {
12201222
if (!ec && !pendingRequestData.hasGotResponse->load()) {
12211223
pendingRequestData.promise.setFailed(ResultTimeout);
12221224
}
12231225
}
12241226

1225-
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, LookupRequestData pendingRequestData) {
1227+
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
1228+
const LookupRequestData& pendingRequestData) {
12261229
if (!ec) {
12271230
pendingRequestData.promise->setFailed(ResultTimeout);
12281231
}
12291232
}
12301233

12311234
void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
1232-
ClientConnection::LastMessageIdRequestData data) {
1235+
const ClientConnection::LastMessageIdRequestData& data) {
12331236
if (!ec) {
12341237
data.promise->setFailed(ResultTimeout);
12351238
}
@@ -1267,7 +1270,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12671270
}
12681271

12691272
void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
1270-
std::vector<uint64_t> consumerStatsRequests) {
1273+
const std::vector<uint64_t>& consumerStatsRequests) {
12711274
if (ec) {
12721275
LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec << "]");
12731276
return;

0 commit comments

Comments
 (0)