Skip to content

Commit d1dd08b

Browse files
authored
[feat] PIP-307 added assigned broker urls for CloseProudcer and CloseConsumer commands and handler logic (#389)
Master Issue: #[pip-307](https://github.com/apache/pulsar/blob/master/pip/pip-307.md) ### Motivation As part of PIP-307, I am adding the client logic to this c++ client. ### Modifications - Added assigned broker urls to CloseProudcer and CloseConsumer commands. - Updated the client reconnection logic to directly connect to the assigned broker urls ### Verifying this change - Added ExtensibleLoadManagerTest to cover this logic
1 parent e13bd6d commit d1dd08b

18 files changed

+492
-21
lines changed

lib/BinaryProtoLookupService.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
8787
<< ", from " << cnx->cnxString());
8888
if (data->shouldProxyThroughServiceUrl()) {
8989
// logicalAddress is the proxy's address, we should still connect through proxy
90-
promise->setValue({responseBrokerAddress, address});
90+
promise->setValue({responseBrokerAddress, address, true});
9191
} else {
92-
promise->setValue({responseBrokerAddress, responseBrokerAddress});
92+
promise->setValue({responseBrokerAddress, responseBrokerAddress, false});
9393
}
9494
}
9595
});

lib/ClientConnection.cc

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1762,6 +1762,30 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17621762
}
17631763
}
17641764

1765+
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
1766+
const proto::CommandCloseProducer& closeProducer) {
1767+
if (tlsSocket_) {
1768+
if (closeProducer.has_assignedbrokerserviceurltls()) {
1769+
return closeProducer.assignedbrokerserviceurltls();
1770+
}
1771+
} else if (closeProducer.has_assignedbrokerserviceurl()) {
1772+
return closeProducer.assignedbrokerserviceurl();
1773+
}
1774+
return boost::none;
1775+
}
1776+
1777+
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
1778+
const proto::CommandCloseConsumer& closeConsumer) {
1779+
if (tlsSocket_) {
1780+
if (closeConsumer.has_assignedbrokerserviceurltls()) {
1781+
return closeConsumer.assignedbrokerserviceurltls();
1782+
}
1783+
} else if (closeConsumer.has_assignedbrokerserviceurl()) {
1784+
return closeConsumer.assignedbrokerserviceurl();
1785+
}
1786+
return boost::none;
1787+
}
1788+
17651789
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
17661790
int producerId = closeProducer.producer_id();
17671791

@@ -1775,7 +1799,8 @@ void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& cl
17751799
lock.unlock();
17761800

17771801
if (producer) {
1778-
producer->disconnectProducer();
1802+
auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeProducer);
1803+
producer->disconnectProducer(assignedBrokerServiceUrl);
17791804
}
17801805
} else {
17811806
LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer command: " << producerId);
@@ -1795,7 +1820,8 @@ void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& cl
17951820
lock.unlock();
17961821

17971822
if (consumer) {
1798-
consumer->disconnectConsumer();
1823+
auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeconsumer);
1824+
consumer->disconnectConsumer(assignedBrokerServiceUrl);
17991825
}
18001826
} else {
18011827
LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer command: " << consumerId);

lib/ClientConnection.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
421421
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
422422
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
423423
void handleAckResponse(const proto::CommandAckResponse&);
424+
boost::optional<std::string> getAssignedBrokerServiceUrl(
425+
const proto::CommandCloseProducer& closeProducer);
426+
boost::optional<std::string> getAssignedBrokerServiceUrl(
427+
const proto::CommandCloseConsumer& closeConsumer);
424428
};
425429
} // namespace pulsar
426430

