Skip to content

Commit b727270

Browse files
authored
[ISSUE #969] C++ client support attemptId in PushConsumer (#970)
1 parent bd1e7ab commit b727270

File tree

9 files changed

+78
-55
lines changed

9 files changed

+78
-55
lines changed

cpp/source/base/Message.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <memory>
2121

2222
#include "UniqueIdGenerator.h"
23-
#include "absl/memory/memory.h"
2423

2524
ROCKETMQ_NAMESPACE_BEGIN
2625

cpp/source/base/Protocol.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ bool operator==(const rmq::MessageQueue& lhs, const rmq::MessageQueue& rhs) {
7272
}
7373

7474
std::string simpleNameOf(const rmq::MessageQueue& m) {
75-
return fmt::format("{}{}-{}-{}", m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name());
75+
return fmt::format("{}@{}@{}@{}",
76+
m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name());
7677
}
7778

7879
bool operator==(const std::vector<rmq::MessageQueue>& lhs, const std::vector<rmq::MessageQueue>& rhs) {

cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,14 @@ ROCKETMQ_NAMESPACE_BEGIN
3030

3131
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue)
3232
: process_queue_(std::move(process_queue)) {
33-
receive_message_later_ = std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this);
33+
34+
receive_message_later_ = std::bind(
35+
&AsyncReceiveMessageCallback::checkThrottleThenReceive, this, std::placeholders::_1);
3436
}
3537

36-
void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const ReceiveMessageResult& result) {
38+
void AsyncReceiveMessageCallback::onCompletion(
39+
const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result) {
40+
3741
std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
3842
if (!process_queue) {
3943
SPDLOG_INFO("Process queue has been destructed.");
@@ -47,31 +51,32 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
4751

4852
if (ec == ErrorCode::TooManyRequests) {
4953
SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
50-
receiveMessageLater(std::chrono::milliseconds(20));
54+
receiveMessageLater(std::chrono::milliseconds(20), attempt_id);
5155
return;
5256
}
5357

5458
if (ec == ErrorCode::NoContent) {
55-
checkThrottleThenReceive();
59+
checkThrottleThenReceive("");
5660
return;
5761
}
5862

5963
if (ec) {
60-
SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
61-
receiveMessageLater(std::chrono::seconds (1));
64+
SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(),
65+
ec.message());
66+
receiveMessageLater(std::chrono::seconds(1), attempt_id);
6267
return;
6368
}
6469

6570
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
6671
result.source_host, result.messages.size(), process_queue->simpleName());
6772
process_queue->accountCache(result.messages);
6873
consumer->getConsumeMessageService()->dispatch(process_queue, result.messages);
69-
checkThrottleThenReceive();
74+
checkThrottleThenReceive("");
7075
}
7176

7277
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task";
7378

74-
void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
79+
void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string attempt_id) {
7580
auto process_queue = process_queue_.lock();
7681
if (!process_queue) {
7782
SPDLOG_WARN("Process queue should have been destructed");
@@ -82,14 +87,14 @@ void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
8287
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
8388
process_queue->simpleName());
8489
process_queue->syncIdleState();
85-
receiveMessageLater(std::chrono::seconds(1));
90+
receiveMessageLater(std::chrono::seconds(1), attempt_id);
8691
} else {
8792
// Receive message immediately
88-
receiveMessageImmediately();
93+
receiveMessageImmediately(attempt_id);
8994
}
9095
}
9196

