Skip to content

Commit d201e43

Browse files
[+] tracing
1 parent 8ff93bb commit d201e43

File tree

10 files changed

+298
-12
lines changed

10 files changed

+298
-12
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ struct TEvPQ {
198198
EvRunCompaction,
199199
EvMirrorTopicDescription,
200200
EvBroadcastPartitionError,
201+
EvForceCompaction,
201202
EvEnd
202203
};
203204

@@ -1278,6 +1279,15 @@ struct TEvPQ {
12781279

12791280
ui64 BlobsCount = 0;
12801281
};
1282+
1283+
struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
1284+
explicit TEvForceCompaction(const ui32 partitionId) :
1285+
PartitionId(partitionId)
1286+
{
1287+
}
1288+
1289+
ui32 PartitionId = 0;
1290+
};
12811291
};
12821292

12831293
} //NKikimr

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ class TPartition : public TBaseActor<TPartition> {
246246
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
247247
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
248248
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
249+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
249250
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
250251
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
251252
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
@@ -596,6 +597,7 @@ class TPartition : public TBaseActor<TPartition> {
596597
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
597598
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
598599
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
600+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
599601
default:
600602
if (!Initializer.Handle(ev)) {
601603
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -664,6 +666,7 @@ class TPartition : public TBaseActor<TPartition> {
664666
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
665667
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
666668
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
669+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
667670
default:
668671
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
669672
break;
@@ -1116,7 +1119,7 @@ class TPartition : public TBaseActor<TPartition> {
11161119
const TEvPQ::TEvBlobResponse* blobResponse,
11171120
const TActorContext& ctx);
11181121

1119-
void TryRunCompaction();
1122+
void TryRunCompaction(bool force = false);
11201123
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
11211124
void BlobsForCompactionWereWrite();
11221125
ui64 NextReadCookie();

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ void TPartition::DumpKeysForBlobsCompaction() const
161161
LOG_D("===================================");
162162
}
163163

164-
void TPartition::TryRunCompaction()
164+
void TPartition::TryRunCompaction(bool force)
165165
{
166166
if (CompactionInProgress) {
167167
LOG_D("Blobs compaction in progress");
@@ -174,12 +174,11 @@ void TPartition::TryRunCompaction()
174174
}
175175

176176
//DumpKeysForBlobsCompaction();
177-
DumpZones(__FILE__, __LINE__);
178177

179178
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
180179
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
181180

182-
if (BlobEncoder.DataKeysBody.size() >= blobsKeyCountLimit) {
181+
if ((BlobEncoder.DataKeysBody.size() >= blobsKeyCountLimit) && !force) {
183182
CompactionInProgress = true;
184183
Send(SelfId(), new TEvPQ::TEvRunCompaction(BlobEncoder.DataKeysBody.size()));
185184
return;
@@ -206,7 +205,7 @@ void TPartition::TryRunCompaction()
206205
}
207206
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes (" << totalSize << ")");
208207

209-
if (totalSize < GetCumulativeSizeLimit()) {
208+
if ((totalSize < GetCumulativeSizeLimit()) && !force) {
210209
LOG_D("Need more data for compaction. " <<
211210
"Blobs " << BlobEncoder.DataKeysBody.size() <<
212211
", size " << totalSize << " (" << GetCumulativeSizeLimit() << ")");
@@ -220,6 +219,11 @@ void TPartition::TryRunCompaction()
220219
Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount));
221220
}
222221

222+
void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&)
223+
{
224+
TryRunCompaction(true);
225+
}
226+
223227
void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev)
224228
{
225229
const ui64 blobsCount = ev->Get()->BlobsCount;
@@ -274,11 +278,14 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
274278
TMaybe<ui64> firstBlobOffset = requestedBlob.Offset;
275279

276280
for (TBlobIterator it(requestedBlob.Key, requestedBlob.Value); it.IsValid(); it.Next()) {
281+
LOG_D("Compaction: case 18");
277282
TBatch batch = it.GetBatch();
278283
batch.Unpack();
279284

280285
for (const auto& blob : batch.Blobs) {
286+
LOG_D("Compaction: case 19");
281287
if (wasThePreviousBlobBig && blob.PartData && (blob.PartData->PartNo != 0)) {
288+
LOG_D("Compaction: case 20");
282289
// надо продолжить писать большое сообщение
283290
CompactionBlobEncoder.NewHead.PartNo = blob.PartData->PartNo;
284291
CompactionBlobEncoder.NewPartitionedBlob(Partition,
@@ -291,6 +298,8 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
291298
needToCompactHead,
292299
MaxBlobSize,
293300
blob.PartData->PartNo);
301+
} else {
302+
LOG_D("Compaction: case 21");
294303
}
295304
wasThePreviousBlobBig = false;
296305

@@ -323,12 +332,17 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
323332

324333
blobCreationUnixTime = std::max(blobCreationUnixTime, blob.WriteTimestamp);
325334
if (!ExecRequestForCompaction(msg, parameters, compactionRequest, blobCreationUnixTime)) {
335+
LOG_D("Compaction: case 22");
326336
return false;
337+
} else {
338+
LOG_D("Compaction: case 23");
327339
}
328340

329341
firstBlobOffset = Nothing();
330342
}
343+
LOG_D("Compaction: case 25");
331344
}
345+
LOG_D("Compaction: case 26");
332346

333347
return true;
334348
}
@@ -342,6 +356,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k,
342356
const auto& ctx = ActorContext();
343357

344358
if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) {
359+
LOG_D("Compaction: case 12");
345360
CompactionBlobEncoder.NewPartitionedBlob(Partition,
346361
CompactionBlobEncoder.NewHead.Offset,
347362
"", // SourceId
@@ -351,22 +366,30 @@ void TPartition::RenameCompactedBlob(TDataKey& k,
351366
parameters.HeadCleared, // headCleared
352367
needToCompactHead, // needCompactHead
353368
MaxBlobSize);
369+
} else {
370+
LOG_D("Compaction: case 13");
354371
}
355372
auto write = CompactionBlobEncoder.PartitionedBlob.Add(k.Key, size, k.Timestamp, false);
356373
if (write && !write->Value.empty()) {
374+
LOG_D("Compaction: case 14");
357375
// надо записать содержимое головы перед первым большим блобом
358376
AddCmdWrite(write, compactionRequest, k.Timestamp, ctx);
359377
CompactionBlobEncoder.CompactedKeys.emplace_back(write->Key, write->Value.size());
378+
} else {
379+
LOG_D("Compaction: case 15");
360380
}
361381

362382
if (const auto& formedBlobs = CompactionBlobEncoder.PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
383+
LOG_D("Compaction: case 16");
363384
ui32 curWrites = RenameTmpCmdWrites(compactionRequest);
364385
RenameFormedBlobs(formedBlobs,
365386
parameters,
366387
curWrites,
367388
compactionRequest,
368389
CompactionBlobEncoder,
369390
ctx);
391+
} else {
392+
LOG_D("Compaction: case 17");
370393
}
371394

372395
k.BlobKeyToken->NeedDelete = false;
@@ -389,8 +412,11 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
389412
if (!CompactionBlobEncoder.Head.GetCount() &&
390413
!CompactionBlobEncoder.NewHead.GetCount() &&
391414
CompactionBlobEncoder.IsEmpty()) {
415+
LOG_D("Compaction: case 09");
392416
// если это первое сообщение, то надо поправить StartOffset
393417
CompactionBlobEncoder.StartOffset = BlobEncoder.StartOffset;
418+
} else {
419+
LOG_D("Compaction: case 10");
394420
}
395421

396422
CompactionBlobEncoder.NewHead.Clear();
@@ -412,15 +438,20 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
412438
DumpZones(__FILE__, __LINE__);
413439

414440
for (size_t i = 0; i < KeysForCompaction.size(); ++i) {
441+
LOG_D("Compaction: case 11");
415442
auto& [k, pos] = KeysForCompaction[i];
416443
bool needToCompactHead = (parameters.CurOffset < k.Key.GetOffset());
417444

418445
if (pos == Max<size_t>()) {
446+
LOG_D("Compaction: case 01");
419447
// большой блоб надо переименовать
420448
LOG_D("Rename key " << k.Key.ToString());
421449

422450
if (!WasTheLastBlobBig) {
451+
LOG_D("Compaction: case 02");
423452
needToCompactHead = true;
453+
} else {
454+
LOG_D("Compaction: case 03");
424455
}
425456
LOG_D("Need to compact head " << needToCompactHead);
426457

@@ -440,32 +471,41 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
440471

441472
WasTheLastBlobBig = true;
442473
} else {
474+
LOG_D("Compaction: case 04");
443475
// маленький блоб надо дописать
444476
LOG_D("Append blob for key " << k.Key.ToString());
445477
LOG_D("Need to compact head " << needToCompactHead);
446478

447479
const TRequestedBlob& requestedBlob = blobs[pos];
448480
if (!CompactRequestedBlob(requestedBlob, parameters, needToCompactHead, compactionRequest.Get(), blobCreationUnixTime, WasTheLastBlobBig)) {
481+
LOG_D("Compaction: case 05");
449482
LOG_D("Can't append blob for key " << k.Key.ToString());
450483
Y_FAIL("Something went wrong");
451484
return;
485+
} else {
486+
LOG_D("Compaction: case 06");
452487
}
453488

454489
WasTheLastBlobBig = false;
455490
}
456491

457492
DumpZones(__FILE__, __LINE__);
458493
}
494+
LOG_D("Compaction: case 24");
459495

460496
if (!CompactionBlobEncoder.IsLastBatchPacked()) {
497+
LOG_D("Compaction: case 07");
461498
CompactionBlobEncoder.PackLastBatch();
499+
} else {
500+
LOG_D("Compaction: case 08");
462501
}
463502

464503
CompactionBlobEncoder.HeadCleared = parameters.HeadCleared;
465504

466505
EndProcessWritesForCompaction(compactionRequest.Get(), blobCreationUnixTime, ctx);
467506

468507
DumpZones(__FILE__, __LINE__);
508+
469509
// for debugging purposes
470510
DumpKeyValueRequest(compactionRequest->Record);
471511

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents()
52385238
}
52395239
}
52405240