lib/ClientImpl.cc

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
9191
ClientImpl::getClientVersion(clientConfiguration)),
9292
producerIdGenerator_(0),
9393
consumerIdGenerator_(0),
94-
closingError(ResultOk) {
94+
closingError(ResultOk),
95+
useProxy_(false),
96+
lookupCount_(0L) {
9597
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
9698
if (loggerFactory) {
9799
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -532,6 +534,8 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
532534
promise.setFailed(result);
533535
return;
534536
}
537+
useProxy_ = data.proxyThroughServiceUrl;
538+
lookupCount_++;
535539
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
536540
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
537541
if (result == ResultOk) {
@@ -550,6 +554,33 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
550554
return promise.getFuture();
551555
}
552556

557+
const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
558+
if (useProxy_) {
559+
return serviceNameResolver_.resolveHost();
560+
} else {
561+
return logicalAddress;
562+
}
563+
}
564+
565+
Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566+
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
567+
Promise<Result, ClientConnectionPtr> promise;
568+
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
569+
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
570+
if (result == ResultOk) {
571+
auto cnx = weakCnx.lock();
572+
if (cnx) {
573+
promise.setValue(cnx);
574+
} else {
575+
promise.setFailed(ResultConnectError);
576+
}
577+
} else {
578+
promise.setFailed(result);
579+
}
580+
});
581+
return promise.getFuture();
582+
}
583+
553584
void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
554585
TopicNamePtr topicName, GetPartitionsCallback callback) {
555586
if (result != ResultOk) {
@@ -635,6 +666,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
635666
if (*numberOfOpenHandlers == 0 && callback) {
636667
handleClose(ResultOk, numberOfOpenHandlers, callback);
637668
}
669+
lookupCount_ = 0;
638670
}
639671

640672
void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, ResultCallback callback) {
@@ -722,6 +754,7 @@ void ClientImpl::shutdown() {
722754
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
723755
timeoutProcessor.tok();
724756
LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
757+
lookupCount_ = 0;
725758
}
726759

727760
uint64_t ClientImpl::newProducerId() {

lib/ClientImpl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9797

9898
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
9999

100+
Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
101+
100102
void closeAsync(CloseCallback callback);
101103
void shutdown();
102104

@@ -124,6 +126,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
124126
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
125127

126128
ConnectionPool& getConnectionPool() noexcept { return pool_; }
129+
uint64_t getLookupCount() { return lookupCount_; }
127130

128131
static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration);
129132

@@ -160,6 +163,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
160163
const std::string& consumerName, const ConsumerConfiguration& conf,
161164
SubscribeCallback callback);
162165

166+
const std::string& getPhysicalAddress(const std::string& logicalAddress);
167+
163168
static std::string getClientVersion(const ClientConfiguration& clientConfiguration);
164169

165170
enum State
@@ -191,6 +196,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
191196
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
192197

193198
std::atomic<Result> closingError;
199+
std::atomic<bool> useProxy_;
200+
std::atomic<uint64_t> lookupCount_;
194201

195202
friend class Client;
196203
};

lib/ConsumerImpl.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,10 +1234,13 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
12341234
negativeAcksTracker_->add(messageId);
12351235
}
12361236

1237-
void ConsumerImpl::disconnectConsumer() {
1238-
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
1237+
void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); }
1238+
1239+
void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>& assignedBrokerUrl) {
1240+
LOG_INFO("Broker notification of Closed consumer: "
1241+
<< consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + assignedBrokerUrl.get()) : ""));
12391242
resetCnx();
1240-
scheduleReconnection();
1243+
scheduleReconnection(assignedBrokerUrl);
12411244
}
12421245

12431246
void ConsumerImpl::closeAsync(ResultCallback originalCallback) {

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class ConsumerImpl : public ConsumerImplBase {
131131
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
132132

133133
virtual void disconnectConsumer();
134+
virtual void disconnectConsumer(const boost::optional<std::string>& assignedBrokerUrl);
134135
Result fetchSingleMessageFromBroker(Message& msg);
135136

136137
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);

lib/HandlerBase.cc

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
6868
connection_ = cnx;
6969
}
7070

