Skip to content

Commit df1731f

Browse files
authored
stable-25-3-1: Empty blobs for compaction (#26519)
2 parents c91ca45 + 1fe6f79 commit df1731f

File tree

2 files changed

+62
-17
lines changed

2 files changed

+62
-17
lines changed

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -178,41 +178,28 @@ void TPartition::TryRunCompaction()
178178
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
179179
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
180180

181-
if (BlobEncoder.DataKeysBody.size() >= blobsKeyCountLimit) {
182-
CompactionInProgress = true;
183-
Send(SelfId(), new TEvPQ::TEvRunCompaction(BlobEncoder.DataKeysBody.size()));
181+
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
182+
LOG_D("No data for blobs compaction");
184183
return;
185184
}
186185

187-
size_t blobsCount = 0, blobsSize = 0, totalSize = 0;
186+
size_t blobsCount = 0, blobsSize = 0;
188187
for (; blobsCount < BlobEncoder.DataKeysBody.size(); ++blobsCount) {
189188
const auto& k = BlobEncoder.DataKeysBody[blobsCount];
190189
if (k.Size < compactedBlobSizeLowerBound) {
191190
// incomplete blob — more data can still be appended to it
192191
blobsSize += k.Size;
193-
totalSize += k.Size;
194192
if (blobsSize > 2 * MaxBlobSize) {
195193
// the KV storage cannot return that much data at once
196194
blobsSize -= k.Size;
197-
totalSize -= k.Size;
198195
break;
199196
}
200197
LOG_D("Blob key for append " << k.Key.ToString());
201198
} else {
202-
totalSize += k.Size;
203199
LOG_D("Blob key for rename " << k.Key.ToString());
204200
}
205201
}
206-
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes (" << totalSize << ")");
207-
208-
if (totalSize < GetCumulativeSizeLimit()) {
209-
LOG_D("Need more data for compaction. " <<
210-
"Blobs " << BlobEncoder.DataKeysBody.size() <<
211-
", size " << totalSize << " (" << GetCumulativeSizeLimit() << ")");
212-
return;
213-
}
214-
215-
LOG_D("Run compaction for " << blobsCount << " blobs");
202+
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");
216203

217204
CompactionInProgress = true;
218205

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3358,6 +3358,64 @@ Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_2, TFixtureNoClient)
33583358
TestWriteAndReadMessages(4, 61'000'000, true);
33593359
}
33603360

3361+
Y_UNIT_TEST_F(Write_50k_100times_50tx, TFixtureTable)
3362+
{
3363+
// 50 transactions. Write 100 50KB messages in each of them. Commit all transactions at the same time.
3364+
// As a result, there will be a lot of small blobs in the FastWrite zone of the main batch,
3365+
// which will be picked up by a compact. The scenario is similar to the work of Ya.Metrika.
3366+
3367+
const std::size_t PARTITIONS_COUNT = 2;
3368+
const std::size_t TXS_COUNT = 50;
3369+
3370+
auto makeSourceId = [](unsigned txId, unsigned partitionId) {
3371+
std::string sourceId = TEST_MESSAGE_GROUP_ID;
3372+
sourceId += "_";
3373+
sourceId += ToString(txId);
3374+
sourceId += "_";
3375+
sourceId += ToString(partitionId);
3376+
return sourceId;
3377+
};
3378+
3379+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
3380+
3381+
SetPartitionWriteSpeed("topic_A", 50'000'000);
3382+
3383+
std::vector<std::unique_ptr<TFixture::ISession>> sessions;
3384+
std::vector<std::unique_ptr<TTransactionBase>> transactions;
3385+
3386+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
3387+
sessions.push_back(CreateSession());
3388+
auto& session = sessions.back();
3389+
3390+
transactions.push_back(session->BeginTx());
3391+
auto& tx = transactions.back();
3392+
3393+
auto sourceId = makeSourceId(i, 0);
3394+
for (size_t j = 0; j < 100; ++j) {
3395+
WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 0);
3396+
}
3397+
WaitForAcks("topic_A", sourceId);
3398+
3399+
sourceId = makeSourceId(i, 1);
3400+
WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 1);
3401+
WaitForAcks("topic_A", sourceId);
3402+
}
3403+
3404+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
3405+
std::vector<TAsyncStatus> futures;
3406+
3407+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
3408+
futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i]));
3409+
}
3410+
3411+
// All transactions must be completed successfully.
3412+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
3413+
futures[i].Wait();
3414+
const auto& result = futures[i].GetValueSync();
3415+
UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3416+
}
3417+
}
3418+
33613419
}
33623420

33633421
}

0 commit comments

Comments
 (0)