Skip to content

Commit 1139cfb

Browse files
authored
[ISSUE #1174] [C++] Optimize the logic for generating the uniqueId of messages (#1175)
1 parent 601aee5 commit 1139cfb

File tree

4 files changed

+48
-23
lines changed

4 files changed

+48
-23
lines changed

cpp/source/base/UniqueIdGenerator.cpp

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "UniqueIdGenerator.h"
1818

1919
#include <cstring>
20+
#include <random>
2021

2122
#include "spdlog/spdlog.h"
2223
#include "MixAll.h"
@@ -62,19 +63,15 @@ std::string UniqueIdGenerator::next() {
6263
Slot slot = {};
6364
{
6465
absl::MutexLock lk(&mtx_);
65-
uint32_t delta = deltaSeconds();
66-
if (seconds_ != delta) {
67-
seconds_ = delta;
68-
sequence_ = 0;
69-
SPDLOG_DEBUG("Second: {} and sequence: {}", seconds_, sequence_);
70-
} else {
71-
sequence_++;
72-
}
73-
slot.seconds = seconds_;
74-
slot.sequence = sequence_;
66+
seconds_ = deltaSeconds();
67+
slot.seconds = absl::big_endian::FromHost32(seconds_);
68+
slot.sequence = absl::big_endian::FromHost32(sequence_);
69+
sequence_++;
7570
}
7671
std::array<uint8_t, 17> raw{};
7772
raw[0] = VERSION;
73+
74+
// 9 bytes prefix: VERSION(1) + MAC(6) + PID(low2)
7875
memcpy(raw.data() + sizeof(VERSION), prefix_.data(), prefix_.size());
7976
memcpy(raw.data() + sizeof(VERSION) + prefix_.size(), &slot, sizeof(slot));
8077
return MixAll::hex(raw.data(), raw.size());
@@ -90,4 +87,41 @@ uint32_t UniqueIdGenerator::deltaSeconds() {
9087
.count();
9188
}
9289

90+
std::string UniqueIdGenerator::nextUuidV4Std() {
91+
std::array<uint8_t, 16> b{};
92+
93+
static thread_local std::random_device rd;
94+
for (size_t i = 0; i < b.size();) {
95+
uint32_t v = static_cast<uint32_t>(rd());
96+
for (int k = 0; k < 4 && i < b.size(); ++k, ++i) {
97+
b[i] = static_cast<uint8_t>(v & 0xFF);
98+
v >>= 8;
99+
}
100+
}
101+
102+
// RFC 4122
103+
b[6] = static_cast<uint8_t>((b[6] & 0x0F) | 0x40);
104+
b[8] = static_cast<uint8_t>((b[8] & 0x3F) | 0x80);
105+
106+
static constexpr char hex[] = "0123456789abcdef";
107+
108+
std::string out;
109+
out.resize(36);
110+
111+
auto put_byte = [&](size_t& p, uint8_t x) {
112+
out[p++] = hex[(x >> 4) & 0xF];
113+
out[p++] = hex[x & 0xF];
114+
};
115+
116+
size_t p = 0;
117+
for (int i = 0; i < 16; ++i) {
118+
if (i == 4 || i == 6 || i == 8 || i == 10) {
119+
out[p++] = '-';
120+
}
121+
put_byte(p, b[i]);
122+
}
123+
124+
return out;
125+
}
126+
93127
ROCKETMQ_NAMESPACE_END

cpp/source/base/include/InvocationContext.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ ROCKETMQ_NAMESPACE_BEGIN
4040
* async_stream.h
4141
*/
4242
struct BaseInvocationContext {
43-
BaseInvocationContext() : request_id_(UniqueIdGenerator::instance().next()) {
43+
BaseInvocationContext() : request_id_(UniqueIdGenerator::nextUuidV4Std()) {
4444
context.AddMetadata(MetadataConstants::REQUEST_ID_KEY, request_id_);
4545
}
4646

cpp/source/base/include/UniqueIdGenerator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class UniqueIdGenerator {
3333

3434
std::string next() LOCKS_EXCLUDED(mtx_);
3535

36+
static std::string nextUuidV4Std();
37+
3638
UniqueIdGenerator(const UniqueIdGenerator&) = delete;
3739

3840
UniqueIdGenerator(UniqueIdGenerator&&) = delete;

cpp/source/rocketmq/ProcessQueueImpl.cpp

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <system_error>
2222
#include <utility>
2323

24-
#include "UniqueIdGenerator.h"
2524
#include "AsyncReceiveMessageCallback.h"
2625
#include "MetadataConstants.h"
2726
#include "Protocol.h"
@@ -183,16 +182,6 @@ void ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres
183182
}
184183
}
185184

186-
void generateAttemptId(std::string& attempt_id) {
187-
const std::string unique_id = UniqueIdGenerator::instance().next();
188-
if (unique_id.size() < 34) {
189-
return;
190-
}
191-
attempt_id = fmt::format(
192-
"{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4),
193-
unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20, 12));
194-
}
195-
196185
void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
197186
rmq::ReceiveMessageRequest& request, std::string& attempt_id) {
198187
std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
@@ -216,7 +205,7 @@ void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st
216205
request.mutable_invisible_duration()->set_nanos(nano_seconds);
217206

218207
if (attempt_id.empty()) {
219-
generateAttemptId(attempt_id);
208+
attempt_id = UniqueIdGenerator::nextUuidV4Std();
220209
}
221210
request.set_attempt_id(attempt_id);
222211
}

0 commit comments

Comments
 (0)