Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ struct TEvPQ {
EvRunCompaction,
EvMirrorTopicDescription,
EvBroadcastPartitionError,
EvForceCompaction,
EvEnd
};

Expand Down Expand Up @@ -1278,6 +1279,15 @@ struct TEvPQ {

ui64 BlobsCount = 0;
};

struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
explicit TEvForceCompaction(const ui32 partitionId) :
PartitionId(partitionId)
{
}

ui32 PartitionId = 0;
};
};

} //NKikimr
87 changes: 44 additions & 43 deletions ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <util/folder/path.h>
#include <util/string/escape.h>
#include <util/system/byteorder.h>
#include <ydb/library/dbgtrace/debug_trace.h>

namespace NKafka {

Expand Down Expand Up @@ -2600,49 +2601,49 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest()
return haveChanges;
}

//void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
//{
// DBGTRACE_LOG("=== DumpKeyValueRequest ===");
// DBGTRACE_LOG("--- delete ----------------");
// for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) {
// const auto& cmd = request.GetCmdDeleteRange(i);
// const auto& range = cmd.GetRange();
// Y_UNUSED(range);
// DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() <<
// ", " <<
// range.GetTo() << (range.GetIncludeTo() ? ']' : ')'));
// }
// DBGTRACE_LOG("--- write -----------------");
// for (size_t i = 0; i < request.CmdWriteSize(); ++i) {
// const auto& cmd = request.GetCmdWrite(i);
// Y_UNUSED(cmd);
// DBGTRACE_LOG(cmd.GetKey());
// }
// DBGTRACE_LOG("--- rename ----------------");
// for (size_t i = 0; i < request.CmdRenameSize(); ++i) {
// const auto& cmd = request.GetCmdRename(i);
// Y_UNUSED(cmd);
// DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey());
// }
// DBGTRACE_LOG("===========================");
//}

//void TPartition::DumpZones(const char* file, unsigned line) const
//{
// DBGTRACE("TPartition::DumpZones");
//
// if (file) {
// Y_UNUSED(line);
// DBGTRACE_LOG(file << "(" << line << ")");
// }
//
// DBGTRACE_LOG("=== DumpPartitionZones ===");
// DBGTRACE_LOG("--- Compaction -----------");
// CompactionBlobEncoder.Dump();
// DBGTRACE_LOG("--- FastWrite ------------");
// BlobEncoder.Dump();
// DBGTRACE_LOG("==========================");
//}
void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
{
DBGTRACE_LOG("=== DumpKeyValueRequest ===");
DBGTRACE_LOG("--- delete ----------------");
for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) {
const auto& cmd = request.GetCmdDeleteRange(i);
const auto& range = cmd.GetRange();
Y_UNUSED(range);
DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() <<
", " <<
range.GetTo() << (range.GetIncludeTo() ? ']' : ')'));
}
DBGTRACE_LOG("--- write -----------------");
for (size_t i = 0; i < request.CmdWriteSize(); ++i) {
const auto& cmd = request.GetCmdWrite(i);
Y_UNUSED(cmd);
DBGTRACE_LOG(cmd.GetKey());
}
DBGTRACE_LOG("--- rename ----------------");
for (size_t i = 0; i < request.CmdRenameSize(); ++i) {
const auto& cmd = request.GetCmdRename(i);
Y_UNUSED(cmd);
DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey());
}
DBGTRACE_LOG("===========================");
}

void TPartition::DumpZones(const char* file, unsigned line) const
{
DBGTRACE("TPartition::DumpZones");

if (file) {
Y_UNUSED(line);
DBGTRACE_LOG(file << "(" << line << ")");
}

DBGTRACE_LOG("=== DumpPartitionZones ===");
DBGTRACE_LOG("--- Compaction -----------");
CompactionBlobEncoder.Dump();
DBGTRACE_LOG("--- FastWrite ------------");
BlobEncoder.Dump();
DBGTRACE_LOG("==========================");
}

