Skip to content

Commit f2c580b

Browse files
Fix pending requests failed with ResultConnectError when disconnecting (apache#322)
### Motivation When there are multiple pending requests in the same `ClientConnection`, if one request failed with a retryable error, e.g. the `ServiceUnitNotReady` error when finding the owner broker of a topic, the socket will be closed in `checkServerError` and `close()` will be called subsequently in `handleRead` (`err` is `eof` or `operation_failed`). However, the default value of 1st parameter is `ResultConnectError` for `close`, which is not retryable. ### Modifications If the connection is disconnected by the client, pass `ResultDisconnected` to `close` and treat it as retryable. closeSocket is replaced with close(ResultDisconnected) to avoid the connection being the status that socket is closed but TLS stream is not closed.
1 parent af45a54 commit f2c580b

File tree

7 files changed

+75
-36
lines changed

7 files changed

+75
-36
lines changed

lib/ClientConnection.cc

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "OpSendMsg.h"
3232
#include "ProducerImpl.h"
3333
#include "PulsarApi.pb.h"
34+
#include "ResultUtils.h"
3435
#include "Url.h"
3536
#include "auth/InitialAuthData.h"
3637
#include "checksum/ChecksumProvider.h"
@@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
205206
ctx.load_verify_file(trustCertFilePath);
206207
} else {
207208
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
208-
close();
209+
close(ResultAuthenticationError, false);
209210
return;
210211
}
211212
} else {
@@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
215216

216217
if (!authentication_) {
217218
LOG_ERROR("Invalid authentication plugin");
218-
close();
219+
close(ResultAuthenticationError, false);
219220
return;
220221
}
221222

@@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
229230
tlsPrivateKey = authData->getTlsPrivateKey();
230231
if (!file_exists(tlsCertificates)) {
231232
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
232-
close();
233+
close(ResultAuthenticationError, false);
233234
return;
234235
}
235236
if (!file_exists(tlsCertificates)) {
236237
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
237-
close();
238+
close(ResultAuthenticationError, false);
238239
return;
239240
}
240241
ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem);
@@ -660,7 +661,7 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
660661
} else {
661662
LOG_ERROR(cnxString_ << "Read operation failed: " << err.message());
662663
}
663-
close();
664+
close(ResultDisconnected);
664665
} else if (bytesTransferred < minReadSize) {
665666
// Read the remaining part, use a slice of buffer to write on the next
666667
// region
@@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() {
718719
proto::BaseCommand incomingCmd;
719720
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
720721
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
721-
close();
722+
close(ResultDisconnected);
722723
return;
723724
}
724725

@@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() {
742743
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
743744
<< incomingCmd.message().message_id().entryid()
744745
<< "] Error parsing broker entry metadata");
745-
close();
746+
close(ResultDisconnected);
746747
return;
747748
}
748749
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
@@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() {
760761
<< incomingCmd.message().message_id().ledgerid() //
761762
<< ", entry id " << incomingCmd.message().message_id().entryid()
762763
<< "] Error parsing message metadata");
763-
close();
764+
close(ResultDisconnected);
764765
return;
765766
}
766767

@@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
991992

992993
default:
993994
LOG_WARN(cnxString_ << "Received invalid message from server");
994-
close();
995+
close(ResultDisconnected);
995996
break;
996997
}
997998
}
@@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
11331134
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
11341135
if (err) {
11351136
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
1136-
close();
1137+
close(ResultDisconnected);
11371138
} else {
11381139
sendPendingCommands();
11391140
}
@@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
11421143
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
11431144
if (err) {
11441145
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
1145-
close();
1146+
close(ResultDisconnected);
11461147
} else {
11471148
sendPendingCommands();
11481149
}
@@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12471248

12481249
if (havePendingPingRequest_) {
12491250
LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
1250-
close();
1251+
close(ResultDisconnected);
12511252
} else {
12521253
// Send keep alive probe to peer
12531254
LOG_DEBUG(cnxString_ << "Sending ping message");
@@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach) {
12871288
}
12881289
state_ = Disconnected;
12891290

1290-
closeSocket();
1291+
if (socket_) {
1292+
boost::system::error_code err;
1293+
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
1294+
socket_->close(err);
1295+
if (err) {
1296+
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
1297+
}
1298+
}
12911299
if (tlsSocket_) {
12921300
boost::system::error_code err;
12931301
tlsSocket_->lowest_layer().close(err);
@@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) {
13261334
}
13271335

