Skip to content

Commit d0a3227

Browse files
authored
[improve] add physicalAddress as part of connection pool key (#411)
### Motivation Context: https://github.com/apache/pulsar/pull/22085/files#r1497008116 Currently, the connection pool key does not include physicalAddress (currently logicalAddress + keySuffix). This can be a problem when the same logicalAddresses are in the migrated(green) cluster. (the connection pool will return the connection to the old(blue) cluster) ### Modifications Add physicalAddress as part of the connection pool key
1 parent e2cacb7 commit d0a3227

File tree

3 files changed

+14
-6
lines changed

3 files changed

+14
-6
lines changed

lib/ClientConnection.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ void ClientConnection::close(Result result, bool detach) {
13211321
}
13221322
// Remove the connection from the pool before completing any promise
13231323
if (detach) {
1324-
pool_.remove(logicalAddress_ + "-" + std::to_string(poolIndex_), this);
1324+
pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
13251325
}
13261326

13271327
auto self = shared_from_this();

lib/ConnectionPool.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ bool ConnectionPool::close() {
6666
return true;
6767
}
6868

69+
static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress,
70+
size_t keySuffix) {
71+
std::stringstream ss;
72+
ss << logicalAddress << '-' << physicalAddress << '-' << keySuffix;
73+
return ss.str();
74+
}
75+
6976
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
7077
const std::string& physicalAddress,
7178
size_t keySuffix) {
@@ -77,9 +84,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
7784

7885
std::unique_lock<std::recursive_mutex> lock(mutex_);
7986

80-
std::stringstream ss;
81-
ss << logicalAddress << '-' << keySuffix;
82-
const std::string key = ss.str();
87+
auto key = getKey(logicalAddress, physicalAddress, keySuffix);
8388

8489
PoolMap::iterator cnxIt = pool_.find(key);
8590
if (cnxIt != pool_.end()) {
@@ -127,7 +132,9 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
127132
return future;
128133
}
129134

130-
void ConnectionPool::remove(const std::string& key, ClientConnection* value) {
135+
void ConnectionPool::remove(const std::string& logicalAddress, const std::string& physicalAddress,
136+
size_t keySuffix, ClientConnection* value) {
137+
auto key = getKey(logicalAddress, physicalAddress, keySuffix);
131138
std::lock_guard<std::recursive_mutex> lock(mutex_);
132139
auto it = pool_.find(key);
133140
if (it != pool_.end() && it->second.get() == value) {

lib/ConnectionPool.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class PULSAR_PUBLIC ConnectionPool {
5151
*/
5252
bool close();
5353

54-
void remove(const std::string& key, ClientConnection* value);
54+
void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix,
55+
ClientConnection* value);
5556

5657
/**
5758
* Get a connection from the pool.

0 commit comments

Comments
 (0)