Skip to content

Commit 9adc3ce

Browse files
authored
[ISSUE #1035] [C++] Support continuous visibility timeout adjustment for SimpleConsumer (#1039)
1 parent a7d43d1 commit 9adc3ce

File tree

10 files changed

+61
-33
lines changed

10 files changed

+61
-33
lines changed

.github/workflows/cpp_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
1212
# Remove macos-11 since there is no such runner available
1313
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
14-
os: [ubuntu-22.04, windows-2019]
14+
os: [ubuntu-22.04]
1515
steps:
1616
- uses: actions/checkout@v2
1717
- name: Compile On Linux

cpp/examples/ExampleSimpleConsumer.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "rocketmq/ErrorCode.h"
2222
#include "rocketmq/Logger.h"
2323
#include "rocketmq/SimpleConsumer.h"
24+
#include "spdlog/spdlog.h"
2425

2526
using namespace ROCKETMQ_NAMESPACE;
2627

@@ -58,34 +59,43 @@ int main(int argc, char* argv[]) {
5859
.subscribe(FLAGS_topic, tag)
5960
.withAwaitDuration(std::chrono::seconds(10))
6061
.build();
62+
std::size_t total = 0;
6163

6264
// Should use while (true) instead
6365
for (int j = 0; j < 30; j++) {
6466
std::vector<MessageConstSharedPtr> messages;
6567
std::error_code ec;
6668
simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
69+
6770
if (ec) {
6871
std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
72+
} else {
73+
std::cout << "Received " << messages.size() << " messages" << std::endl;
6974
}
7075

71-
std::cout << "Received " << messages.size() << " messages" << std::endl;
72-
std::size_t i = 0;
73-
7476
for (const auto& message : messages) {
75-
std::cout << "Received a message[topic=" << message->topic()
76-
<< ", message-id=" << message->id()
77-
<< ", receipt-handle='" << message->extension().receipt_handle
78-
<< "']" << std::endl;
77+
std::string receipt_handle = message->extension().receipt_handle;
78+
SPDLOG_INFO("Receive message, topic={}, message-id={}, receipt-handle={}]", message->topic(), message->id(), receipt_handle);
7979

80-
if (++i % 2 == 0) {
80+
if (total++ % 2 == 0) {
81+
// Consume message successfully then ack it
8182
simple_consumer.ack(*message, ec);
8283
if (ec) {
83-
std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
84+
SPDLOG_ERROR("Failed to ack message. Cause: {}", ec.message());
85+
} else {
86+
SPDLOG_INFO("Ack message, topic={}, message-id={}, receipt-handle={}]", message->topic(), message->id(), receipt_handle);
8487
}
8588
} else {
86-
simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec);
87-
if (ec) {
88-
std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
89+
// Extend the message consumption time by modifying the invisible duration API
90+
for (int k = 0; k < 3; k++) {
91+
simple_consumer.changeInvisibleDuration(
92+
*message, receipt_handle, std::chrono::seconds(15), ec);
93+
if (ec) {
94+
SPDLOG_WARN("Failed to change invisible duration of message. Cause: ", ec.message());
95+
} else {
96+
SPDLOG_INFO("Change invisible duration, topic={}, message-id={}, times={}, receipt-handle={}]",
97+
message->topic(), message->id(), k, receipt_handle);
98+
}
8999
}
90100
}
91101
}

cpp/include/rocketmq/SimpleConsumer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ using ReceiveCallback = std::function<void(const std::error_code&, const std::ve
3535

3636
using AckCallback = std::function<void(const std::error_code&)>;
3737

38-
using ChangeInvisibleDurationCallback = std::function<void(const std::error_code&)>;
38+
using ChangeInvisibleDurationCallback = std::function<void(const std::error_code&, std::string& receipt_handle)>;
3939

4040
class SimpleConsumerImpl;
4141

@@ -60,9 +60,9 @@ class SimpleConsumer {
6060

6161
void asyncAck(const Message& message, AckCallback callback);
6262

63-
void changeInvisibleDuration(const Message& message, std::chrono::milliseconds duration, std::error_code& ec);
63+
void changeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, std::error_code& ec);
6464

65-
void asyncChangeInvisibleDuration(const Message& message,
65+
void asyncChangeInvisibleDuration(const Message& message, std::string& receipt_handle,
6666
std::chrono::milliseconds duration,
6767
ChangeInvisibleDurationCallback callback);
6868

cpp/source/client/ClientManagerImpl.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,8 @@ void ClientManagerImpl::changeInvisibleDuration(
11001100
const Metadata& metadata,
11011101
const ChangeInvisibleDurationRequest& request,
11021102
std::chrono::milliseconds timeout,
1103-
const std::function<void(const std::error_code&)>& completion_callback) {
1103+
const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>& completion_callback) {
1104+
11041105
RpcClientSharedPtr client = getRpcClient(target_host);
11051106
assert(client);
11061107
auto invocation_context = new InvocationContext<ChangeInvisibleDurationResponse>();
@@ -1118,7 +1119,7 @@ void ClientManagerImpl::changeInvisibleDuration(
11181119
SPDLOG_WARN("Failed to write Nack request to wire. gRPC-code: {}, gRPC-message: {}",
11191120
invocation_context->status.error_code(), invocation_context->status.error_message());
11201121
std::error_code ec = ErrorCode::RequestTimeout;
1121-
completion_callback(ec);
1122+
completion_callback(ec, invocation_context->response);
11221123
return;
11231124
}
11241125

@@ -1185,7 +1186,7 @@ void ClientManagerImpl::changeInvisibleDuration(
11851186
break;
11861187
}
11871188
}
1188-
completion_callback(ec);
1189+
completion_callback(ec, invocation_context->response);
11891190
};
11901191
invocation_context->callback = callback;
11911192
client->asyncChangeInvisibleDuration(request, invocation_context);

cpp/source/client/include/ClientManager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class ClientManager {
7070

7171
virtual void changeInvisibleDuration(const std::string& target_host, const Metadata& metadata,
7272
const ChangeInvisibleDurationRequest&, std::chrono::milliseconds timeout,
73-
const std::function<void(const std::error_code&)>&) = 0;
73+
const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>&) = 0;
7474

7575
virtual void forwardMessageToDeadLetterQueue(
7676
const std::string& target_host, const Metadata& metadata, const ForwardMessageToDeadLetterQueueRequest& request,

cpp/source/client/include/ClientManagerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share
148148
const Metadata& metadata,
149149
const ChangeInvisibleDurationRequest&,
150150
std::chrono::milliseconds timeout,
151-
const std::function<void(const std::error_code&)>&) override;
151+
const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>&) override;
152152

153153
void forwardMessageToDeadLetterQueue(const std::string& target_host,
154154
const Metadata& metadata,

cpp/source/rocketmq/PushConsumerImpl.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,13 @@ void PushConsumerImpl::nack(const Message& message, const std::function<void(con
413413
request.set_message_id(message.id());
414414
request.mutable_invisible_duration()->CopyFrom(
415415
google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count()));
416+
417+
auto cb =
418+
[callback](const std::error_code& ec, const ChangeInvisibleDurationResponse& response) {
419+
callback(ec);
420+
};
416421
client_manager_->changeInvisibleDuration(target_host, metadata, request,
417-
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
422+
absl::ToChronoMilliseconds(client_config_.request_timeout), cb);
418423
}
419424

420425
void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message,

cpp/source/rocketmq/SimpleConsumer.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,27 @@ void SimpleConsumer::asyncAck(const Message& message, AckCallback callback) {
8989
impl_->ackAsync(message, callback);
9090
}
9191

92-
void SimpleConsumer::changeInvisibleDuration(const Message& message,
92+
void SimpleConsumer::changeInvisibleDuration(const Message& message, std::string& receipt_handle,
9393
std::chrono::milliseconds duration,
9494
std::error_code& ec) {
9595
auto mtx = std::make_shared<absl::Mutex>();
9696
auto cv = std::make_shared<absl::CondVar>();
9797
bool completed = false;
98-
auto callback = [&, mtx, cv](const std::error_code& code) {
98+
99+
auto callback =
100+
[&, mtx, cv](const std::error_code& code, std::string& server_receipt_handle) {
99101
{
100102
absl::MutexLock lk(mtx.get());
101103
completed = true;
102104
ec = code;
105+
if (!ec) {
106+
receipt_handle = server_receipt_handle;
107+
}
103108
}
104109
cv->Signal();
105110
};
106111

107-
impl_->changeInvisibleDuration(message, duration, callback);
112+
impl_->changeInvisibleDuration(message, receipt_handle, duration, callback);
108113

109114
{
110115
absl::MutexLock lk(mtx.get());
@@ -114,10 +119,10 @@ void SimpleConsumer::changeInvisibleDuration(const Message& message,
114119
}
115120
}
116121

117-
void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message,
122+
void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message, std::string& receipt_handle,
118123
std::chrono::milliseconds duration,
119124
ChangeInvisibleDurationCallback callback) {
120-
impl_->changeInvisibleDuration(message, duration, callback);
125+
impl_->changeInvisibleDuration(message, receipt_handle, duration, callback);
121126
}
122127

123128
SimpleConsumer SimpleConsumerBuilder::build() {

cpp/source/rocketmq/SimpleConsumerImpl.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,9 +409,9 @@ void SimpleConsumerImpl::ackAsync(const Message& message, AckCallback callback)
409409
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
410410
}
411411

412-
void SimpleConsumerImpl::changeInvisibleDuration(const Message& message,
412+
void SimpleConsumerImpl::changeInvisibleDuration(const Message& message, std::string& receipt_handle,
413413
std::chrono::milliseconds duration,
414-
ChangeInvisibleDurationCallback callback) {
414+
const ChangeInvisibleDurationCallback callback) {
415415
Metadata metadata;
416416
Signature::sign(client_config_, metadata);
417417

@@ -420,11 +420,18 @@ void SimpleConsumerImpl::changeInvisibleDuration(const Message& message,
420420
request.mutable_topic()->set_resource_namespace(resourceNamespace());
421421
request.mutable_topic()->set_name(message.topic());
422422
request.set_message_id(message.id());
423-
request.set_receipt_handle(message.extension().receipt_handle);
423+
request.set_receipt_handle(receipt_handle);
424424
auto d = google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count());
425425
request.mutable_invisible_duration()->CopyFrom(d);
426426

427-
manager()->changeInvisibleDuration(message.extension().target_endpoint, metadata, request, duration, callback);
427+
auto cb =
428+
[callback](const std::error_code& ec, const ChangeInvisibleDurationResponse& response) {
429+
std::string server_receipt_handle = response.receipt_handle();
430+
callback(ec, server_receipt_handle);
431+
};
432+
433+
manager()->changeInvisibleDuration(
434+
message.extension().target_endpoint, metadata, request, duration, cb);
428435
}
429436

430437
ROCKETMQ_NAMESPACE_END

cpp/source/rocketmq/include/SimpleConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ class SimpleConsumerImpl : virtual public ClientImpl, public std::enable_shared_
5353

5454
void ackAsync(const Message& message, AckCallback callback);
5555

56-
void changeInvisibleDuration(const Message& message,
56+
void changeInvisibleDuration(const Message& message, std::string& receipt_handle,
5757
std::chrono::milliseconds duration,
58-
ChangeInvisibleDurationCallback callback);
58+
const ChangeInvisibleDurationCallback callback);
5959

6060
void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) {
6161
long_polling_duration_ = receive_timeout;

0 commit comments

Comments
 (0)