Skip to content

Commit 8654e8e

Browse files
[+] tracing
1 parent 449031c commit 8654e8e

File tree

5 files changed

+162
-108
lines changed

5 files changed

+162
-108
lines changed

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <util/folder/path.h>
2828
#include <util/string/escape.h>
2929
#include <util/system/byteorder.h>
30+
#include <ydb/library/dbgtrace/debug_trace.h>
3031

3132
namespace NKafka {
3233

@@ -2600,49 +2601,49 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest()
26002601
return haveChanges;
26012602
}
26022603

2603-
//void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
2604-
//{
2605-
// DBGTRACE_LOG("=== DumpKeyValueRequest ===");
2606-
// DBGTRACE_LOG("--- delete ----------------");
2607-
// for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) {
2608-
// const auto& cmd = request.GetCmdDeleteRange(i);
2609-
// const auto& range = cmd.GetRange();
2610-
// Y_UNUSED(range);
2611-
// DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() <<
2612-
// ", " <<
2613-
// range.GetTo() << (range.GetIncludeTo() ? ']' : ')'));
2614-
// }
2615-
// DBGTRACE_LOG("--- write -----------------");
2616-
// for (size_t i = 0; i < request.CmdWriteSize(); ++i) {
2617-
// const auto& cmd = request.GetCmdWrite(i);
2618-
// Y_UNUSED(cmd);
2619-
// DBGTRACE_LOG(cmd.GetKey());
2620-
// }
2621-
// DBGTRACE_LOG("--- rename ----------------");
2622-
// for (size_t i = 0; i < request.CmdRenameSize(); ++i) {
2623-
// const auto& cmd = request.GetCmdRename(i);
2624-
// Y_UNUSED(cmd);
2625-
// DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey());
2626-
// }
2627-
// DBGTRACE_LOG("===========================");
2628-
//}
2629-
2630-
//void TPartition::DumpZones(const char* file, unsigned line) const
2631-
//{
2632-
// DBGTRACE("TPartition::DumpZones");
2633-
//
2634-
// if (file) {
2635-
// Y_UNUSED(line);
2636-
// DBGTRACE_LOG(file << "(" << line << ")");
2637-
// }
2638-
//
2639-
// DBGTRACE_LOG("=== DumpPartitionZones ===");
2640-
// DBGTRACE_LOG("--- Compaction -----------");
2641-
// CompactionBlobEncoder.Dump();
2642-
// DBGTRACE_LOG("--- FastWrite ------------");
2643-
// BlobEncoder.Dump();
2644-
// DBGTRACE_LOG("==========================");
2645-
//}
2604+
void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
2605+
{
2606+
DBGTRACE_LOG("=== DumpKeyValueRequest ===");
2607+
DBGTRACE_LOG("--- delete ----------------");
2608+
for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) {
2609+
const auto& cmd = request.GetCmdDeleteRange(i);
2610+
const auto& range = cmd.GetRange();
2611+
Y_UNUSED(range);
2612+
DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() <<
2613+
", " <<
2614+
range.GetTo() << (range.GetIncludeTo() ? ']' : ')'));
2615+
}
2616+
DBGTRACE_LOG("--- write -----------------");
2617+
for (size_t i = 0; i < request.CmdWriteSize(); ++i) {
2618+
const auto& cmd = request.GetCmdWrite(i);
2619+
Y_UNUSED(cmd);
2620+
DBGTRACE_LOG(cmd.GetKey());
2621+
}
2622+
DBGTRACE_LOG("--- rename ----------------");
2623+
for (size_t i = 0; i < request.CmdRenameSize(); ++i) {
2624+
const auto& cmd = request.GetCmdRename(i);
2625+
Y_UNUSED(cmd);
2626+
DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey());
2627+
}
2628+
DBGTRACE_LOG("===========================");
2629+
}
2630+
2631+
void TPartition::DumpZones(const char* file, unsigned line) const
2632+
{
2633+
DBGTRACE("TPartition::DumpZones");
2634+
2635+
if (file) {
2636+
Y_UNUSED(line);
2637+
DBGTRACE_LOG(file << "(" << line << ")");
2638+
}
2639+
2640+
DBGTRACE_LOG("=== DumpPartitionZones ===");
2641+
DBGTRACE_LOG("--- Compaction -----------");
2642+
CompactionBlobEncoder.Dump();
2643+
DBGTRACE_LOG("--- FastWrite ------------");
2644+
BlobEncoder.Dump();
2645+
DBGTRACE_LOG("==========================");
2646+
}
26462647

