diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 4d709af406e3..e5cf8e5c0d70 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -198,6 +198,7 @@ struct TEvPQ { EvRunCompaction, EvMirrorTopicDescription, EvBroadcastPartitionError, + EvForceCompaction, EvEnd }; @@ -1278,6 +1279,15 @@ struct TEvPQ { ui64 BlobsCount = 0; }; + + struct TEvForceCompaction : TEventLocal { + explicit TEvForceCompaction(const ui32 partitionId) : + PartitionId(partitionId) + { + } + + ui32 PartitionId = 0; + }; }; } //NKikimr diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 3ae53f8a9f25..d6fbdd0bb12e 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace NKafka { @@ -2600,49 +2601,49 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest() return haveChanges; } -//void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) -//{ -// DBGTRACE_LOG("=== DumpKeyValueRequest ==="); -// DBGTRACE_LOG("--- delete ----------------"); -// for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) { -// const auto& cmd = request.GetCmdDeleteRange(i); -// const auto& range = cmd.GetRange(); -// Y_UNUSED(range); -// DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() << -// ", " << -// range.GetTo() << (range.GetIncludeTo() ? ']' : ')')); -// } -// DBGTRACE_LOG("--- write -----------------"); -// for (size_t i = 0; i < request.CmdWriteSize(); ++i) { -// const auto& cmd = request.GetCmdWrite(i); -// Y_UNUSED(cmd); -// DBGTRACE_LOG(cmd.GetKey()); -// } -// DBGTRACE_LOG("--- rename ----------------"); -// for (size_t i = 0; i < request.CmdRenameSize(); ++i) { -// const auto& cmd = request.GetCmdRename(i); -// Y_UNUSED(cmd); -// DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey()); -// } -// DBGTRACE_LOG("==========================="); -//} - -//void TPartition::DumpZones(const char* file, unsigned line) const -//{ -// DBGTRACE("TPartition::DumpZones"); -// -// if (file) { -// Y_UNUSED(line); -// DBGTRACE_LOG(file << "(" << line << ")"); -// } -// -// DBGTRACE_LOG("=== DumpPartitionZones ==="); -// DBGTRACE_LOG("--- Compaction -----------"); -// CompactionBlobEncoder.Dump(); -// DBGTRACE_LOG("--- FastWrite ------------"); -// BlobEncoder.Dump(); -// DBGTRACE_LOG("=========================="); -//} +void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) +{ + DBGTRACE_LOG("=== DumpKeyValueRequest ==="); + DBGTRACE_LOG("--- delete ----------------"); + for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) { + const auto& cmd = request.GetCmdDeleteRange(i); + const auto& range = cmd.GetRange(); + Y_UNUSED(range); + DBGTRACE_LOG((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() << + ", " << + range.GetTo() << (range.GetIncludeTo() ? ']' : ')')); + } + DBGTRACE_LOG("--- write -----------------"); + for (size_t i = 0; i < request.CmdWriteSize(); ++i) { + const auto& cmd = request.GetCmdWrite(i); + Y_UNUSED(cmd); + DBGTRACE_LOG(cmd.GetKey()); + } + DBGTRACE_LOG("--- rename ----------------"); + for (size_t i = 0; i < request.CmdRenameSize(); ++i) { + const auto& cmd = request.GetCmdRename(i); + Y_UNUSED(cmd); + DBGTRACE_LOG(cmd.GetOldKey() << ", " << cmd.GetNewKey()); + } + DBGTRACE_LOG("==========================="); +} + +void TPartition::DumpZones(const char* file, unsigned line) const +{ + DBGTRACE("TPartition::DumpZones"); + + if (file) { + Y_UNUSED(line); + DBGTRACE_LOG(file << "(" << line << ")"); + } + + DBGTRACE_LOG("=== DumpPartitionZones ==="); + DBGTRACE_LOG("--- Compaction -----------"); + CompactionBlobEncoder.Dump(); + DBGTRACE_LOG("--- FastWrite ------------"); + BlobEncoder.Dump(); + DBGTRACE_LOG("=========================="); +} TBlobKeyTokenPtr TPartition::MakeBlobKeyToken(const TString& key) { diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index bc4649eaf97c..cbb44040cdb1 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -246,6 +246,7 @@ class TPartition : public TBaseActor { void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvRunCompaction::TPtr& ev); + void Handle(TEvPQ::TEvForceCompaction::TPtr& ev); void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev); void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx); void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); @@ -596,6 +597,7 @@ class TPartition : public TBaseActor { HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit); IgnoreFunc(TEvPQ::TEvTxBatchComplete); hFuncTraced(TEvPQ::TEvRunCompaction, Handle); + hFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: if (!Initializer.Handle(ev)) { ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); @@ -664,6 +666,7 @@ class TPartition : public TBaseActor { HFuncTraced(TEvPQ::TEvDeletePartition, Handle); IgnoreFunc(TEvPQ::TEvTxBatchComplete); hFuncTraced(TEvPQ::TEvRunCompaction, Handle); + hFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev)); break; @@ -1116,7 +1119,7 @@ class TPartition : public TBaseActor { const TEvPQ::TEvBlobResponse* blobResponse, const TActorContext& ctx); - void TryRunCompaction(); + void TryRunCompaction(bool force = false); void BlobsForCompactionWereRead(const TVector& blobs); void BlobsForCompactionWereWrite(); ui64 NextReadCookie(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp b/ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp index c1ded51e200a..e90ce7e75861 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_blob_encoder.cpp @@ -1,5 +1,6 @@ #include "partition_blob_encoder.h" #include "partition_util.h" +#include namespace NKikimr::NPQ { @@ -554,68 +555,68 @@ std::pair TPartitionBlobEncoder::Compact(const TKey& key, bool headC return res; } -//void TPartitionBlobEncoder::Dump() const -//{ -// auto dumpCompactedKeys = [this](const std::deque>& keys, const char* prefix) { -// Y_UNUSED(this); -// Y_UNUSED(prefix); -// for (size_t i = 0; i < keys.size(); ++i) { -// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")"); -// } -// }; -// auto dumpKeys = [this](const std::deque& keys, const char* prefix) { -// Y_UNUSED(this); -// Y_UNUSED(prefix); -// if (keys.size() > 10) { -// auto dumpSubkeys = [this](const std::deque& keys, size_t begin, size_t end, const char* prefix) { -// Y_UNUSED(this); -// Y_UNUSED(keys); -// Y_UNUSED(prefix); -// for (size_t i = begin; i < end; ++i) { -// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() << -// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize); -// } -// }; -// dumpSubkeys(keys, 0, 3, prefix); -// DBGTRACE_LOG("..."); -// dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix); -// return; -// } -// for (size_t i = 0; i < keys.size(); ++i) { -// DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() << -// ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize); -// } -// }; -// auto dumpHead = [this](const THead& head, const char* prefix) { -// Y_UNUSED(this); -// Y_UNUSED(head); -// Y_UNUSED(prefix); -// DBGTRACE_LOG(prefix << -// ": Offset=" << head.Offset << ", PartNo=" << head.PartNo << -// ", PackedSize=" << head.PackedSize << -// ", Batches.size=" << head.GetBatches().size()); -// }; -// auto dumpDataKeysHead = [this](const TVector& levels, const char* prefix) { -// Y_UNUSED(this); -// Y_UNUSED(prefix); -// for (size_t i = 0; i < levels.size(); ++i) { -// const auto& level = levels[i]; -// DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border()); -// for (ui32 j = 0; j < level.KeysCount(); ++j) { -// DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")"); -// } -// } -// }; -// -// DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset); -// dumpCompactedKeys(CompactedKeys, "CompactedKeys"); -// DBGTRACE_LOG("BodySize=" << BodySize); -// dumpKeys(DataKeysBody, "Body"); -// dumpKeys(HeadKeys, "Head"); -// dumpHead(Head, "Head"); -// dumpDataKeysHead(DataKeysHead, "Levels"); -// dumpHead(NewHead, "NewHead"); -// DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")"); -//} +void TPartitionBlobEncoder::Dump() const +{ + auto dumpCompactedKeys = [this](const std::deque>& keys, const char* prefix) { + Y_UNUSED(this); + Y_UNUSED(prefix); + for (size_t i = 0; i < keys.size(); ++i) { + DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].first.ToString() << " (" << keys[i].second << ")"); + } + }; + auto dumpKeys = [this](const std::deque& keys, const char* prefix) { + Y_UNUSED(this); + Y_UNUSED(prefix); + if (keys.size() > 100) { + auto dumpSubkeys = [this](const std::deque& keys, size_t begin, size_t end, const char* prefix) { + Y_UNUSED(this); + Y_UNUSED(keys); + Y_UNUSED(prefix); + for (size_t i = begin; i < end; ++i) { + DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() << + ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize); + } + }; + dumpSubkeys(keys, 0, 3, prefix); + DBGTRACE_LOG("..."); + dumpSubkeys(keys, keys.size() - 3, keys.size(), prefix); + return; + } + for (size_t i = 0; i < keys.size(); ++i) { + DBGTRACE_LOG(prefix << "[" << i << "]=" << keys[i].Key.ToString() << + ", Size=" << keys[i].Size << ", CumulativeSize=" << keys[i].CumulativeSize); + } + }; + auto dumpHead = [this](const THead& head, const char* prefix) { + Y_UNUSED(this); + Y_UNUSED(head); + Y_UNUSED(prefix); + DBGTRACE_LOG(prefix << + ": Offset=" << head.Offset << ", PartNo=" << head.PartNo << + ", PackedSize=" << head.PackedSize << + ", Batches.size=" << head.GetBatches().size()); + }; + auto dumpDataKeysHead = [this](const TVector& levels, const char* prefix) { + Y_UNUSED(this); + Y_UNUSED(prefix); + for (size_t i = 0; i < levels.size(); ++i) { + const auto& level = levels[i]; + DBGTRACE_LOG(prefix << "[" << i << "] " << level.Sum() << " / " << level.Border()); + for (ui32 j = 0; j < level.KeysCount(); ++j) { + DBGTRACE_LOG(" [" << j << "] " << level.GetKey(j).ToString() << " (" << level.GetSize(j) << ")"); + } + } + }; + + DBGTRACE_LOG("StartOffset=" << StartOffset << ", EndOffset=" << EndOffset); + dumpCompactedKeys(CompactedKeys, "CompactedKeys"); + DBGTRACE_LOG("BodySize=" << BodySize); + dumpKeys(DataKeysBody, "Body"); + dumpKeys(HeadKeys, "Head"); + dumpHead(Head, "Head"); + dumpDataKeysHead(DataKeysHead, "Levels"); + dumpHead(NewHead, "NewHead"); + DBGTRACE_LOG("NewHeadKey=" << NewHeadKey.Key.ToString() << " (" << NewHeadKey.Size << ")"); +} } diff --git a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp index 58136a8b4507..3b5c2c13abb0 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp @@ -161,7 +161,7 @@ void TPartition::DumpKeysForBlobsCompaction() const LOG_D("==================================="); } -void TPartition::TryRunCompaction() +void TPartition::TryRunCompaction(bool force) { if (CompactionInProgress) { LOG_D("Blobs compaction in progress"); @@ -178,7 +178,7 @@ void TPartition::TryRunCompaction() const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit(); const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound(); - if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) { + if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) { LOG_D("No data for blobs compaction"); return; } @@ -199,6 +199,7 @@ void TPartition::TryRunCompaction() LOG_D("Blob key for rename " << k.Key.ToString()); } } + LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes"); CompactionInProgress = true; @@ -206,6 +207,11 @@ void TPartition::TryRunCompaction() Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount)); } +void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&) +{ + TryRunCompaction(true); +} + void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev) { const ui64 blobsCount = ev->Get()->BlobsCount; @@ -260,11 +266,14 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob, TMaybe firstBlobOffset = requestedBlob.Offset; for (TBlobIterator it(requestedBlob.Key, requestedBlob.Value); it.IsValid(); it.Next()) { + LOG_D("Compaction: case 18"); TBatch batch = it.GetBatch(); batch.Unpack(); for (const auto& blob : batch.Blobs) { + LOG_D("Compaction: case 19"); if (wasThePreviousBlobBig && blob.PartData && (blob.PartData->PartNo != 0)) { + LOG_D("Compaction: case 20"); // надо продолжить писать большое сообщение CompactionBlobEncoder.NewHead.PartNo = blob.PartData->PartNo; CompactionBlobEncoder.NewPartitionedBlob(Partition, @@ -277,6 +286,8 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob, needToCompactHead, MaxBlobSize, blob.PartData->PartNo); + } else { + LOG_D("Compaction: case 21"); } wasThePreviousBlobBig = false; @@ -309,12 +320,17 @@ bool TPartition::CompactRequestedBlob(const TRequestedBlob& requestedBlob, blobCreationUnixTime = std::max(blobCreationUnixTime, blob.WriteTimestamp); if (!ExecRequestForCompaction(msg, parameters, compactionRequest, blobCreationUnixTime)) { + LOG_D("Compaction: case 22"); return false; + } else { + LOG_D("Compaction: case 23"); } firstBlobOffset = Nothing(); } + LOG_D("Compaction: case 25"); } + LOG_D("Compaction: case 26"); return true; } @@ -328,6 +344,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k, const auto& ctx = ActorContext(); if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) { + LOG_D("Compaction: case 12"); CompactionBlobEncoder.NewPartitionedBlob(Partition, CompactionBlobEncoder.NewHead.Offset, "", // SourceId @@ -337,15 +354,21 @@ void TPartition::RenameCompactedBlob(TDataKey& k, parameters.HeadCleared, // headCleared needToCompactHead, // needCompactHead MaxBlobSize); + } else { + LOG_D("Compaction: case 13"); } auto write = CompactionBlobEncoder.PartitionedBlob.Add(k.Key, size, k.Timestamp, false); if (write && !write->Value.empty()) { + LOG_D("Compaction: case 14"); // надо записать содержимое головы перед первым большим блобом AddCmdWrite(write, compactionRequest, k.Timestamp, ctx); CompactionBlobEncoder.CompactedKeys.emplace_back(write->Key, write->Value.size()); + } else { + LOG_D("Compaction: case 15"); } if (const auto& formedBlobs = CompactionBlobEncoder.PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { + LOG_D("Compaction: case 16"); ui32 curWrites = RenameTmpCmdWrites(compactionRequest); RenameFormedBlobs(formedBlobs, parameters, @@ -353,6 +376,8 @@ void TPartition::RenameCompactedBlob(TDataKey& k, compactionRequest, CompactionBlobEncoder, ctx); + } else { + LOG_D("Compaction: case 17"); } k.BlobKeyToken->NeedDelete = false; @@ -366,6 +391,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector& LOG_D("Continue blobs compaction"); + DumpZones(__FILE__, __LINE__); + AFL_ENSURE(CompactionInProgress); AFL_ENSURE(blobs.size() == CompactionBlobsCount); @@ -373,8 +400,11 @@ void TPartition::BlobsForCompactionWereRead(const TVector& if (!CompactionBlobEncoder.Head.GetCount() && !CompactionBlobEncoder.NewHead.GetCount() && CompactionBlobEncoder.IsEmpty()) { + LOG_D("Compaction: case 09"); // если это первое сообщение, то надо поправить StartOffset CompactionBlobEncoder.StartOffset = BlobEncoder.StartOffset; + } else { + LOG_D("Compaction: case 10"); } CompactionBlobEncoder.NewHead.Clear(); @@ -393,16 +423,23 @@ void TPartition::BlobsForCompactionWereRead(const TVector& TInstant blobCreationUnixTime = TInstant::Zero(); + DumpZones(__FILE__, __LINE__); + for (size_t i = 0; i < KeysForCompaction.size(); ++i) { + LOG_D("Compaction: case 11"); auto& [k, pos] = KeysForCompaction[i]; bool needToCompactHead = (parameters.CurOffset < k.Key.GetOffset()); if (pos == Max()) { + LOG_D("Compaction: case 01"); // большой блоб надо переименовать LOG_D("Rename key " << k.Key.ToString()); if (!WasTheLastBlobBig) { + LOG_D("Compaction: case 02"); needToCompactHead = true; + } else { + LOG_D("Compaction: case 03"); } LOG_D("Need to compact head " << needToCompactHead); @@ -422,31 +459,43 @@ void TPartition::BlobsForCompactionWereRead(const TVector& WasTheLastBlobBig = true; } else { + LOG_D("Compaction: case 04"); // маленький блоб надо дописать LOG_D("Append blob for key " << k.Key.ToString()); LOG_D("Need to compact head " << needToCompactHead); const TRequestedBlob& requestedBlob = blobs[pos]; if (!CompactRequestedBlob(requestedBlob, parameters, needToCompactHead, compactionRequest.Get(), blobCreationUnixTime, WasTheLastBlobBig)) { + LOG_D("Compaction: case 05"); LOG_D("Can't append blob for key " << k.Key.ToString()); Y_FAIL("Something went wrong"); return; + } else { + LOG_D("Compaction: case 06"); } WasTheLastBlobBig = false; } + + DumpZones(__FILE__, __LINE__); } + LOG_D("Compaction: case 24"); if (!CompactionBlobEncoder.IsLastBatchPacked()) { + LOG_D("Compaction: case 07"); CompactionBlobEncoder.PackLastBatch(); + } else { + LOG_D("Compaction: case 08"); } CompactionBlobEncoder.HeadCleared = parameters.HeadCleared; EndProcessWritesForCompaction(compactionRequest.Get(), blobCreationUnixTime, ctx); + DumpZones(__FILE__, __LINE__); + // for debugging purposes - //DumpKeyValueRequest(compactionRequest->Record); + DumpKeyValueRequest(compactionRequest->Record); ctx.Send(BlobCache, compactionRequest.Release(), 0, 0); } diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index b837dc82fc79..f54719f74543 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents() } } +void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)"); + + const auto& event = *ev->Get(); + const TPartitionId partitionId(event.PartitionId); + + if (!Partitions.contains(partitionId)) { + PQ_LOG_D("Unknown partition id " << event.PartitionId); + return; + } + + auto p = Partitions.find(partitionId); + ctx.Send(p->second.Actor, + new TEvPQ::TEvForceCompaction(event.PartitionId)); +} + bool TPersQueue::HandleHook(STFUNC_SIG) { TRACE_EVENT(NKikimrServices::PERSQUEUE); @@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle); HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle); + HFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: return false; } diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h index 210734d59acb..bafc37ac98fd 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.h +++ b/ydb/core/persqueue/pqtablet/pq_impl.h @@ -603,6 +603,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void ResendSplitMergeRequests(const TActorContext& ctx); + void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx); TIntrusivePtr SamplingControl; NWilson::TSpan WriteTxsSpan; diff --git a/ydb/core/persqueue/pqtablet/ya.make b/ydb/core/persqueue/pqtablet/ya.make index 15f5b9e29856..02ac19985e58 100644 --- a/ydb/core/persqueue/pqtablet/ya.make +++ b/ydb/core/persqueue/pqtablet/ya.make @@ -17,6 +17,7 @@ PEERDIR( ydb/core/persqueue/pqtablet/cache ydb/core/persqueue/pqtablet/partition ydb/core/persqueue/pqtablet/readproxy + ydb/library/dbgtrace ) END() diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 8f51bbe685bc..39b17e3e3af4 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -1250,4 +1250,19 @@ THolder GetReadBalancerPeriodicTopicStats(T return runtime.GrabEdgeEvent(TDuration::Seconds(2)); } +void CmdRunCompaction(TTestActorRuntime& runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition) +{ + auto event = MakeHolder(partition); + runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries()); +} + +void CmdRunCompaction(const ui32 partition, + TTestContext& tc) +{ + CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 21034932e1c3..7682c4d12625 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -644,6 +644,13 @@ struct TCmdWriteOptions { }; void CmdWrite(const TCmdWriteOptions&); +void CmdRunCompaction(TTestActorRuntime& runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition); +void CmdRunCompaction(const ui32 partition, + TTestContext& tc); + THolder GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId); } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 78b617361b9d..b4b3bf6d21d0 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -12,6 +12,8 @@ #include #include +#include +#include namespace NKikimr::NPQ { @@ -52,6 +54,82 @@ TMaybe PQGetStartOffset(TTestContext& tc) return Nothing(); } +Y_UNIT_TEST(TestCompaction) { + DBGTRACE("TestCompaction"); + TTestContext tc; + tc.EnableDetailedPQLog = true; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function setup, bool& activeZone) { + DBGTRACE("=== " << dispatchName << " ==="); + activeZone = false; + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(1000); + + auto cmdWrite = [&](const TString& sourceId, const TVector& sizes) { + TVector> data; + for (size_t k = 1; k <= sizes.size(); ++k) { + data.emplace_back(k, TString(sizes[k - 1], 'x')); + } + CmdWrite(0, sourceId, data, tc, false, {}, false, "", -1, -1, false, false, true); + }; + auto cmdCompaction = [&]() { + CmdRunCompaction(0, tc); + }; + + PQTabletPrepare({.partitions = 1, .writeSpeed = 50_MB}, {{"user1", true}}, tc); + + //for (size_t i = 0; i < 1'000; ++i) { + // auto size = RandomNumber(180) + 1; + // size *= 100_KB; + + // cmdWrite(TStringBuilder() << "sourceid" << i, {size}); + //} + + //cmdCompaction(); + + cmdWrite("sourceid1", {2_MB}); + cmdWrite("sourceid2", {5700_KB, 7_MB}); + cmdCompaction(); + + //cmdWrite("sourceid1", {1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB}); + //cmdWrite("sourceid2", {10_MB}); + //cmdCompaction(); + + //cmdWrite("sourceid1", {25_KB, 25_KB, 25_KB}); + //cmdCompaction(); + + //cmdWrite("sourceid2", {7_MB}); + //cmdWrite("sourceid3", {7_MB}); + //cmdWrite("sourceid4", {7_MB}); + //cmdCompaction(); + + //cmdWrite("sourceid3", {1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB, 1_MB}); + //cmdWrite("sourceid3", {1_MB}); + //cmdCompaction(); + + //cmdWrite("sourceid1", {500_KB, 500_KB, 500_KB, 500_KB, 500_KB, 500_KB, 500_KB, 500_KB}); + //cmdCompaction(); + + //cmdWrite("sourceid2", {1_MB}); + //cmdCompaction(); + + //cmdWrite("sourceid3", {300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 300_KB, 100_KB, 100_KB}); + //cmdCompaction(); + + //cmdWrite("sourceid1", {1_MB, 400_KB, 400_KB, 400_KB, 400_KB}); + //cmdCompaction(); + + cmdWrite("sourceidX", {1_KB}); + + //cmdWrite("sourceid2", {100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 100_KB, 7000_KB}); + //cmdWrite("sourceid3", {1MB, 100_KB, 100_KB, 100_KB}); + //cmdCompaction(); + }); +} + Y_UNIT_TEST(TestCmdReadWithLastOffset) { TTestContext tc; tc.EnableDetailedPQLog = true; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp index 457d440cbf83..48eb0656c9ab 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp @@ -25,6 +25,7 @@ #include #include +#include using namespace std::chrono_literals; @@ -888,7 +889,7 @@ void TFixture::TTopicWriteSessionContext::WaitForEvent() } } } else if ([[maybe_unused]] auto* e = std::get_if(&event)) { - UNIT_FAIL(""); + UNIT_FAIL(e->DebugString(true)); } } } @@ -3416,6 +3417,161 @@ Y_UNIT_TEST_F(Write_50k_100times_50tx, TFixtureTable) } } +Y_UNIT_TEST_F(Foo_1, TFixtureTable) +{ + const size_t COUNT = 10; + const size_t SMALL = 50'000; + const size_t SMALL_COUNT = 1; + const size_t BIG = 29'000'000; + const size_t BIG_COUNT = 1; + + CreateTopic("topic_A", TEST_CONSUMER); + CreateTopic("topic_B", TEST_CONSUMER); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (size_t j = 0; j < COUNT; ++j) { + for (size_t k = 0; k < SMALL_COUNT; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(SMALL, 'x'), tx.get(), 0); + } + for (size_t k = 0; k < BIG_COUNT; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(BIG, 'x'), tx.get(), 0); + } + } + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, std::string(50'000, 'x'), tx.get(), 0); + + session->CommitTx(*tx, EStatus::SUCCESS); + + RestartPQTablet("topic_A", 0); + + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, (SMALL_COUNT + BIG_COUNT) * COUNT); + + for (size_t j = 0; j < messages.size(); ) { + for (size_t k = 0; k < SMALL_COUNT; ++k, ++j) { + UNIT_ASSERT_VALUES_EQUAL(messages[j].size(), SMALL); + } + for (size_t k = 0; k < BIG_COUNT; ++k, ++j) { + UNIT_ASSERT_VALUES_EQUAL(messages[j].size(), BIG); + } + } +} + +Y_UNIT_TEST_F(Foo_2, TFixtureTable) +{ + DBGTRACE("Foo_2"); + + CreateTopic("topic_A", TEST_CONSUMER, 0); + CreateTopic("topic_B", TEST_CONSUMER, 2); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + DBGTRACE_LOG("transaction"); + auto session = CreateSession(); + auto tx = session->BeginTx(); + + // D0000252569_00000000000000000000_00000_0000000005_00001 + // D0000252569_00000000000000000005_00000_0000000014_00012 + // D0000252569_00000000000000000019_00000_0000000019_00008 + // D0000252569_00000000000000000019_00001_0000000005_00002 + // D0000252569_00000000000000000043_00000_0000000006_00003 + auto write = [&](size_t count, size_t size) { + for (size_t i = 0; i < count; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(size, 'x'), tx.get(), 0); + } + }; + + write(1, 1'000'000); + write(4, 400'000); + write(1, 6'000'000); + write(13, 400'000); + write(1, 900'000); + write(2, 1'000'000); + write(2, 400'000); + write(4, 1'000'000); + write(15, 400'000); + write(3, 1'000'000); + write(3, 400'000); + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, std::string(50'000, 'x'), tx.get(), 1); + + DBGTRACE_LOG("commit"); + session->CommitTx(*tx, EStatus::SUCCESS); + + DBGTRACE_LOG("restart"); + RestartPQTablet("topic_A", 0); + + DBGTRACE_LOG("read"); + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 49); + + DBGTRACE_LOG("checks"); +} + +Y_UNIT_TEST_F(Foo_3, TFixtureTable) +{ + DBGTRACE("Foo_3"); + + DBGTRACE_LOG("=== prepare data"); + std::vector sizes; + sizes.push_back(65'000); + sizes.push_back(900'000); + sizes.push_back(9'500'000); + //sizes.push_back(61'000'000); + + std::vector permutations; + + std::sort(sizes.begin(), sizes.end()); + do { + for (auto size : sizes) { + permutations.push_back(size); + } + } while (std::next_permutation(sizes.begin(), sizes.end())); + + std::vector messages; + + DBGTRACE_LOG("=== transaction"); + + CreateTopic("topic_A", TEST_CONSUMER); + CreateTopic("topic_B", TEST_CONSUMER); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + DBGTRACE_LOG("=== writing"); + for (auto size : permutations) { + const auto count = RandomNumber(5) + 1; + for (size_t i = 0; i < count; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(size, 'x'), tx.get(), 0); + if (RandomNumber(2)) { + DBGTRACE_LOG("=== local restarting"); + RestartPQTablet("topic_A", 0); + } + messages.push_back(size); + } + } + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, std::string(50'000, 'x'), tx.get(), 0); + + DBGTRACE_LOG("=== commiting"); + session->CommitTx(*tx, EStatus::SUCCESS); + + DBGTRACE_LOG("=== restarting"); + RestartPQTablet("topic_A", 0); + + DBGTRACE_LOG("=== reading"); + auto m = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, messages.size()); + + DBGTRACE_LOG("=== checking"); + for (size_t i = 0; i < messages.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(m[i].size(), messages[i]); + } +} + } } diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 5c83279f1c01..b2956ea9c7a6 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -205,12 +205,13 @@ NKikimr::Tests::TServerSettings TTopicSdkTestSetup::MakeServerSettings() settings.PQConfig.SetDatabase("/Root"); settings.SetLoggerInitializer([](NActors::TTestActorRuntime& runtime) { - runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG); + Y_UNUSED(runtime); + //runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); + //runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); + //runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG); + //runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG); + //runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); + //runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG); }); return settings;