TBlobKeyTokenPtr TPartition::MakeBlobKeyToken(const TString& key)
{
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class TPartition : public TBaseActor<TPartition> {
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -596,6 +597,7 @@ class TPartition : public TBaseActor<TPartition> {
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
if (!Initializer.Handle(ev)) {
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
Expand Down Expand Up @@ -664,6 +666,7 @@ class TPartition : public TBaseActor<TPartition> {
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
break;
Expand Down Expand Up @@ -1116,7 +1119,7 @@ class TPartition : public TBaseActor<TPartition> {
const TEvPQ::TEvBlobResponse* blobResponse,
const TActorContext& ctx);

void TryRunCompaction();
void TryRunCompaction(bool force = false);
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
void BlobsForCompactionWereWrite();
ui64 NextReadCookie();
Expand Down
127 changes: 64 additions & 63 deletions ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "partition_blob_encoder.h"
#include "partition_util.h"
#include <ydb/library/dbgtrace/debug_trace.h>

namespace NKikimr::NPQ {

Expand Down Expand Up @@ -554,68 +555,68 @@ std::pair<TKey, ui32> TPartitionBlobEncoder::Compact(const TKey& key, bool headC
return res;
}

//void TPartitionBlobEncoder::Dump() const
//{
// auto dumpCompactedKeys = [this](const std::deque<std::pair<TKey, ui32>>& keys, const char* prefix) {
// Y_UNUSED(this);
// Y_UNUSED(prefix);
// for (size_t i = 0; i < keys.size(); ++i) {
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")");
// }
// };
// auto dumpKeys = [this](const std::deque<TDataKey>& keys, const char* prefix) {
// Y_UNUSED(this);
// Y_UNUSED(prefix);
// if (keys.size() > 10) {
// auto dumpSubkeys = [this](const std::deque<TDataKey>& keys, size_t begin, size_t end, const char* prefix) {
// Y_UNUSED(this);
// Y_UNUSED(keys);
// Y_UNUSED(prefix);
// for (size_t i = begin; i < end; ++i) {
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
// }
// };
// dumpSubkeys(keys, 0, 3, prefix);
// DBGTRACE_LOG("...");
// dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix);
// return;
// }
// for (size_t i = 0; i < keys.size(); ++i) {
// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
// }
// };
// auto dumpHead = [this](const THead& head, const char* prefix) {
// Y_UNUSED(this);
// Y_UNUSED(head);
// Y_UNUSED(prefix);
// DBGTRACE_LOG(prefix <<
// ": Offset=" << head.Offset << ", PartNo=" << head.PartNo <<
// ", PackedSize=" << head.PackedSize <<
// ", Batches.size=" << head.GetBatches().size());
// };
// auto dumpDataKeysHead = [this](const TVector<TKeyLevel>& levels, const char* prefix) {
// Y_UNUSED(this);
// Y_UNUSED(prefix);
// for (size_t i = 0; i < levels.size(); ++i) {
// const auto& level = levels[i];
// DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border());
// for (ui32 j = 0; j < level.KeysCount(); ++j) {
// DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")");
// }
// }
// };
//
// DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset);
// dumpCompactedKeys(CompactedKeys, "CompactedKeys");
// DBGTRACE_LOG("BodySize=" << BodySize);
// dumpKeys(DataKeysBody, "Body");
// dumpKeys(HeadKeys, "Head");
// dumpHead(Head, "Head");
// dumpDataKeysHead(DataKeysHead, "Levels");
// dumpHead(NewHead, "NewHead");
// DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")");
//}
void TPartitionBlobEncoder::Dump() const
{
auto dumpCompactedKeys = [this](const std::deque<std::pair<TKey, ui32>>& keys, const char* prefix) {
Y_UNUSED(this);
Y_UNUSED(prefix);
for (size_t i = 0; i < keys.size(); ++i) {
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")");
}
};
auto dumpKeys = [this](const std::deque<TDataKey>& keys, const char* prefix) {
Y_UNUSED(this);
Y_UNUSED(prefix);
if (keys.size() > 100) {
auto dumpSubkeys = [this](const std::deque<TDataKey>& keys, size_t begin, size_t end, const char* prefix) {
Y_UNUSED(this);
Y_UNUSED(keys);
Y_UNUSED(prefix);
for (size_t i = begin; i < end; ++i) {
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
}
};
dumpSubkeys(keys, 0, 3, prefix);
DBGTRACE_LOG("...");
dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix);
return;
}
for (size_t i = 0; i < keys.size(); ++i) {
DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() <<
", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize);
}
};
auto dumpHead = [this](const THead& head, const char* prefix) {
Y_UNUSED(this);
Y_UNUSED(head);
Y_UNUSED(prefix);
DBGTRACE_LOG(prefix <<
": Offset=" << head.Offset << ", PartNo=" << head.PartNo <<
", PackedSize=" << head.PackedSize <<
", Batches.size=" << head.GetBatches().size());
};
auto dumpDataKeysHead = [this](const TVector<TKeyLevel>& levels, const char* prefix) {
Y_UNUSED(this);
Y_UNUSED(prefix);
for (size_t i = 0; i < levels.size(); ++i) {
const auto& level = levels[i];
DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border());
for (ui32 j = 0; j < level.KeysCount(); ++j) {
DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")");
}
}
};

DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset);
dumpCompactedKeys(CompactedKeys, "CompactedKeys");
DBGTRACE_LOG("BodySize=" << BodySize);
dumpKeys(DataKeysBody, "Body");
dumpKeys(HeadKeys, "Head");
dumpHead(Head, "Head");
dumpDataKeysHead(DataKeysHead, "Levels");
dumpHead(NewHead, "NewHead");
DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")");
}

}
Loading
Loading