71-
void HandlerBase::grabCnx() {
71+
void HandlerBase::grabCnx() { grabCnx(boost::none); }
72+
73+
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
74+
const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
75+
if (assignedBrokerUrl && client->getLookupCount() > 0) {
76+
return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
77+
} else {
78+
return client->getConnection(topic(), connectionKeySuffix_);
79+
}
80+
}
81+
82+
void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) {
7283
bool expectedState = false;
7384
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
7485
LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
@@ -90,7 +101,7 @@ void HandlerBase::grabCnx() {
90101
return;
91102
}
92103
auto self = shared_from_this();
93-
auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
104+
auto cnxFuture = getConnection(client, assignedBrokerUrl);
94105
cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
95106
if (result == ResultOk) {
96107
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
@@ -141,37 +152,37 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
141152
break;
142153
}
143154
}
144-
145-
void HandlerBase::scheduleReconnection() {
155+
void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); }
156+
void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assignedBrokerUrl) {
146157
const auto state = state_.load();
147158

148159
if (state == Pending || state == Ready) {
149-
TimeDuration delay = backoff_.next();
160+
TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
150161

151162
LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
152163
timer_->expires_from_now(delay);
153164
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
154165
// so we will not run into the case where grabCnx is invoked on out of scope handler
155166
auto name = getName();
156167
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
157-
timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) {
168+
timer_->async_wait([name, weakSelf, assignedBrokerUrl](const ASIO_ERROR& ec) {
158169
auto self = weakSelf.lock();
159170
if (self) {
160-
self->handleTimeout(ec);
171+
self->handleTimeout(ec, assignedBrokerUrl);
161172
} else {
162173
LOG_WARN(name << "Cancel the reconnection since the handler is destroyed");
163174
}
164175
});
165176
}
166177
}
167178

168-
void HandlerBase::handleTimeout(const ASIO_ERROR& ec) {
179+
void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const boost::optional<std::string>& assignedBrokerUrl) {
169180
if (ec) {
170181
LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]");
171182
return;
172183
} else {
173184
epoch_++;
174-
grabCnx();
185+
grabCnx(assignedBrokerUrl);
175186
}
176187
}
177188

lib/HandlerBase.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _PULSAR_HANDLER_BASE_HEADER_
2121
#include <pulsar/Result.h>
2222

23+
#include <boost/optional.hpp>
2324
#include <memory>
2425
#include <mutex>
2526
#include <string>
@@ -53,11 +54,22 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
5354
void resetCnx() { setCnx(nullptr); }
5455

5556
protected:
57+
/*
58+
* tries reconnection and sets connection_ to valid object
59+
* @param assignedBrokerUrl assigned broker url to directly connect to without lookup
60+
*/
61+
void grabCnx(const boost::optional<std::string>& assignedBrokerUrl);
62+
5663
/*
5764
* tries reconnection and sets connection_ to valid object
5865
*/
5966
void grabCnx();
6067

68+
/*
69+
* Schedule reconnection after backoff time
70+
* @param assignedBrokerUrl assigned broker url to directly connect to without lookup
71+
*/
72+
void scheduleReconnection(const boost::optional<std::string>& assignedBrokerUrl);
6173
/*
6274
* Schedule reconnection after backoff time
6375
*/
@@ -89,9 +101,12 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
89101
private:
90102
const std::shared_ptr<std::string> topic_;
91103

104+
Future<Result, ClientConnectionPtr> getConnection(const ClientImplPtr& client,
105+
const boost::optional<std::string>& assignedBrokerUrl);
106+
92107
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
93108

94-
void handleTimeout(const ASIO_ERROR& ec);
109+
void handleTimeout(const ASIO_ERROR& ec, const boost::optional<std::string>& assignedBrokerUrl);
95110

96111
protected:
97112
ClientImplWeakPtr client_;

lib/LookupService.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class LookupService {
4242
struct LookupResult {
4343
std::string logicalAddress;
4444
std::string physicalAddress;
45+
bool proxyThroughServiceUrl;
4546

4647
friend std::ostream& operator<<(std::ostream& os, const LookupResult& lookupResult) {
4748
return os << "logical address: " << lookupResult.logicalAddress

0 commit comments

Comments
 (0)