Skip to content

Commit de94bc5

Browse files
authored
Fix getLastMessageId method dead recursion. (#117)
1 parent 1a0ce69 commit de94bc5

File tree

8 files changed

+36
-5
lines changed

8 files changed

+36
-5
lines changed

lib/Consumer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,8 @@ void Consumer::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
295295
callback(ResultConsumerNotInitialized, MessageId());
296296
return;
297297
}
298-
getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
298+
299+
impl_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
299300
callback(result, response.getLastMessageId());
300301
});
301302
}

lib/ConsumerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1343,7 +1343,7 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
13431343
if (state == Closed || state == Closing) {
13441344
LOG_ERROR(getName() << "Client connection already closed.");
13451345
if (callback) {
1346-
callback(ResultAlreadyClosed, MessageId());
1346+
callback(ResultAlreadyClosed, GetLastMessageIdResponse());
13471347
}
13481348
return;
13491349
}

lib/ConsumerImpl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ class BatchAcknowledgementTracker;
4444
class MessageCrypto;
4545
class GetLastMessageIdResponse;
4646
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
47-
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
4847
typedef std::shared_ptr<Backoff> BackoffPtr;
4948

5049
class AckGroupingTracker;
@@ -124,6 +123,7 @@ class ConsumerImpl : public ConsumerImplBase {
124123
const std::string& getName() const override;
125124
int getNumOfPrefetchedMessages() const override;
126125
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
126+
void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
127127
void seekAsync(const MessageId& msgId, ResultCallback callback) override;
128128
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
129129
void negativeAcknowledge(const MessageId& msgId) override;
@@ -139,7 +139,6 @@ class ConsumerImpl : public ConsumerImplBase {
139139

140140
virtual bool isReadCompacted();
141141
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
142-
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
143142
void beforeConnectionChange(ClientConnection& cnx) override;
144143

145144
protected:

lib/ConsumerImplBase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
#include <set>
2626

2727
#include "Future.h"
28+
#include "GetLastMessageIdResponse.h"
2829
#include "HandlerBase.h"
2930

3031
namespace pulsar {
3132
class ConsumerImplBase;
3233
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
33-
3434
class OpBatchReceive {
3535
public:
3636
OpBatchReceive();
@@ -68,6 +68,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
6868
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
6969
virtual int getNumOfPrefetchedMessages() const = 0;
7070
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
71+
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) = 0;
7172
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
7273
virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
7374
virtual void negativeAcknowledge(const MessageId& msgId) = 0;

lib/GetLastMessageIdResponse.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#pragma once
2020

2121
#include <pulsar/MessageId.h>
22+
#include <pulsar/Result.h>
2223

2324
#include <iostream>
2425

@@ -54,4 +55,6 @@ class GetLastMessageIdResponse {
5455
bool hasMarkDeletePosition_;
5556
};
5657

58+
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
59+
5760
} // namespace pulsar

lib/MultiTopicsConsumerImpl.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,10 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
803803
});
804804
}
805805

806+
void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
807+
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
808+
}
809+
806810
void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
807811
LatchPtr latchPtr,
808812
MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,

lib/MultiTopicsConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
8282
const std::string& getName() const override;
8383
int getNumOfPrefetchedMessages() const override;
8484
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
85+
void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
8586
void seekAsync(const MessageId& msgId, ResultCallback callback) override;
8687
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
8788
void negativeAcknowledge(const MessageId& msgId) override;

tests/ConsumerTest.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,28 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
841841
thread.join();
842842
}
843843

844+
TEST(ConsumerTest, testGetLastMessageId) {
845+
Client client(lookupUrl);
846+
const std::string topic = "testGetLastMessageId-" + std::to_string(time(nullptr));
847+
848+
Consumer consumer;
849+
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
850+
851+
MessageId msgId;
852+
ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId));
853+
ASSERT_EQ(msgId, MessageId(-1, -1, -1, -1));
854+
855+
Producer producer;
856+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
857+
Message msg = MessageBuilder().setContent("message").build();
858+
ASSERT_EQ(ResultOk, producer.send(msg));
859+
860+
ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId));
861+
ASSERT_NE(msgId, MessageId(-1, -1, -1, -1));
862+
863+
client.close();
864+
}
865+
844866
TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
845867
int operationTimeout = 5;
846868
ClientConfiguration clientConfiguration;

0 commit comments

Comments
 (0)