Skip to content

Commit 1d22e66

Browse files
[+] tracing
1 parent 8654e8e commit 1d22e66

File tree

10 files changed

+298
-11
lines changed

10 files changed

+298
-11
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ struct TEvPQ {
198198
EvRunCompaction,
199199
EvMirrorTopicDescription,
200200
EvBroadcastPartitionError,
201+
EvForceCompaction,
201202
EvEnd
202203
};
203204

@@ -1278,6 +1279,15 @@ struct TEvPQ {
12781279

12791280
ui64 BlobsCount = 0;
12801281
};
1282+
1283+
struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
1284+
explicit TEvForceCompaction(const ui32 partitionId) :
1285+
PartitionId(partitionId)
1286+
{
1287+
}
1288+
1289+
ui32 PartitionId = 0;
1290+
};
12811291
};
12821292

12831293
} //NKikimr

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ class TPartition : public TBaseActor<TPartition> {
246246
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
247247
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
248248
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
249+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
249250
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
250251
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
251252
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
@@ -596,6 +597,7 @@ class TPartition : public TBaseActor<TPartition> {
596597
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
597598
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
598599
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
600+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
599601
default:
600602
if (!Initializer.Handle(ev)) {
601603
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -664,6 +666,7 @@ class TPartition : public TBaseActor<TPartition> {
664666
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
665667
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
666668
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
669+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
667670
default:
668671
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
669672
break;
@@ -1116,7 +1119,7 @@ class TPartition : public TBaseActor<TPartition> {
11161119
const TEvPQ::TEvBlobResponse* blobResponse,
11171120
const TActorContext& ctx);
11181121

1119-
void TryRunCompaction();
1122+
void TryRunCompaction(bool force = false);
11201123
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
11211124
void BlobsForCompactionWereWrite();
11221125
ui64 NextReadCookie();

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ void TPartition::DumpKeysForBlobsCompaction() const
161161
LOG_D("===================================");
162162
}
163163

164-
void TPartition::TryRunCompaction()
164+
void TPartition::TryRunCompaction(bool force)
165165
{
166166
if (CompactionInProgress) {
167167
LOG_D("Blobs compaction in progress");
@@ -174,12 +174,11 @@ void TPartition::TryRunCompaction()
174174
}
175175

176176
//DumpKeysForBlobsCompaction();
177-
DumpZones(__FILE__, __LINE__);
178177

179178
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
180179
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
181180

182-
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
181+
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) {
183182
LOG_D("No data for blobs compaction");
184183
return;
185184
}
@@ -200,13 +199,19 @@ void TPartition::TryRunCompaction()
200199
LOG_D("Blob key for rename " << k.Key.ToString());
201200
}
202201
}
202+
203203
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");
204204

205205
CompactionInProgress = true;
206206

207207
Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount));
208208
}
209209

210+
void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&)
211+
{
212+
TryRunCompaction(true);
213+
}
214+
210215
void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev)
211216
{
212217
const ui64 blobsCount = ev->Get()->BlobsCount;
@@ -261,11 +266,14 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
261266
TMaybe<ui64> firstBlobOffset = requestedBlob.Offset;
262267

263268
for (TBlobIterator it(requestedBlob.Key, requestedBlob.Value); it.IsValid(); it.Next()) {
269+
LOG_D("Compaction: case 18");
264270
TBatch batch = it.GetBatch();
265271
batch.Unpack();
266272

267273
for (const auto& blob : batch.Blobs) {
274+
LOG_D("Compaction: case 19");
268275
if (wasThePreviousBlobBig && blob.PartData && (blob.PartData->PartNo != 0)) {
276+
LOG_D("Compaction: case 20");
269277
// надо продолжить писать большое сообщение
270278
CompactionBlobEncoder.NewHead.PartNo = blob.PartData->PartNo;
271279
CompactionBlobEncoder.NewPartitionedBlob(Partition,
@@ -278,6 +286,8 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
278286
needToCompactHead,
279287
MaxBlobSize,
280288
blob.PartData->PartNo);
289+
} else {
290+
LOG_D("Compaction: case 21");
281291
}
282292
wasThePreviousBlobBig = false;
283293

@@ -310,12 +320,17 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob,
310320

311321
blobCreationUnixTime = std::max(blobCreationUnixTime, blob.WriteTimestamp);
312322
if (!ExecRequestForCompaction(msg, parameters, compactionRequest, blobCreationUnixTime)) {
323+
LOG_D("Compaction: case 22");
313324
return false;
325+
} else {
326+
LOG_D("Compaction: case 23");
314327
}
315328

316329
firstBlobOffset = Nothing();
317330
}
331+
LOG_D("Compaction: case 25");
318332
}
333+
LOG_D("Compaction: case 26");
319334

320335
return true;
321336
}
@@ -329,6 +344,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k,
329344
const auto& ctx = ActorContext();
330345