26472648
TBlobKeyTokenPtr TPartition::MakeBlobKeyToken(const TString& key)
26482649
{

ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp

Lines changed: 64 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "partition_blob_encoder.h"
22
#include "partition_util.h"
3+
#include <ydb/library/dbgtrace/debug_trace.h>
34

45
namespace NKikimr::NPQ {
56

@@ -554,68 +555,68 @@ std::pair<TKey, ui32> TPartitionBlobEncoder::Compact(const TKey& key, bool headC
554555
return res;
555556
}
556557

557-
//void TPartitionBlobEncoder::Dump() const
558-
//{
559-
// auto dumpCompactedKeys = [this](const std::deque<std::pair<TKey, ui32>>& keys, const char* prefix) {
560-
// Y_UNUSED(this);
561-
// Y_UNUSED(prefix);
562-
// for (size_t i = 0; i < keys.size(); ++i) {
563-
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")");
564-
// }
565-
// };
566-
// auto dumpKeys = [this](const std::deque<TDataKey>& keys, const char* prefix) {
567-
// Y_UNUSED(this);
568-
// Y_UNUSED(prefix);
569-
// if (keys.size() > 10) {
570-
// auto dumpSubkeys = [this](const std::deque<TDataKey>& keys, size_t begin, size_t end, const char* prefix) {
571-
// Y_UNUSED(this);
572-
// Y_UNUSED(keys);
573-
// Y_UNUSED(prefix);
574-
// for (size_t i = begin; i < end; ++i) {
575-
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
576-
// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
577-
// }
578-
// };
579-
// dumpSubkeys(keys, 0, 3, prefix);
580-
// DBGTRACE_LOG("...");
581-
// dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix);
582-
// return;
583-
// }
584-
// for (size_t i = 0; i < keys.size(); ++i) {
585-
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
586-
// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
587-
// }
588-
// };
589-
// auto dumpHead = [this](const THead& head, const char* prefix) {
590-
// Y_UNUSED(this);
591-
// Y_UNUSED(head);
592-
// Y_UNUSED(prefix);
593-
// DBGTRACE_LOG(prefix <<
594-
// ": Offset=" << head.Offset << ", PartNo=" << head.PartNo <<
595-
// ", PackedSize=" << head.PackedSize <<
596-
// ", Batches.size=" << head.GetBatches().size());
597-
// };
598-
// auto dumpDataKeysHead = [this](const TVector<TKeyLevel>& levels, const char* prefix) {
599-
// Y_UNUSED(this);
600-
// Y_UNUSED(prefix);
601-
// for (size_t i = 0; i < levels.size(); ++i) {
602-
// const auto& level = levels[i];
603-
// DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border());
604-
// for (ui32 j = 0; j < level.KeysCount(); ++j) {
605-
// DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")");
606-
// }
607-
// }
608-
// };
609-
//
610-
// DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset);
611-
// dumpCompactedKeys(CompactedKeys, "CompactedKeys");
612-
// DBGTRACE_LOG("BodySize=" << BodySize);
613-
// dumpKeys(DataKeysBody, "Body");
614-
// dumpKeys(HeadKeys, "Head");
615-
// dumpHead(Head, "Head");
616-
// dumpDataKeysHead(DataKeysHead, "Levels");
617-
// dumpHead(NewHead, "NewHead");
618-
// DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")");
619-
//}
558+
void TPartitionBlobEncoder::Dump() const
559+
{
560+
auto dumpCompactedKeys = [this](const std::deque<std::pair<TKey, ui32>>& keys, const char* prefix) {
561+
Y_UNUSED(this);
562+
Y_UNUSED(prefix);
563+
for (size_t i = 0; i < keys.size(); ++i) {
564+
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")");
565+
}
566+
};
567+
auto dumpKeys = [this](const std::deque<TDataKey>& keys, const char* prefix) {
568+
Y_UNUSED(this);
569+
Y_UNUSED(prefix);
570+
if (keys.size() > 100) {
571+
auto dumpSubkeys = [this](const std::deque<TDataKey>& keys, size_t begin, size_t end, const char* prefix) {
572+
Y_UNUSED(this);
573+
Y_UNUSED(keys);
574+
Y_UNUSED(prefix);
575+
for (size_t i = begin; i < end; ++i) {
576+
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
577+
", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
578+
}
579+
};
580+
dumpSubkeys(keys, 0, 3, prefix);
581+
DBGTRACE_LOG("...");
582+
dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix);
583+
return;
584+
}
585+
for (size_t i = 0; i < keys.size(); ++i) {
586+
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
587+
", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
588+
}
589+
};
590+
auto dumpHead = [this](const THead& head, const char* prefix) {
591+
Y_UNUSED(this);
592+
Y_UNUSED(head);
593+
Y_UNUSED(prefix);
594+
DBGTRACE_LOG(prefix <<
595+
": Offset=" << head.Offset << ", PartNo=" << head.PartNo <<
596+
", PackedSize=" << head.PackedSize <<
597+
", Batches.size=" << head.GetBatches().size());
598+
};
599+
auto dumpDataKeysHead = [this](const TVector<TKeyLevel>& levels, const char* prefix) {
600+
Y_UNUSED(this);
601+
Y_UNUSED(prefix);
602+
for (size_t i = 0; i < levels.size(); ++i) {
603+
const auto& level = levels[i];
604+
DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border());
605+
for (ui32 j = 0; j < level.KeysCount(); ++j) {
606+
DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")");
607+
}
608+
}
609+
};
610+
611+
DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset);
612+
dumpCompactedKeys(CompactedKeys, "CompactedKeys");
613+
DBGTRACE_LOG("BodySize=" << BodySize);
614+
dumpKeys(DataKeysBody, "Body");
615+
dumpKeys(HeadKeys, "Head");
616+
dumpHead(Head, "Head");
617+
dumpDataKeysHead(DataKeysHead, "Levels");
618+
dumpHead(NewHead, "NewHead");
619+
DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")");
620+
}
620621

