Skip to content

Commit 5c77648

Browse files
Fix segmentation fault when sending messages after receiving an error (apache#326)
Fixes apache#325 ### Motivation apache#317 introduces a bug that might cause segmentation fault when sending messages after receiving an error, see apache#325 (comment) for the detailed explanation. ### Modifications When calling `asyncWrite`, capture the `shared_ptr` instead of the `weak_ptr` to extend the lifetime of the `socket_` or `tlsSocket_` field in `ClientConnection`. Since the lifetime is extended, in some callbacks, check `isClosed()` before other logic. Add a `ChunkDedupTest` to reproduce this issue based on Pulsar 3.1.0. Run the test for 10 times to ensure it won't crash after this patch.
1 parent eea59bb commit 5c77648

File tree

5 files changed

+148
-52
lines changed

5 files changed

+148
-52
lines changed

lib/ClientConnection.cc

Lines changed: 39 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
519519
return;
520520
}
521521
// Send CONNECT command to broker
522-
auto weakSelf = weak_from_this();
522+
auto self = shared_from_this();
523523
asyncWrite(buffer.const_asio_buffer(),
524-
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
525-
auto self = weakSelf.lock();
526-
if (self) {
527-
self->handleSentPulsarConnect(err, buffer);
528-
}
524+
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
525+
handleSentPulsarConnect(err, buffer);
529526
}));
530527
}
531528

532529
void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err,
533530
const SharedBuffer& buffer) {
531+
if (isClosed()) {
532+
return;
533+
}
534534
if (err) {
535535
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
536536
close();
@@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
543543

544544
void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err,
545545
const SharedBuffer& buffer) {
546+
if (isClosed()) {
547+
return;
548+
}
546549
if (err) {
547550
LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
548551
close();
@@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() {
650653

651654
void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred,
652655
uint32_t minReadSize) {
656+
if (isClosed()) {
657+
return;
658+
}
653659
// Update buffer write idx with new data
654660
incomingBuffer_.bytesWritten(bytesTransferred);
655661

@@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
10851091
}
10861092

10871093
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
1088-
auto weakSelf = weak_from_this();
1094+
auto self = shared_from_this();
10891095
asyncWrite(cmd.const_asio_buffer(),
1090-
customAllocWriteHandler(
1091-
[weakSelf, cmd](const boost::system::error_code& err, size_t bytesTransferred) {
1092-
auto self = weakSelf.lock();
1093-
if (self) {
1094-
self->handleSend(err, cmd);
1095-
}
1096-
}));
1096+
customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err,
1097+
size_t bytesTransferred) { handleSend(err, cmd); }));
10971098
}
10981099

10991100
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
@@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
11021103
pendingWriteBuffers_.emplace_back(args);
11031104
return;
11041105
}
1105-
auto weakSelf = weak_from_this();
1106-
auto sendMessageInternal = [this, weakSelf, args] {
1107-
auto self = weakSelf.lock();
1108-
if (!self) {
1109-
return;
1110-
}
1106+
auto self = shared_from_this();
1107+
auto sendMessageInternal = [this, self, args] {
11111108
BaseCommand outgoingCmd;
11121109
auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
11131110
// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
11141111
// callback is called, an invalid buffer range might be passed to the underlying socket send.
1115-
asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err,
1116-
size_t bytesTransferred) {
1117-
auto self = weakSelf.lock();
1118-
if (self) {
1119-
self->handleSendPair(err);
1120-
}
1121-
}));
1112+
asyncWrite(buffer, customAllocWriteHandler(
1113+
[this, self, buffer](const boost::system::error_code& err,
1114+
size_t bytesTransferred) { handleSendPair(err); }));
11221115
};
11231116
if (tlsSocket_) {
11241117
#if BOOST_VERSION >= 106600
@@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
11321125
}
11331126

11341127
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
1128+
if (isClosed()) {
1129+
return;
1130+
}
11351131
if (err) {
11361132
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
11371133
close(ResultDisconnected);
@@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
11411137
}
11421138