92-
void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) {
97+
void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id) {
9398
auto process_queue = process_queue_.lock();
9499
if (!process_queue) {
95100
return;
@@ -98,17 +103,18 @@ void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
98103
auto client_instance = process_queue->getClientManager();
99104
std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this());
100105

101-
auto task = [receive_callback_weak_ptr]() {
106+
auto task = [receive_callback_weak_ptr, &attempt_id]() {
102107
auto async_receive_ptr = receive_callback_weak_ptr.lock();
103108
if (async_receive_ptr) {
104-
async_receive_ptr->checkThrottleThenReceive();
109+
async_receive_ptr->checkThrottleThenReceive(attempt_id);
105110
}
106111
};
107112

108-
client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
113+
client_instance->getScheduler()->schedule(
114+
task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
109115
}
110116

111-
void AsyncReceiveMessageCallback::receiveMessageImmediately() {
117+
void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& attempt_id) {
112118
auto process_queue_shared_ptr = process_queue_.lock();
113119
if (!process_queue_shared_ptr) {
114120
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles");
@@ -121,7 +127,9 @@ void AsyncReceiveMessageCallback::receiveMessageImmediately() {
121127
process_queue_shared_ptr->simpleName());
122128
return;
123129
}
124-
impl->receiveMessage(process_queue_shared_ptr->messageQueue(), process_queue_shared_ptr->getFilterExpression());
130+
131+
impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
132+
process_queue_shared_ptr->getFilterExpression(), attempt_id);
125133
}
126134

127135
ROCKETMQ_NAMESPACE_END

cpp/source/rocketmq/ProcessQueueImpl.cpp

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
*/
1717
#include "ProcessQueueImpl.h"
1818

19-
#include <atomic>
2019
#include <chrono>
2120
#include <memory>
2221
#include <system_error>
2322
#include <utility>
2423

24+
#include "UniqueIdGenerator.h"
2525
#include "AsyncReceiveMessageCallback.h"
26-
#include "ClientManagerImpl.h"
2726
#include "MetadataConstants.h"
2827
#include "Protocol.h"
2928
#include "PushConsumerImpl.h"
@@ -98,39 +97,39 @@ bool ProcessQueueImpl::shouldThrottle() const {
9897
return false;
9998
}
10099

101-
void ProcessQueueImpl::receiveMessage() {
100+
void ProcessQueueImpl::receiveMessage(std::string& attempt_id) {
102101
auto consumer = consumer_.lock();
103102
if (!consumer) {
104103
return;
105104
}
106-
107-
popMessage();
105+
popMessage(attempt_id);
108106
}
109107

110-
void ProcessQueueImpl::popMessage() {
108+
void ProcessQueueImpl::popMessage(std::string& attempt_id) {
111109
rmq::ReceiveMessageRequest request;
112110
absl::flat_hash_map<std::string, std::string> metadata;
113111
auto consumer_client = consumer_.lock();
114112
if (!consumer_client) {
115113
return;
116114
}
115+
117116
Signature::sign(consumer_client->config(), metadata);
118-
wrapPopMessageRequest(metadata, request);
117+
wrapPopMessageRequest(metadata, request, attempt_id);
119118
syncIdleState();
120-
SPDLOG_DEBUG("Try to pop message from {}", simpleNameOf(message_queue_));
119+
SPDLOG_DEBUG("Receive message from={}, attemptId={}", simpleNameOf(message_queue_), attempt_id);
121120

122121
std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
123-
auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) {
124-
std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
125-
if (recv_cb) {
126-
recv_cb->onCompletion(ec, result);
122+
auto callback =
123+
[cb, &attempt_id](const std::error_code& ec, const ReceiveMessageResult& result) {
124+
std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock();
125+
if (receive_cb) {
126+
receive_cb->onCompletion(ec, attempt_id, result);
127127
}
128128
};
129129

130-
client_manager_->receiveMessage(urlOf(message_queue_), metadata, request,
131-
absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout +
132-
consumer_client->config().request_timeout),
133-
callback);
130+
auto timeout = absl::ToChronoMilliseconds(
131+
consumer_client->config().subscriber.polling_timeout + consumer_client->config().request_timeout);
132+
client_manager_->receiveMessage(urlOf(message_queue_), metadata, request, timeout, callback);
134133
}
135134

136135
void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) {
@@ -184,8 +183,18 @@ void ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres
184183
}
185184
}
186185

186+
void generateAttemptId(std::string& attempt_id) {
187+
const std::string unique_id = UniqueIdGenerator::instance().next();
188+
if (unique_id.size() < 34) {
189+
return;
190+
}
191+
attempt_id = fmt::format(
192+
"{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4),
193+
unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20, 12));
194+
}
195+
187196
void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
188-
rmq::ReceiveMessageRequest& request) {
197+
rmq::ReceiveMessageRequest& request, std::string& attempt_id) {
189198
std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
190199
assert(consumer);
191200
request.mutable_group()->CopyFrom(consumer->config().subscriber.group);
@@ -205,6 +214,11 @@ void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st
205214
auto fraction = invisible_time_ - std::chrono::duration_cast<std::chrono::seconds>(invisible_time_);
206215
int32_t nano_seconds = static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count());
207216
request.mutable_invisible_duration()->set_nanos(nano_seconds);
217+
218+
if (attempt_id.empty()) {
219+
generateAttemptId(attempt_id);
220+
}
221+
request.set_attempt_id(attempt_id);
208222
}
209223