621622
}

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ void TPartition::TryRunCompaction()
174174
}
175175

176176
//DumpKeysForBlobsCompaction();
177+
DumpZones(__FILE__, __LINE__);
177178

178179
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
179180
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
@@ -366,6 +367,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
366367

367368
LOG_D("Continue blobs compaction");
368369

370+
DumpZones(__FILE__, __LINE__);
371+
369372
AFL_ENSURE(CompactionInProgress);
370373
AFL_ENSURE(blobs.size() == CompactionBlobsCount);
371374

@@ -393,6 +396,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
393396

394397
TInstant blobCreationUnixTime = TInstant::Zero();
395398

399+
DumpZones(__FILE__, __LINE__);
400+
396401
for (size_t i = 0; i < KeysForCompaction.size(); ++i) {
397402
auto& [k, pos] = KeysForCompaction[i];
398403
bool needToCompactHead = (parameters.CurOffset < k.Key.GetOffset());
@@ -435,6 +440,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
435440

436441
WasTheLastBlobBig = false;
437442
}
443+
444+
DumpZones(__FILE__, __LINE__);
438445
}
439446

440447
if (!CompactionBlobEncoder.IsLastBatchPacked()) {
@@ -445,8 +452,9 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
445452

446453
EndProcessWritesForCompaction(compactionRequest.Get(), blobCreationUnixTime, ctx);
447454

455+
DumpZones(__FILE__, __LINE__);
448456
// for debugging purposes
449-
//DumpKeyValueRequest(compactionRequest->Record);
457+
DumpKeyValueRequest(compactionRequest->Record);
450458

451459
ctx.Send(BlobCache, compactionRequest.Release(), 0, 0);
452460
}

ydb/core/persqueue/pqtablet/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ PEERDIR(
1717
ydb/core/persqueue/pqtablet/cache
1818
ydb/core/persqueue/pqtablet/partition
1919
ydb/core/persqueue/pqtablet/readproxy
20+
ydb/library/dbgtrace
2021
)
2122

2223
END()

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ void TFixture::TTopicWriteSessionContext::WaitForEvent()
888888
}
889889
}
890890
} else if ([[maybe_unused]] auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
891-
UNIT_FAIL("");
891+
UNIT_FAIL(e->DebugString(true));
892892
}
893893
}
894894
}
@@ -3416,6 +3416,49 @@ Y_UNIT_TEST_F(Write_50k_100times_50tx, TFixtureTable)
34163416
}
34173417
}
34183418

3419+
Y_UNIT_TEST_F(Foo, TFixtureTable)
3420+
{
3421+
const size_t COUNT = 10;
3422+
const size_t SMALL = 50'000;
3423+
const size_t SMALL_COUNT = 1;
3424+
const size_t BIG = 29'000'000;
3425+
const size_t BIG_COUNT = 1;
3426+
3427+
CreateTopic("topic_A", TEST_CONSUMER);
3428+
CreateTopic("topic_B", TEST_CONSUMER);
3429+
3430+
SetPartitionWriteSpeed("topic_A", 50'000'000);
3431+
3432+
auto session = CreateSession();
3433+
auto tx = session->BeginTx();
3434+
3435+
for (size_t j = 0; j < COUNT; ++j) {
3436+
for (size_t k = 0; k < SMALL_COUNT; ++k) {
3437+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(SMALL, 'x'), tx.get(), 0);
3438+
}
3439+
for (size_t k = 0; k < BIG_COUNT; ++k) {
3440+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(BIG, 'x'), tx.get(), 0);
3441+
}
3442+
}
3443+
3444+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, std::string(50'000, 'x'), tx.get(), 0);
3445+
3446+
session->CommitTx(*tx, EStatus::SUCCESS);
3447+
3448+
RestartPQTablet("topic_A", 0);
3449+
3450+
auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, (SMALL_COUNT + BIG_COUNT) * COUNT);
3451+
3452+
for (size_t j = 0; j < messages.size(); ) {
3453+
for (size_t k = 0; k < SMALL_COUNT; ++k, ++j) {
3454+
UNIT_ASSERT_VALUES_EQUAL(messages[j].size(), SMALL);
3455+
}
3456+
for (size_t k = 0; k < BIG_COUNT; ++k, ++j) {
3457+
UNIT_ASSERT_VALUES_EQUAL(messages[j].size(), BIG);
3458+
}
3459+
}
3460+
}
3461+
34193462
}
34203463

34213464
}

0 commit comments

Comments
 (0)