331346
if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) {
347+
LOG_D("Compaction: case 12");
332348
CompactionBlobEncoder.NewPartitionedBlob(Partition,
333349
CompactionBlobEncoder.NewHead.Offset,
334350
"", // SourceId
@@ -338,22 +354,30 @@ void TPartition::RenameCompactedBlob(TDataKey& k,
338354
parameters.HeadCleared, // headCleared
339355
needToCompactHead, // needCompactHead
340356
MaxBlobSize);
357+
} else {
358+
LOG_D("Compaction: case 13");
341359
}
342360
auto write = CompactionBlobEncoder.PartitionedBlob.Add(k.Key, size, k.Timestamp, false);
343361
if (write && !write->Value.empty()) {
362+
LOG_D("Compaction: case 14");
344363
// надо записать содержимое головы перед первым большим блобом
345364
AddCmdWrite(write, compactionRequest, k.Timestamp, ctx);
346365
CompactionBlobEncoder.CompactedKeys.emplace_back(write->Key, write->Value.size());
366+
} else {
367+
LOG_D("Compaction: case 15");
347368
}
348369

349370
if (const auto& formedBlobs = CompactionBlobEncoder.PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
371+
LOG_D("Compaction: case 16");
350372
ui32 curWrites = RenameTmpCmdWrites(compactionRequest);
351373
RenameFormedBlobs(formedBlobs,
352374
parameters,
353375
curWrites,
354376
compactionRequest,
355377
CompactionBlobEncoder,
356378
ctx);
379+
} else {
380+
LOG_D("Compaction: case 17");
357381
}
358382

359383
k.BlobKeyToken->NeedDelete = false;
@@ -376,8 +400,11 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
376400
if (!CompactionBlobEncoder.Head.GetCount() &&
377401
!CompactionBlobEncoder.NewHead.GetCount() &&
378402
CompactionBlobEncoder.IsEmpty()) {
403+
LOG_D("Compaction: case 09");
379404
// если это первое сообщение, то надо поправить StartOffset
380405
CompactionBlobEncoder.StartOffset = BlobEncoder.StartOffset;
406+
} else {
407+
LOG_D("Compaction: case 10");
381408
}
382409

383410
CompactionBlobEncoder.NewHead.Clear();
@@ -399,15 +426,20 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
399426
DumpZones(__FILE__, __LINE__);
400427

401428
for (size_t i = 0; i < KeysForCompaction.size(); ++i) {
429+
LOG_D("Compaction: case 11");
402430
auto& [k, pos] = KeysForCompaction[i];
403431
bool needToCompactHead = (parameters.CurOffset < k.Key.GetOffset());
404432

405433
if (pos == Max<size_t>()) {
434+
LOG_D("Compaction: case 01");
406435
// большой блоб надо переименовать
407436
LOG_D("Rename key " << k.Key.ToString());
408437

409438
if (!WasTheLastBlobBig) {
439+
LOG_D("Compaction: case 02");
410440
needToCompactHead = true;
441+
} else {
442+
LOG_D("Compaction: case 03");
411443
}
412444
LOG_D("Need to compact head " << needToCompactHead);
413445

@@ -427,32 +459,41 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
427459

428460
WasTheLastBlobBig = true;
429461
} else {
462+
LOG_D("Compaction: case 04");
430463
// маленький блоб надо дописать
431464
LOG_D("Append blob for key " << k.Key.ToString());
432465
LOG_D("Need to compact head " << needToCompactHead);
433466

434467
const TRequestedBlob& requestedBlob = blobs[pos];
435468
if (!CompactRequestedBlob(requestedBlob, parameters, needToCompactHead, compactionRequest.Get(), blobCreationUnixTime, WasTheLastBlobBig)) {
469+
LOG_D("Compaction: case 05");
436470
LOG_D("Can't append blob for key " << k.Key.ToString());
437471
Y_FAIL("Something went wrong");
438472
return;
473+
} else {
474+
LOG_D("Compaction: case 06");
439475
}
440476

441477
WasTheLastBlobBig = false;
442478
}
443479

444480
DumpZones(__FILE__, __LINE__);
445481
}
482+
LOG_D("Compaction: case 24");
446483

447484
if (!CompactionBlobEncoder.IsLastBatchPacked()) {
485+
LOG_D("Compaction: case 07");
448486
CompactionBlobEncoder.PackLastBatch();
487+
} else {
488+
LOG_D("Compaction: case 08");
449489
}
450490

451491
CompactionBlobEncoder.HeadCleared = parameters.HeadCleared;
452492

453493
EndProcessWritesForCompaction(compactionRequest.Get(), blobCreationUnixTime, ctx);
454494

455495
DumpZones(__FILE__, __LINE__);
496+
456497
// for debugging purposes
457498
DumpKeyValueRequest(compactionRequest->Record);
458499

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents()
52385238
}
52395239
}
52405240