210224
std::weak_ptr<PushConsumerImpl> ProcessQueueImpl::getConsumer() {

cpp/source/rocketmq/PushConsumerImpl.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
*/
1717
#include "PushConsumerImpl.h"
1818

19-
#include <atomic>
2019
#include <cassert>
2120
#include <chrono>
22-
#include <cstdint>
2321
#include <cstdlib>
2422
#include <string>
2523
#include <system_error>
@@ -324,9 +322,10 @@ void PushConsumerImpl::syncProcessQueue(const std::string& topic,
324322
for (const auto& message_queue : message_queue_list) {
325323
if (std::none_of(current.cbegin(), current.cend(),
326324
[&](const rmq::MessageQueue& item) { return item == message_queue; })) {
327-
SPDLOG_INFO("Start to receive message from {} according to latest assignment info from load balancer",
325+
SPDLOG_DEBUG("Start to receive message from {} according to latest assignment info from load balancer",
328326
simpleNameOf(message_queue));
329-
if (!receiveMessage(message_queue, filter_expression)) {
327+
std::string attempt_id;
328+
if (!receiveMessage(message_queue, filter_expression, attempt_id)) {
330329
if (!active()) {
331330
SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue));
332331
// TODO: remove it from current assignment such that a second attempt will be made again in the next round.
@@ -350,9 +349,9 @@ std::shared_ptr<ProcessQueue> PushConsumerImpl::getOrCreateProcessQueue(const rm
350349
process_queue = process_queue_table_.at(simpleNameOf(message_queue));
351350
} else {
352351
SPDLOG_INFO("Create ProcessQueue for message queue[{}]", simpleNameOf(message_queue));
353-
// create ProcessQueue
354-
process_queue =
355-
std::make_shared<ProcessQueueImpl>(message_queue, filter_expression, shared_from_this(), client_manager_);
352+
// create process queue object
353+
process_queue = std::make_shared<ProcessQueueImpl>(
354+
message_queue, filter_expression, shared_from_this(), client_manager_);
356355
std::shared_ptr<AsyncReceiveMessageCallback> receive_callback =
357356
std::make_shared<AsyncReceiveMessageCallback>(process_queue);
358357
process_queue->callback(receive_callback);
@@ -363,7 +362,8 @@ std::shared_ptr<ProcessQueue> PushConsumerImpl::getOrCreateProcessQueue(const rm
363362
}
364363

365364
bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue,
366-
const FilterExpression& filter_expression) {
365+
const FilterExpression& filter_expression,
366+
std::string& attempt_id) {
367367
if (!active()) {
368368
SPDLOG_INFO("PushConsumer has stopped. Drop further receive message request");
369369
return false;
@@ -379,7 +379,7 @@ bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue,
379379
SPDLOG_ERROR("Failed to resolve address for brokerName={}", message_queue.broker().name());
380380
return false;
381381
}
382-
process_queue_ptr->receiveMessage();
382+
process_queue_ptr->receiveMessage(attempt_id);
383383
return true;
384384
}
385385

cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public std::enable_shared_from_this<AsyncRec
2929
public:
3030
explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue);
3131

32-
void onCompletion(const std::error_code& ec, const ReceiveMessageResult& result);
32+
void onCompletion(const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result);
3333

34-
void receiveMessageLater(std::chrono::milliseconds delay);
34+
void receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id);
3535

36-
void receiveMessageImmediately();
36+
void receiveMessageImmediately(std::string& attempt_id);
3737

3838
private:
3939
/**
@@ -42,9 +42,9 @@ class AsyncReceiveMessageCallback : public std::enable_shared_from_this<AsyncRec
4242
*/
4343
std::weak_ptr<ProcessQueue> process_queue_;
4444

45-
std::function<void(void)> receive_message_later_;
45+
std::function<void(std::string)> receive_message_later_;
4646

47-
void checkThrottleThenReceive();
47+
void checkThrottleThenReceive(std::string attempt_id);
4848

4949
static const char* RECEIVE_LATER_TASK_NAME;
5050
};

cpp/source/rocketmq/include/ProcessQueue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ProcessQueue {
3838

3939
virtual void callback(std::shared_ptr<AsyncReceiveMessageCallback> callback) = 0;
4040

41-
virtual void receiveMessage() = 0;
41+
virtual void receiveMessage(std::string& attempt_id) = 0;
4242

4343
virtual std::string topic() const = 0;
4444

cpp/source/rocketmq/include/ProcessQueueImpl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class ProcessQueueImpl : virtual public ProcessQueue {
6363

6464
std::shared_ptr<ClientManager> getClientManager() override;
6565

66-
void receiveMessage() override;
66+
void receiveMessage(std::string& attempt_id) override;
6767

6868
const std::string& simpleName() const override {
6969
return simple_name_;
@@ -127,10 +127,10 @@ class ProcessQueueImpl : virtual public ProcessQueue {
127127
*/
128128
std::atomic<uint64_t> cached_message_memory_;
129129

130-
void popMessage();
130+
void popMessage(std::string& attempt_id);
131131

132132
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
133-
rmq::ReceiveMessageRequest& request);
133+
rmq::ReceiveMessageRequest& request, std::string& attempt_id);
134134

135135
void wrapFilterExpression(rmq::FilterExpression* filter_expression);
136136
};

cpp/source/rocketmq/include/PushConsumerImpl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ class PushConsumerImpl : virtual public ClientImpl, public std::enable_shared_fr
9999
const FilterExpression& filter_expression)
100100
LOCKS_EXCLUDED(process_queue_table_mtx_);
101101

102-
bool receiveMessage(const rmq::MessageQueue& message_queue, const FilterExpression& filter_expression)
103-
LOCKS_EXCLUDED(process_queue_table_mtx_);
102+
bool receiveMessage(const rmq::MessageQueue& message_queue,
103+
const FilterExpression& filter_expression,
104+
std::string& attempt_id) LOCKS_EXCLUDED(process_queue_table_mtx_);
104105

105106
uint32_t consumeThreadPoolSize() const;
106107

0 commit comments

Comments
 (0)