13281336
lock.unlock();
1329-
if (result != ResultDisconnected && result != ResultRetryable) {
1337+
if (!isResultRetryable(result)) {
13301338
LOG_ERROR(cnxString_ << "Connection closed with " << result);
13311339
} else {
13321340
LOG_INFO(cnxString_ << "Connection disconnected");
@@ -1473,26 +1481,15 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14731481
return promise.getFuture();
14741482
}
14751483

1476-
void ClientConnection::closeSocket() {
1477-
boost::system::error_code err;
1478-
if (socket_) {
1479-
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
1480-
socket_->close(err);
1481-
if (err) {
1482-
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
1483-
}
1484-
}
1485-
}
1486-
14871484
void ClientConnection::checkServerError(ServerError error) {
14881485
switch (error) {
14891486
case proto::ServerError::ServiceNotReady:
1490-
closeSocket();
1487+
close(ResultDisconnected);
14911488
break;
14921489
case proto::ServerError::TooManyRequests:
14931490
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
14941491
// https://github.com/apache/pulsar/pull/274
1495-
closeSocket();
1492+
close(ResultDisconnected);
14961493
break;
14971494
default:
14981495
break;
@@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendRe
15181515
if (!producer->ackReceived(sequenceId, messageId)) {
15191516
// If the producer fails to process the ack, we need to close the connection
15201517
// to give it a chance to recover from there
1521-
close();
1518+
close(ResultDisconnected);
15221519
}
15231520
}
15241521
} else {
@@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const proto::CommandSendError& error) {
15421539
if (!producer->removeCorruptMessage(sequenceId)) {
15431540
// If the producer fails to remove corrupt msg, we need to close the
15441541
// connection to give it a chance to recover from there
1545-
close();
1542+
close(ResultDisconnected);
15461543
}
15471544
}
15481545
}
15491546
} else {
1550-
close();
1547+
close(ResultDisconnected);
15511548
}
15521549
}
15531550

lib/ClientConnection.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
142142
*/
143143
void tcpConnectAsync();
144144

145+
/**
146+
* Close the connection.
147+
*
148+
* @param result all pending futures will complete with this result
149+
* @param detach remove it from the pool if it's true
150+
*
151+
* `detach` should only be false when:
152+
* 1. Before the connection is put into the pool, i.e. during the construction.
153+
* 2. When the connection pool is closed
154+
*/
145155
void close(Result result = ResultConnectError, bool detach = true);
146156

147157
bool isClosed() const;
@@ -392,7 +402,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
392402
ConnectionPool& pool_;
393403
friend class PulsarFriend;
394404

395-
void closeSocket();
396405
void checkServerError(ServerError error);
397406

398407
void handleSendReceipt(const proto::CommandSendReceipt&);

lib/ConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "MessagesImpl.h"
4343
#include "ProducerConfigurationImpl.h"
4444
#include "PulsarApi.pb.h"
45+
#include "ResultUtils.h"
4546
#include "TimeUtils.h"
4647
#include "TopicName.h"
4748
#include "UnAckedMessageTrackerDisabled.h"
@@ -319,7 +320,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
319320
} else {
320321
// Consumer was not yet created, retry to connect to broker if it's possible
321322
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
322-
if (result == ResultRetryable) {
323+
if (isResultRetryable(result)) {
323324
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
324325
scheduleReconnection();
325326
} else {

lib/HandlerBase.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "ClientImpl.h"
2323
#include "ExecutorService.h"
2424
#include "LogUtils.h"
25+
#include "ResultUtils.h"
2526
#include "TimeUtils.h"
2627

2728
DECLARE_LOG_OBJECT()
@@ -110,7 +111,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
110111

111112
resetCnx();
112113

113-
if (result == ResultRetryable) {
114+
if (isResultRetryable(result)) {
114115
scheduleReconnection();
115116
return;
116117
}
@@ -165,7 +166,7 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
165166
}
166167

167168
Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const {
168-
if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
169+
if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
169170
return ResultTimeout;
170171
} else {
171172
return result;

lib/ProducerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "OpSendMsg.h"
3737
#include "ProducerConfigurationImpl.h"
3838
#include "PulsarApi.pb.h"
39+
#include "ResultUtils.h"
3940
#include "Semaphore.h"
4041
#include "TimeUtils.h"
4142
#include "TopicName.h"
@@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
272273
} else {
273274
// Producer was not yet created, retry to connect to broker if it's possible
274275
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
275-
if (result == ResultRetryable) {
276+
if (isResultRetryable(result)) {
276277
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
277278
scheduleReconnection();
278279
} else {

lib/ResultUtils.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Result.h>
22+
23+
namespace pulsar {
24+
25+
inline bool isResultRetryable(Result result) {
26+
return result == ResultRetryable || result == ResultDisconnected;
27+
}
28+
29+
} // namespace pulsar

lib/RetryableOperation.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "ExecutorService.h"
3030
#include "Future.h"
3131
#include "LogUtils.h"
32+
#include "ResultUtils.h"
3233

3334
namespace pulsar {
3435

@@ -95,7 +96,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
9596
promise_.setValue(value);
9697
return;
9798
}
98-
if (result != ResultRetryable) {
99+
if (!isResultRetryable(result)) {
99100
promise_.setFailed(result);
100101
return;
101102
}

0 commit comments

Comments
 (0)