Skip to content

Commit 74ef1a0

Browse files
[feat] Support expiration for chunked messages (#71)
### Motivation Add support for checking expiration for incomplete chunked messages. ### Modifications * Add configuration `expireTimeOfIncompleteChunkedMessageMs` to the consumer. * Add timer to check the expiration incomplete chunked messages Co-authored-by: Yunze Xu <[email protected]>
1 parent efd2a1e commit 74ef1a0

File tree

11 files changed

+200
-3
lines changed

11 files changed

+200
-3
lines changed

include/pulsar/ConsumerConfiguration.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,26 @@ class PULSAR_PUBLIC ConsumerConfiguration {
519519
*/
520520
bool isAutoAckOldestChunkedMessageOnQueueFull() const;
521521

522+
/**
523+
* If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if
524+
* consumer won't be able to receive all chunks in expire times. Use value 0 to disable this feature.
525+
*
526+
* Default: 60000, which means 1 minutes
527+
*
528+
* @param expireTimeOfIncompleteChunkedMessageMs expire time in milliseconds
529+
* @return Consumer Configuration
530+
*/
531+
ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs(
532+
long expireTimeOfIncompleteChunkedMessageMs);
533+
534+
/**
535+
*
536+
* Get the expire time of incomplete chunked message in milliseconds
537+
*
538+
* @return the expire time of incomplete chunked message in milliseconds
539+
*/
540+
long getExpireTimeOfIncompleteChunkedMessageMs() const;
541+
522542
/**
523543
* Set the consumer to include the given position of any reset operation like Consumer::seek.
524544
*

include/pulsar/Message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ class PULSAR_PUBLIC Message {
200200

201201
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const StringMap& map);
202202
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const Message& msg);
203+
friend class PulsarFriend;
203204
};
204205
} // namespace pulsar
205206

lib/ConsumerConfiguration.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,16 @@ bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
262262
return impl_->autoAckOldestChunkedMessageOnQueueFull;
263263
}
264264

265+
ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs(
266+
long expireTimeOfIncompleteChunkedMessageMs) {
267+
impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs;
268+
return *this;
269+
}
270+
271+
long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const {
272+
return impl_->expireTimeOfIncompleteChunkedMessageMs;
273+
}
274+
265275
ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) {
266276
impl_->startMessageIdInclusive = startMessageIdInclusive;
267277
return *this;

lib/ConsumerConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl {
5555
size_t maxPendingChunkedMessage{10};
5656
bool autoAckOldestChunkedMessageOnQueueFull{false};
5757
bool startMessageIdInclusive{false};
58+
long expireTimeOfIncompleteChunkedMessageMs{60000};
5859
};
5960
} // namespace pulsar
6061
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */

lib/ConsumerImpl.cc

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
7878
readCompacted_(conf.isReadCompacted()),
7979
startMessageId_(startMessageId),
8080
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
81-
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()) {
81+
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
82+
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()) {
8283
std::stringstream consumerStrStream;
8384
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
8485
consumerStr_ = consumerStrStream.str();
@@ -109,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
109110
if (conf.isEncryptionEnabled()) {
110111
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
111112
}
113+
114+
checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
112115
}
113116

114117
ConsumerImpl::~ConsumerImpl() {
@@ -319,6 +322,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
319322
}
320323
}
321324

325+
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
326+
checkExpiredChunkedTimer_->expires_from_now(
327+
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
328+
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
329+
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
330+
auto self = weakSelf.lock();
331+
if (!self) {
332+
return;
333+
}
334+
if (ec) {
335+
LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
336+
<< "].");
337+
return;
338+
}
339+
Lock lock(chunkProcessMutex_);
340+
long currentTimeMs = TimeUtils::currentTimeMillis();
341+
chunkedMessageCache_.removeOldestValuesIf(
342+
[this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
343+
bool expired =
344+
currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
345+
if (!expired) {
346+
return false;
347+
}
348+
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
349+
LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
350+
doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {
351+
if (result != ResultOk) {
352+
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
353+
<< uuid << ", messageId: " << msgId);
354+
}
355+
});
356+
}
357+
return true;
358+
});
359+
triggerCheckExpiredChunkedTimer();
360+
return;
361+
});
362+
}
363+
322364
Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
323365
const proto::MessageMetadata& metadata,
324366
const MessageId& messageId,
@@ -331,6 +373,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
331373
<< payload.readableBytes() << " bytes");
332374