5241+
void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx)
5242+
{
5243+
PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)");
5244+
5245+
const auto& event = *ev->Get();
5246+
const TPartitionId partitionId(event.PartitionId);
5247+
5248+
if (!Partitions.contains(partitionId)) {
5249+
PQ_LOG_D("Unknown partition id " << event.PartitionId);
5250+
return;
5251+
}
5252+
5253+
auto p = Partitions.find(partitionId);
5254+
ctx.Send(p->second.Actor,
5255+
new TEvPQ::TEvForceCompaction(event.PartitionId));
5256+
}
5257+
52415258
bool TPersQueue::HandleHook(STFUNC_SIG)
52425259
{
52435260
TRACE_EVENT(NKikimrServices::PERSQUEUE);
@@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
52855302
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
52865303
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
52875304
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
5305+
HFuncTraced(TEvPQ::TEvForceCompaction, Handle);
52885306
default:
52895307
return false;
52905308
}

ydb/core/persqueue/pqtablet/pq_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
603603

604604
void ResendSplitMergeRequests(const TActorContext& ctx);
605605

606+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx);
606607

607608
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
608609
NWilson::TSpan WriteTxsSpan;

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T
12501250
return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
12511251
}
12521252

1253+
void CmdRunCompaction(TTestActorRuntime& runtime,
1254+
ui64 tabletId,
1255+
const TActorId& sender,
1256+
const ui32 partition)
1257+
{
1258+
auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition);
1259+
runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries());
1260+
}
1261+
1262+
void CmdRunCompaction(const ui32 partition,
1263+
TTestContext& tc)
1264+
{
1265+
CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition);
1266+
}
1267+
12531268
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,13 @@ struct TCmdWriteOptions {
644644
};
645645
void CmdWrite(const TCmdWriteOptions&);
646646

647+
void CmdRunCompaction(TTestActorRuntime& runtime,
648+
ui64 tabletId,
649+
const TActorId& sender,
650+
const ui32 partition);
651+
void CmdRunCompaction(const ui32 partition,
652+
TTestContext& tc);
653+
647654
THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
648655

649656
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)