11431139
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
1140+
if (isClosed()) {
1141+
return;
1142+
}
11441143
if (err) {
11451144
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
11461145
close(ResultDisconnected);
@@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() {
11571156
boost::any any = pendingWriteBuffers_.front();
11581157
pendingWriteBuffers_.pop_front();
11591158

1160-
auto weakSelf = weak_from_this();
1159+
auto self = shared_from_this();
11611160
if (any.type() == typeid(SharedBuffer)) {
11621161
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
1163-
asyncWrite(
1164-
buffer.const_asio_buffer(),
1165-
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
1166-
auto self = weakSelf.lock();
1167-
if (self) {
1168-
self->handleSend(err, buffer);
1169-
}
1170-
}));
1162+
asyncWrite(buffer.const_asio_buffer(),
1163+
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
1164+
size_t) { handleSend(err, buffer); }));
11711165
} else {
11721166
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
11731167

@@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() {
11781172

11791173
// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
11801174
// callback is called, an invalid buffer range might be passed to the underlying socket send.
1181-
asyncWrite(buffer, customAllocWriteHandler(
1182-
[weakSelf, buffer](const boost::system::error_code& err, size_t) {
1183-
auto self = weakSelf.lock();
1184-
if (self) {
1185-
self->handleSendPair(err);
1186-
}
1187-
}));
1175+
asyncWrite(buffer,
1176+
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
1177+
size_t) { handleSendPair(err); }));
11881178
}
11891179
} else {
11901180
// No more pending writes
@@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool detach) {
13341324
}
13351325

13361326
lock.unlock();
1327+
int refCount = weak_from_this().use_count();
13371328
if (!isResultRetryable(result)) {
1338-
LOG_ERROR(cnxString_ << "Connection closed with " << result);
1329+
LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
13391330
} else {
1340-
LOG_INFO(cnxString_ << "Connection disconnected");
1331+
LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
13411332
}
13421333
// Remove the connection from the pool before completing any promise
13431334
if (detach) {
@@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() {
18241815
close(result);
18251816
return;
18261817
}
1827-
auto weakSelf = weak_from_this();
1818+
auto self = shared_from_this();
18281819
asyncWrite(buffer.const_asio_buffer(),
1829-
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
1830-
auto self = weakSelf.lock();
1831-
if (self) {
1832-
self->handleSentAuthResponse(err, buffer);
1833-
}
1820+
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
1821+
handleSentAuthResponse(err, buffer);
18341822
}));
18351823
}
18361824

run-unit-tests.sh

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down
4040

4141
# Run BrokerMetadata tests
4242
docker compose -f tests/brokermetadata/docker-compose.yml up -d
43-
sleep 15
43+
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
44+
sleep 5
4445
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
4546
docker compose -f tests/brokermetadata/docker-compose.yml down
4647

48+
docker compose -f tests/chunkdedup/docker-compose.yml up -d
49+
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
50+
sleep 5
51+
$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
52+
docker compose -f tests/chunkdedup/docker-compose.yml down
53+
4754
./pulsar-test-service-start.sh
4855

4956
pushd $CMAKE_BUILD_DIRECTORY/tests

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIB
7171
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
7272
target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
7373
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
74+
75+
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
76+
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

tests/chunkdedup/ChunkDedupTest.cc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <gtest/gtest.h>
20+
#include <pulsar/Client.h>
21+
22+
#include "lib/Latch.h"
23+
#include "lib/LogUtils.h"
24+
25+
DECLARE_LOG_OBJECT()
26+
27+
using namespace pulsar;
28+
29+
// Before https://github.com/apache/pulsar/pull/20948, when message deduplication is enabled, sending chunks
30+
// to the broker will receive send error response.
31+
TEST(ChunkDedupTest, testSendChunks) {
32+
Client client{"pulsar://localhost:6650"};
33+
ProducerConfiguration conf;
34+
conf.setBatchingEnabled(false);
35+
conf.setChunkingEnabled(true);
36+
Producer producer;
37+
ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, producer));
38+
39+
Latch latch{1};
40+
std::string value(1024000 /* max message size */ * 100, 'a');
41+
producer.sendAsync(MessageBuilder().setContent(value).build(),
42+
[&latch](Result result, const MessageId& msgId) {
43+
LOG_INFO("Send to " << msgId << ": " << result);
44+
latch.countdown();
45+
});
46+
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
47+
client.close();
48+
}
49+
50+
int main(int argc, char* argv[]) {
51+
::testing::InitGoogleTest(&argc, argv);
52+
return RUN_ALL_TESTS();
53+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
version: '3'
21+
networks:
22+
pulsar:
23+
driver: bridge
24+
services:
25+
standalone:
26+
# Don't change the version here to ensure https://github.com/apache/pulsar/pull/20948 is not included
27+
image: apachepulsar/pulsar:3.1.0
28+
container_name: standalone
29+
hostname: local
30+
restart: "no"
31+
networks:
32+
- pulsar
33+
environment:
34+
- metadataStoreUrl=zk:localhost:2181
35+
- clusterName=standalone
36+
- advertisedAddress=localhost
37+
- advertisedListeners=external:pulsar://localhost:6650
38+
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
39+
- PULSAR_PREFIX_maxMessageSize=1024000
40+
- PULSAR_PREFIX_brokerDeduplicationEnabled=true
41+
ports:
42+
- "6650:6650"
43+
- "8080:8080"
44+
command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw"
45+

0 commit comments

Comments
 (0)