333375
Lock lock(chunkProcessMutex_);
376+
377+
// Lazy task scheduling to expire incomplete chunk message
378+
bool expected = false;
379+
if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
380+
expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
381+
triggerCheckExpiredChunkedTimer();
382+
}
383+
334384
auto it = chunkedMessageCache_.find(uuid);
335385

336386
if (chunkId == 0) {
@@ -1448,6 +1498,7 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
14481498
void ConsumerImpl::cancelTimers() noexcept {
14491499
boost::system::error_code ec;
14501500
batchReceiveTimer_->cancel(ec);
1501+
checkExpiredChunkedTimer_->cancel(ec);
14511502
}
14521503

14531504
} /* namespace pulsar */

lib/ConsumerImpl.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "NegativeAcksTracker.h"
3434
#include "Synchronized.h"
3535
#include "TestUtil.h"
36+
#include "TimeUtils.h"
3637
#include "UnboundedBlockingQueue.h"
3738

3839
namespace pulsar {
@@ -259,6 +260,7 @@ class ConsumerImpl : public ConsumerImplBase {
259260
void appendChunk(const MessageId& messageId, const SharedBuffer& payload) {
260261
chunkedMessageIds_.emplace_back(messageId);
261262
chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
263+
receivedTimeMs_ = TimeUtils::currentTimeMillis();
262264
}
263265

264266
bool isCompleted() const noexcept { return totalChunks_ == numChunks(); }
@@ -267,6 +269,8 @@ class ConsumerImpl : public ConsumerImplBase {
267269

268270
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
269271

272+
long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
273+
270274
friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
271275
return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of "
272276
<< ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of "
@@ -277,6 +281,7 @@ class ConsumerImpl : public ConsumerImplBase {
277281
const int totalChunks_;
278282
SharedBuffer chunkedMsgBuffer_;
279283
std::vector<MessageId> chunkedMessageIds_;
284+
long receivedTimeMs_;
280285

281286
int numChunks() const noexcept { return static_cast<int>(chunkedMessageIds_.size()); }
282287
};
@@ -297,6 +302,12 @@ class ConsumerImpl : public ConsumerImplBase {
297302
MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
298303
mutable std::mutex chunkProcessMutex_;
299304

305+
const long expireTimeOfIncompleteChunkedMessageMs_;
306+
DeadlineTimerPtr checkExpiredChunkedTimer_;
307+
std::atomic_bool expireChunkMessageTaskScheduled_{false};
308+
309+
void triggerCheckExpiredChunkedTimer();
310+
300311
/**
301312
* Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the
302313
* payload and return it.

lib/MapCache.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ class MapCache {
7373
}
7474
}
7575

76+
void removeOldestValuesIf(const std::function<bool(const Key&, const Value&)>& condition) {
77+
if (!condition) return;
78+
while (!keys_.empty()) {
79+
const auto key = keys_.front();
80+
auto it = map_.find(key);
81+
if (it == map_.end()) {
82+
continue;
83+
}
84+
if (condition(it->first, it->second)) {
85+
map_.erase(it);
86+
keys_.pop_front();
87+
} else {
88+
return;
89+
}
90+
}
91+
}
92+
7693
void remove(const Key& key) {
7794
auto it = map_.find(key);
7895
if (it != map_.end()) {

tests/MapCacheTest.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,33 @@ TEST(MapCacheTest, testRemoveAllValues) {
7777
ASSERT_TRUE(cache.getKeys().empty());
7878
ASSERT_EQ(cache.size(), 0);
7979
}
80+
81+
TEST(MapCacheTest, testRemoveOldestValuesIf) {
82+
MapCache<int, MoveOnlyInt> cache;
83+
cache.putIfAbsent(1, {100});
84+
cache.putIfAbsent(2, {200});
85+
cache.putIfAbsent(3, {300});
86+
int expireTime = 100;
87+
88+
auto checkCondition = [&expireTime](const int& key, const MoveOnlyInt& value) -> bool {
89+
return expireTime > value.x;
90+
};
91+
92+
cache.removeOldestValuesIf(nullptr);
93+
ASSERT_EQ(cache.size(), 3);
94+
95+
cache.removeOldestValuesIf(checkCondition);
96+
ASSERT_EQ(cache.size(), 3);
97+
98+
expireTime = 200;
99+
cache.removeOldestValuesIf(checkCondition);
100+
101+
auto keys = cache.getKeys();
102+
ASSERT_EQ(cache.size(), 2);
103+
ASSERT_EQ(cache.find(2)->second.x, 200);
104+
ASSERT_EQ(cache.find(3)->second.x, 300);
105+
106+
expireTime = 400;
107+
cache.removeOldestValuesIf(checkCondition);
108+
ASSERT_EQ(cache.size(), 0);
109+
}

tests/MessageChunkingTest.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <random>
2424

2525
#include "PulsarFriend.h"
26+
#include "WaitUtils.h"
2627
#include "lib/LogUtils.h"
2728

2829
DECLARE_LOG_OBJECT()
@@ -81,6 +82,10 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
8182
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
8283
}
8384

85+
void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
86+
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
87+
}
88+
8489
private:
8590
Client client_{lookupUrl};
8691
};
@@ -130,6 +135,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
130135
// Verify the cache has been cleared
131136
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
132137
ASSERT_EQ(chunkedMessageCache.size(), 0);
138+
139+
producer.close();
140+
consumer.close();
141+
}
142+
143+
TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
144+
// This test is time-consuming and is not related to the compressionType. So skip other compressionType
145+
// here.
146+
if (toString(GetParam()) != "None") {
147+
return;
148+
}
149+
const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) +
150+
std::to_string(time(nullptr));
151+
Consumer consumer;
152+
ConsumerConfiguration consumerConf;
153+
consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000);
154+
consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
155+
createConsumer(topic, consumer, consumerConf);
156+
Producer producer;
157+
createProducer(topic, producer);
158+
159+
auto msg = MessageBuilder().setContent("test-data").build();
160+
auto& metadata = PulsarFriend::getMessageMetadata(msg);
161+
metadata.set_num_chunks_from_msg(2);
162+
metadata.set_chunk_id(0);
163+
metadata.set_total_chunk_msg_size(100);
164+
165+
producer.send(msg);
166+
167+
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
168+
169+
waitUntil(
170+
std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; }, 1000);
171+
ASSERT_EQ(chunkedMessageCache.size(), 1);
172+
173+
// Wait for triggering the check of the expiration.
174+
// Need to wait for 2 * expireTime because there may be a gap in checking the expiration time.
175+
waitUntil(
176+
std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 0; }, 1000);
177+
ASSERT_EQ(chunkedMessageCache.size(), 0);
178+
179+
producer.close();
180+
consumer.close();
133181
}
134182

135183
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P

tests/PulsarFriend.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
#ifndef PULSAR_FRIEND_HPP_
20+
#define PULSAR_FRIEND_HPP_
1921

2022
#include <string>
2123

2224
#include "lib/ClientConnection.h"
2325
#include "lib/ClientImpl.h"
2426
#include "lib/ConsumerImpl.h"
27+
#include "lib/MessageImpl.h"
2528
#include "lib/MultiTopicsConsumerImpl.h"
2629
#include "lib/NamespaceName.h"
2730
#include "lib/PartitionedProducerImpl.h"
@@ -180,5 +183,9 @@ class PulsarFriend {
180183
static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) {
181184
return lookupService.backoffTimers_.size();
182185
}
186+
187+
static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }
183188
};
184189
} // namespace pulsar
190+
191+
#endif /* PULSAR_FRIEND_HPP_ */

0 commit comments

Comments
 (0)