Skip to content

Commit 285464d

Browse files
authored
Fixed WAL storing for message deduplication by id (#29858)
1 parent 14d3397 commit 285464d

File tree

7 files changed

+161 -87 lines changed

7 files changed

+161 -87 lines changed

ydb/core/persqueue/pqtablet/partition/message_id_deduplicator.cpp

Lines changed: 81 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -10,16 +10,21 @@ namespace NKikimr::NPQ {
1010

1111
namespace {
1212

13-
constexpr TDuration BucketSize = TDuration::MilliSeconds(100);
13+
constexpr TDuration MaxDeduplicationWindow = TDuration::Minutes(5);
14+
constexpr TDuration BucketSize = TDuration::Seconds(1);
15+
constexpr size_t MaxRPS = 1000;
16+
constexpr size_t MaxBucketCount = MaxDeduplicationWindow.MilliSeconds() / BucketSize.MilliSeconds();
17+
constexpr ui64 MaxDeduplicationIDs = MaxDeduplicationWindow.Seconds() * MaxRPS;
1418

1519
TInstant Trim(TInstant value) {
16-
return TInstant::MilliSeconds(value.MilliSeconds() / BucketSize.MilliSeconds() * BucketSize.MilliSeconds() + BucketSize.MilliSeconds());
20+
return TInstant::MilliSeconds(value.MilliSeconds() / BucketSize.MilliSeconds() * BucketSize.MilliSeconds());
1721
}
1822

1923
}
2024

21-
TMessageIdDeduplicator::TMessageIdDeduplicator(TIntrusivePtr<ITimeProvider> timeProvider, TDuration deduplicationWindow)
22-
: TimeProvider(timeProvider)
25+
TMessageIdDeduplicator::TMessageIdDeduplicator(const TPartitionId& partitionId, TIntrusivePtr<ITimeProvider> timeProvider, TDuration deduplicationWindow)
26+
: PartitionId(partitionId)
27+
, TimeProvider(timeProvider)
2328
, DeduplicationWindow(deduplicationWindow)
2429
{
2530
}
@@ -30,21 +35,17 @@ TDuration TMessageIdDeduplicator::GetDeduplicationWindow() const {
3035
return DeduplicationWindow;
3136
}
3237

33-
TInstant TMessageIdDeduplicator::GetExpirationTime() const {
34-
return Trim(TimeProvider->Now()) + DeduplicationWindow;
35-
}
36-
3738
std::optional<ui64> TMessageIdDeduplicator::AddMessage(const TString& deduplicationId, const ui64 offset) {
3839
auto it = Messages.find(deduplicationId);
3940
if (it != Messages.end()) {
4041
return it->second;
4142
}
4243

43-
const auto now = Trim(TimeProvider->Now());
44+
const auto now = TimeProvider->Now();
4445
const auto expirationTime = now + DeduplicationWindow;
4546

4647
if (!CurrentBucket.StartTime) {
47-
CurrentBucket.StartTime = now;
48+
CurrentBucket.StartTime = Trim(now) + DeduplicationWindow;
4849
CurrentBucket.StartMessageIndex = Queue.size();
4950
}
5051

@@ -61,7 +62,7 @@ size_t TMessageIdDeduplicator::Compact() {
6162

6263
while (!Queue.empty()) {
6364
const auto& message = Queue.front();
64-
if (message.ExpirationTime > now) {
65+
if (Queue.size() <= MaxDeduplicationIDs && message.ExpirationTime > now) {
6566
break;
6667
}
6768
auto it = Messages.find(message.DeduplicationId);
@@ -72,14 +73,18 @@ size_t TMessageIdDeduplicator::Compact() {
7273
++removed;
7374
}
7475

76+
while (!WALKeys.empty() && (WALKeys.front().ExpirationTime <= now || WALKeys.size() > MaxBucketCount)) {
77+
WALKeys.pop_front();
78+
}
79+
7580
auto normalize = [&](size_t value) {
7681
return value > removed ? value - removed : 0;
7782
};
7883

7984
auto compactBucket = [&](TBucket& bucket) {
8085
bucket.StartMessageIndex = normalize(bucket.StartMessageIndex);
8186
bucket.LastWrittenMessageIndex = normalize(bucket.LastWrittenMessageIndex);
82-
bucket.StartTime = Queue.empty() ? TInstant::Zero() : Queue.front().ExpirationTime;
87+
bucket.StartTime = Queue.empty() ? TInstant::Zero() : Trim(Queue.front().ExpirationTime);
8388
};
8489

8590
compactBucket(CurrentBucket);
@@ -98,90 +103,128 @@ void TMessageIdDeduplicator::Commit() {
98103
}
99104
}
100105

101-
bool TMessageIdDeduplicator::ApplyWAL(NKikimrPQ::TMessageDeduplicationIdWAL&& wal) {
106+
bool TMessageIdDeduplicator::ApplyWAL(TString&& key, NKikimrPQ::TMessageDeduplicationIdWAL&& wal) {
107+
const auto now = TimeProvider->Now();
102108
CurrentBucket.StartMessageIndex = Queue.size();
103109

104-
auto expirationTime = TInstant::MilliSeconds(wal.GetExpirationTimestampMilliseconds());
110+
size_t offset = 0;
111+
TInstant expirationTime = TInstant::Zero();
112+
105113
for (auto& message : *wal.MutableMessage()) {
114+
offset += message.GetOffsetDelta();
115+
expirationTime += TDuration::MilliSeconds(message.GetExpirationTimestampMillisecondsDelta());
116+
if (expirationTime <= now) {
117+
continue;
118+
}
119+
106120
auto it = Messages.find(message.GetDeduplicationId());
107-
if (it != Messages.end() && it->second >= message.GetOffset()) {
121+
if (it != Messages.end() && it->second >= offset) {
108122
continue;
109123
}
110-
Queue.emplace_back(message.GetDeduplicationId(), expirationTime, message.GetOffset());
111-
Messages[std::move(*message.MutableDeduplicationId())] = message.GetOffset();
124+
Queue.emplace_back(message.GetDeduplicationId(), expirationTime, offset);
125+
Messages[std::move(*message.MutableDeduplicationId())] = offset;
112126
}
113127

114128
CurrentBucket.LastWrittenMessageIndex = Queue.size();
115-
CurrentBucket.StartTime = Queue.empty() ? TInstant::Zero() : Queue.back().ExpirationTime;
129+
CurrentBucket.StartTime = Queue.empty() ? TInstant::Zero() : Trim(Queue.back().ExpirationTime);
130+
131+
WALKeys.emplace_back(std::move(key), expirationTime);
116132

117133
return true;
118134
}
119135

120-
bool TMessageIdDeduplicator::SerializeTo(NKikimrPQ::TMessageDeduplicationIdWAL& wal) {
136+
std::optional<TString> TMessageIdDeduplicator::SerializeTo(NKikimrPQ::TMessageDeduplicationIdWAL& wal) {
121137
if (Queue.empty() || !HasChanges) {
122-
return false;
138+
return std::nullopt;
123139
}
124140

125-
const auto expirationTime = Queue.back().ExpirationTime;
126-
const bool sameBucket = CurrentBucket.StartTime == expirationTime;
141+
const bool sameBucket = (CurrentBucket.StartTime > Queue.back().ExpirationTime - BucketSize) && (Queue.size() - CurrentBucket.StartMessageIndex <= MaxBucketCount);
127142
size_t startIndex = sameBucket ? CurrentBucket.StartMessageIndex : CurrentBucket.LastWrittenMessageIndex;
128143
if (startIndex == Queue.size()) {
129-
return false;
144+
return std::nullopt;
130145
}
131146

132-
wal.SetExpirationTimestampMilliseconds(expirationTime.MilliSeconds());
147+
ui64 lastOffset = 0;
148+
TInstant lastExpirationTime = TInstant::Zero();
149+
133150
for (size_t i = startIndex; i < Queue.size(); ++i) {
134-
Queue[i].ExpirationTime = expirationTime;
151+
auto messageTime = std::max(Queue[i].ExpirationTime, lastExpirationTime);
152+
135153
auto* message = wal.AddMessage();
136-
message->SetOffset(Queue[i].Offset);
154+
message->SetOffsetDelta(Queue[i].Offset - lastOffset);
137155
message->SetDeduplicationId(Queue[i].DeduplicationId);
156+
message->SetExpirationTimestampMillisecondsDelta(messageTime.MilliSeconds() - lastExpirationTime.MilliSeconds());
157+
158+
lastOffset = Queue[i].Offset;
159+
lastExpirationTime = messageTime;
138160
}
139161

140162
PendingBucket = {
141-
.StartTime = expirationTime,
163+
.StartTime = Trim(lastExpirationTime),
142164
.StartMessageIndex = startIndex,
143165
.LastWrittenMessageIndex = Queue.size(),
144166
};
145167

146-
return true;
168+
if (WALKeys.empty() || !sameBucket) {
169+
WALKeys.emplace_back(MakeDeduplicatorWALKey(PartitionId.OriginalPartitionId, NextMessageIdDeduplicatorWAL), lastExpirationTime);
170+
NextMessageIdDeduplicatorWAL++;
171+
} else {
172+
WALKeys.back().ExpirationTime = lastExpirationTime;
173+
}
174+
175+
return WALKeys.back().Key;
176+
}
177+
178+
TString TMessageIdDeduplicator::GetFirstActualWAL() const {
179+
if (WALKeys.empty()) {
180+
return MakeDeduplicatorWALKey(PartitionId.OriginalPartitionId, NextMessageIdDeduplicatorWAL);
181+
}
182+
return WALKeys.front().Key;
147183
}
148184

149185
const std::deque<TMessageIdDeduplicator::TMessage>& TMessageIdDeduplicator::GetQueue() const {
150186
return Queue;
151187
}
152188

153-
TString MakeDeduplicatorWALKey(ui32 partitionId, const TInstant& expirationTime) {
189+
TString MakeDeduplicatorWALKey(ui32 partitionId, ui64 id) {
154190
static constexpr char WALSeparator = '|';
155191

156192
TKeyPrefix ikey(TKeyPrefix::EType::TypeDeduplicator, TPartitionId(partitionId));
157193
ikey.Append(WALSeparator);
158194

159-
auto bucket = Sprintf("%.16llX", expirationTime.MilliSeconds() / BucketSize.MilliSeconds());
195+
auto bucket = Sprintf("%.16llX", id);
160196
ikey.Append(bucket.data(), bucket.size());
161197

162198
return ikey.ToString();
163199
}
164200

165-
void TPartition::AddMessageDeduplicatorKeys(TEvKeyValue::TEvRequest* request) {
201+
bool TPartition::AddMessageDeduplicatorKeys(TEvKeyValue::TEvRequest* request) {
202+
if (MirroringEnabled(Config) || Partition.IsSupportivePartition()) {
203+
return false;
204+
}
205+
166206
MessageIdDeduplicator.Compact();
167207

168-
NKikimrPQ::TMessageDeduplicationIdWAL wal;
169-
if (MessageIdDeduplicator.SerializeTo(wal)) {
170-
auto expirationTime = TInstant::MilliSeconds(wal.GetExpirationTimestampMilliseconds());
208+
bool hasChanges = false;
171209

210+
NKikimrPQ::TMessageDeduplicationIdWAL wal;
211+
if (auto key = MessageIdDeduplicator.SerializeTo(wal); key) {
172212
auto* writeWAL = request->Record.AddCmdWrite();
173-
writeWAL->SetKey(MakeDeduplicatorWALKey(Partition.OriginalPartitionId, expirationTime));
213+
writeWAL->SetKey(key.value());
174214
writeWAL->SetValue(wal.SerializeAsString());
175215
if (writeWAL->GetValue().size() < 1000) {
176216
writeWAL->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
177217
}
218+
hasChanges = true;
178219
}
179220

180221
auto* deleteExpired = request->Record.AddCmdDeleteRange();
181-
deleteExpired->MutableRange()->SetFrom(MakeDeduplicatorWALKey(Partition.OriginalPartitionId, TInstant::Zero()));
182-
deleteExpired->MutableRange()->SetTo(MakeDeduplicatorWALKey(Partition.OriginalPartitionId, MessageIdDeduplicator.GetExpirationTime()));
222+
deleteExpired->MutableRange()->SetFrom(MakeDeduplicatorWALKey(Partition.OriginalPartitionId, 0));
223+
deleteExpired->MutableRange()->SetTo(MessageIdDeduplicator.GetFirstActualWAL());
183224
deleteExpired->MutableRange()->SetIncludeFrom(true);
184-
deleteExpired->MutableRange()->SetIncludeTo(true);
225+
deleteExpired->MutableRange()->SetIncludeTo(false);
226+
227+
return hasChanges;
185228
}
186229

187230
std::optional<ui64> TPartition::DeduplicateByMessageId(const TEvPQ::TEvWrite::TMsg& msg, const ui64 offset) {

ydb/core/persqueue/pqtablet/partition/message_id_deduplicator.h

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,9 @@ class TMessageDeduplicationIdWAL;
1111

1212
namespace NKikimr::NPQ {
1313

14-
TString MakeDeduplicatorWALKey(ui32 partitionId, const TInstant& expirationTime);
14+
class TPartitionId;
15+
16+
TString MakeDeduplicatorWALKey(ui32 partitionId, ui64 id);
1517

1618
class TMessageIdDeduplicator {
1719
public:
@@ -23,23 +25,26 @@ class TMessageIdDeduplicator {
2325
bool operator==(const TMessage& other) const = default;
2426
};
2527

26-
TMessageIdDeduplicator(TIntrusivePtr<ITimeProvider> timeProvider = CreateDefaultTimeProvider(), TDuration deduplicationWindow = TDuration::Minutes(5));
28+
TMessageIdDeduplicator(const TPartitionId& partitionId, TIntrusivePtr<ITimeProvider> timeProvider = CreateDefaultTimeProvider(), TDuration deduplicationWindow = TDuration::Minutes(5));
2729
~TMessageIdDeduplicator();
2830

2931
TDuration GetDeduplicationWindow() const;
30-
TInstant GetExpirationTime() const;
3132

3233
std::optional<ui64> AddMessage(const TString& deduplicationId, const ui64 offset);
3334
size_t Compact();
3435

3536
void Commit();
3637

37-
bool ApplyWAL(NKikimrPQ::TMessageDeduplicationIdWAL&& wal);
38-
bool SerializeTo(NKikimrPQ::TMessageDeduplicationIdWAL& wal);
38+
bool ApplyWAL(TString&& key, NKikimrPQ::TMessageDeduplicationIdWAL&& wal);
39+
std::optional<TString> SerializeTo(NKikimrPQ::TMessageDeduplicationIdWAL& wal);
40+
TString GetFirstActualWAL() const;
3941

4042
const std::deque<TMessage>& GetQueue() const;
4143

44+
ui64 NextMessageIdDeduplicatorWAL = 1;
45+
4246
private:
47+
const TPartitionId& PartitionId;
4348
TIntrusivePtr<ITimeProvider> TimeProvider;
4449
TDuration DeduplicationWindow;
4550

@@ -52,9 +57,14 @@ class TMessageIdDeduplicator {
5257
size_t StartMessageIndex = 0;
5358
size_t LastWrittenMessageIndex = 0;
5459
};
55-
5660
TBucket CurrentBucket;
5761
std::optional<TBucket> PendingBucket;
62+
63+
struct WALKey {
64+
TString Key;
65+
TInstant ExpirationTime;
66+
};
67+
std::deque<WALKey> WALKeys;
5868
};
5969

6070
} // namespace NKikimr::NPQ

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -365,6 +365,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
365365
, WriteLagMs(TDuration::Minutes(1), 100)
366366
, LastEmittedHeartbeat(TRowVersion::Min())
367367
, SamplingControl(samplingControl)
368+
, MessageIdDeduplicator(Partition, CreateDefaultTimeProvider(), TDuration::Minutes(5))
368369
{
369370
TabletCounters.Populate(*Counters);
370371
TabletCounters.ResetCounters();
@@ -513,6 +514,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
513514
//meta.SetEndOffset(Max(BlobEncoder.NewHead.GetNextOffset(), GetEndOffset()));
514515
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
515516
meta.SetEndWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
517+
meta.SetNextMessageIdDeduplicatorWAL(MessageIdDeduplicator.NextMessageIdDeduplicatorWAL);
516518

517519
if (IsSupportive()) {
518520
auto* counterData = meta.MutableCounterData();
@@ -2571,30 +2573,33 @@ void TPartition::RunPersist() {
25712573
EndHandleRequests(PersistRequest.Get(), ctx);
25722574
}
25732575

2574-
if (TryAddDeleteHeadKeysToPersistRequest()) {
2575-
haveChanges = true;
2576-
}
2577-
25782576
if (Compacter) {
25792577
Compacter->TryCompactionIfPossible();
25802578
}
25812579

2582-
if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) {
2580+
haveChanges |= TryAddDeleteHeadKeysToPersistRequest();
2581+
haveChanges |= TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig;
2582+
2583+
if (haveChanges) {
25832584
WriteCycleStartTime = now;
25842585
WriteStartTime = now;
25852586
TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero();
25862587
PartitionQuotaWaitTimeForCurrentBlob = TDuration::Zero();
25872588
WritesTotal.Inc();
25882589
HaveWriteMsg = true;
25892590

2590-
AddMetaKey(PersistRequest.Get());
25912591
AddCmdWriteTxMeta(PersistRequest->Record);
25922592
AddCmdWriteUserInfos(PersistRequest->Record);
25932593
AddCmdWriteConfig(PersistRequest->Record);
25942594
}
25952595

2596-
if (PersistRequest->Record.CmdDeleteRangeSize() || PersistRequest->Record.CmdWriteSize() || PersistRequest->Record.CmdRenameSize()) {
2596+
auto requestEmpty = PersistRequest->Record.CmdDeleteRangeSize() == 0
2597+
&& PersistRequest->Record.CmdWriteSize() == 0
2598+
&& PersistRequest->Record.CmdRenameSize() == 0;
2599+
2600+
if (haveChanges || !requestEmpty) {
25972601
AddMessageDeduplicatorKeys(PersistRequest.Get());
2602+
AddMetaKey(PersistRequest.Get());
25982603

25992604
// Apply counters
26002605
for (const auto& writeInfo : WriteInfosApplied) {

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1259,7 +1259,7 @@ class TPartition : public TBaseTabletActor<TPartition> {
12591259
ui64 LastNotifiedEndOffset = 0;
12601260

12611261
TMessageIdDeduplicator MessageIdDeduplicator;
1262-
void AddMessageDeduplicatorKeys(TEvKeyValue::TEvRequest* request);
1262+
bool AddMessageDeduplicatorKeys(TEvKeyValue::TEvRequest* request);
12631263
std::optional<ui64> DeduplicateByMessageId(const TEvPQ::TEvWrite::TMsg& msg, const ui64 offset);
12641264
};
12651265

0 commit comments

Comments
 (0)