5241+
void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx)
5242+
{
5243+
PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)");
5244+
5245+
const auto& event = *ev->Get();
5246+
const TPartitionId partitionId(event.PartitionId);
5247+
5248+
if (!Partitions.contains(partitionId)) {
5249+
PQ_LOG_D("Unknown partition id " << event.PartitionId);
5250+
return;
5251+
}
5252+
5253+
auto p = Partitions.find(partitionId);
5254+
ctx.Send(p->second.Actor,
5255+
new TEvPQ::TEvForceCompaction(event.PartitionId));
5256+
}
5257+
52415258
bool TPersQueue::HandleHook(STFUNC_SIG)
52425259
{
52435260
TRACE_EVENT(NKikimrServices::PERSQUEUE);
@@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
52855302
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
52865303
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
52875304
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
5305+
HFuncTraced(TEvPQ::TEvForceCompaction, Handle);
52885306
default:
52895307
return false;
52905308
}

ydb/core/persqueue/pqtablet/pq_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
603603

604604
void ResendSplitMergeRequests(const TActorContext& ctx);
605605

606+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx);
606607

607608
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
608609
NWilson::TSpan WriteTxsSpan;

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T
12501250
return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
12511251
}
12521252

1253+
void CmdRunCompaction(TTestActorRuntime& runtime,
1254+
ui64 tabletId,
1255+
const TActorId& sender,
1256+
const ui32 partition)
1257+
{
1258+
auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition);
1259+
runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries());
1260+
}
1261+
1262+
void CmdRunCompaction(const ui32 partition,
1263+
TTestContext& tc)
1264+
{
1265+
CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition);
1266+
}
1267+
12531268
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,13 @@ struct TCmdWriteOptions {
644644
};
645645
void CmdWrite(const TCmdWriteOptions&);
646646

647+
void CmdRunCompaction(TTestActorRuntime& runtime,
648+
ui64 tabletId,
649+
const TActorId& sender,
650+
const ui32 partition);
651+
void CmdRunCompaction(const ui32 partition,
652+
TTestContext& tc);
653+
647654
THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
648655

649656
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)