Skip to content

Commit 6ae5c00

Browse files
[-] incorrect verification
1 parent a777fcf commit 6ae5c00

File tree

4 files changed

+194
-13
lines changed

4 files changed

+194
-13
lines changed

ydb/core/persqueue/pqtablet/common/logging.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ inline TString LogPrefix() { return {}; }
2222
#define PQ_LOG_TX_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_TX, LogPrefix() << stream)
2323
#define PQ_LOG_TX_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::PQ_TX, LogPrefix() << stream)
2424

25+
#define PQ_INIT_LOG_D(stream) if (NActors::TlsActivationContext) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream); }
26+
2527
} // namespace NKikimr::NPQ
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <ydb/core/persqueue/common/partition_id.h>
4+
#include <ydb/core/protos/msgbus_kv.pb.h>
5+
6+
#include <util/generic/hash_set.h>
7+
#include <util/generic/string.h>
8+
9+
namespace NKikimr::NPQ {
10+
11+
THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range,
12+
const TPartitionId& partitionId);
13+
14+
}

ydb/core/persqueue/pqtablet/partition/partition_init.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -552,48 +552,57 @@ THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueResponse::TR
552552
std::sort(keys.begin(), keys.end(), compare);
553553

554554
for (size_t i = 0; i < keys.size(); ++i) {
555-
PQ_LOG_D("key[" << i << "]: " << keys[i]);
555+
PQ_INIT_LOG_D("key[" << i << "]: " << keys[i]);
556556
}
557557

558558
TVector<TString> filtered;
559559
TKey lastKey;
560560

561561
for (auto& k : keys) {
562562
if (filtered.empty()) {
563-
PQ_LOG_D("add key " << k);
563+
PQ_INIT_LOG_D("add key " << k);
564564
filtered.push_back(std::move(k));
565565
lastKey = TKey::FromString(filtered.back(), partitionId);
566566
} else {
567567
auto candidate = TKey::FromString(k, partitionId);
568568

569569
if (lastKey.GetOffset() == candidate.GetOffset()) {
570570
if (lastKey.GetPartNo() == candidate.GetPartNo()) {
571-
// candidate содержит lastKey
572-
AFL_ENSURE(lastKey.GetCount() <= candidate.GetCount())
573-
("lastKey", lastKey.ToString())("candidate", candidate.ToString().data());
574571
if (lastKey.GetCount() < candidate.GetCount()) {
575-
PQ_LOG_D("replace key " << filtered.back() << " to " << k);
572+
// candidate содержит lastKey
573+
PQ_INIT_LOG_D("replace key " << filtered.back() << " to " << k);
576574
filtered.back() = std::move(k);
577575
lastKey = candidate;
576+
} else if (lastKey.GetCount() == candidate.GetCount()) {
577+
if (lastKey.GetInternalPartsCount() < candidate.GetInternalPartsCount()) {
578+
// candidate содержит lastKey
579+
PQ_INIT_LOG_D("replace key " << filtered.back() << " to " << k);
580+
filtered.back() = std::move(k);
581+
lastKey = candidate;
582+
} else {
583+
// lastKey содержит candidate
584+
PQ_INIT_LOG_D("ignore key " << k);
585+
}
586+
} else {
587+
// lastKey содержит candidate
588+
PQ_INIT_LOG_D("ignore key " << k);
578589
}
579590
} else if (lastKey.GetPartNo() > candidate.GetPartNo()) {
580-
PQ_LOG_D("ignore key " << k);
591+
// lastKey содержит candidate
592+
PQ_INIT_LOG_D("ignore key " << k);
581593
} else {
582594
// candidate после lastKey
583-
//PQ_INIT_ENSURE(lastKey.GetPartNo() + lastKey.GetInternalPartsCount() == candidate.GetPartNo(),
584-
// "lastKey=%s, candidate=%s",
585-
// lastKey.ToString().data(), candidate.ToString().data());
586-
PQ_LOG_D("add key " << k);
595+
PQ_INIT_LOG_D("add key " << k);
587596
filtered.push_back(std::move(k));
588597
lastKey = candidate;
589598
}
590599
} else {
591600
if (const ui64 nextOffset = lastKey.GetOffset() + lastKey.GetCount(); nextOffset > candidate.GetOffset()) {
592601
// lastKey содержит candidate
593-
PQ_LOG_D("ignore key " << k);
602+
PQ_INIT_LOG_D("ignore key " << k);
594603
} else {
595604
// candidate после lastKey или пропуск между lastKey и candidate
596-
PQ_LOG_D("add key " << k);
605+
PQ_INIT_LOG_D("add key " << k);
597606
filtered.push_back(std::move(k));
598607
lastKey = candidate;
599608
}

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/keyvalue/keyvalue_events.h>
22
#include <ydb/core/persqueue/events/internal.h>
33
#include <ydb/core/persqueue/pqtablet/partition/partition.h>
4+
#include <ydb/core/persqueue/pqtablet/partition/blob_key_filter.h>
45
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
56
#include <ydb/core/protos/counters_keyvalue.pb.h>
67
#include <ydb/core/protos/pqconfig.pb.h>
@@ -3682,6 +3683,161 @@ Y_UNIT_TEST_F(TEvTxCalcPredicate_With_Conflicts, TPartitionTxTestHelper)
36823683
WaitTxPredicateReply(tx2);
36833684
}
36843685

3686+
Y_UNIT_TEST(BlobKeyFilfer)
3687+
{
3688+
auto filterKeys = [](const TVector<TString>& keys, const TPartitionId& partitionId) -> THashSet<TString> {
3689+
NKikimrClient::TKeyValueResponse::TReadRangeResult result;
3690+
for (const auto& k : keys) {
3691+
auto* pair = result.AddPair();
3692+
pair->SetStatus(NKikimrProto::OK);
3693+
pair->SetKey(k);
3694+
}
3695+
return FilterBlobsMetaData(result, partitionId);
3696+
};
3697+
3698+
TVector<TString> actualKeys{
3699+
"d0000000000_00000000000000000000_00000_0000000001_00000?",
3700+
"d0000000000_00000000000000000001_00000_0000000001_00000?"
3701+
};
3702+
THashSet<TString> expectedKeys{
3703+
"d0000000000_00000000000000000000_00000_0000000001_00000?",
3704+
"d0000000000_00000000000000000001_00000_0000000001_00000?"
3705+
};
3706+
auto filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3707+
3708+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3709+
3710+
actualKeys = {
3711+
"d0000000000_00000000000000000000_00000_0000000001_00000?",
3712+
"d0000000000_00000000000000000000_00000_0000000002_00000|",
3713+
"d0000000000_00000000000000000001_00000_0000000001_00000?"
3714+
};
3715+
expectedKeys = {
3716+
"d0000000000_00000000000000000000_00000_0000000002_00000|"
3717+
};
3718+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3719+
3720+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3721+
3722+
actualKeys = {
3723+
"d0000000000_00000000000000000000_00000_0000000001_00000?",
3724+
"d0000000000_00000000000000000000_00000_0000000001_00000|",
3725+
"d0000000000_00000000000000000001_00000_0000000001_00000?",
3726+
"d0000000000_00000000000000000001_00000_0000000002_00000|",
3727+
"d0000000000_00000000000000000002_00000_0000000001_00000?"
3728+
};
3729+
expectedKeys = {
3730+
"d0000000000_00000000000000000000_00000_0000000001_00000|",
3731+
"d0000000000_00000000000000000001_00000_0000000002_00000|"
3732+
};
3733+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3734+
3735+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3736+
3737+
actualKeys = {
3738+
"d0000000000_00000000000000000000_00000_0000000000_00002|",
3739+
"d0000000000_00000000000000000000_00002_0000000001_00002|"
3740+
};
3741+
expectedKeys = {
3742+
"d0000000000_00000000000000000000_00000_0000000000_00002|",
3743+
"d0000000000_00000000000000000000_00002_0000000001_00002|"
3744+
};
3745+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3746+
3747+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3748+
3749+
actualKeys = {
3750+
"d0000000000_00000000000000000000_00000_0000000001_00000?",
3751+
"d0000000000_00000000000000000000_00000_0000000003_00000|",
3752+
"d0000000000_00000000000000000001_00000_0000000002_00000?",
3753+
"d0000000000_00000000000000000003_00000_0000000001_00000?",
3754+
"d0000000000_00000000000000000003_00000_0000000001_00000|"
3755+
};
3756+
expectedKeys = {
3757+
"d0000000000_00000000000000000000_00000_0000000003_00000|",
3758+
"d0000000000_00000000000000000003_00000_0000000001_00000|"
3759+
};
3760+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3761+
3762+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3763+
3764+
actualKeys = {
3765+
"d0000000000_00000000000000000000_00000_0000000000_00002?",
3766+
"d0000000000_00000000000000000000_00000_0000000000_00002|",
3767+
"d0000000000_00000000000000000000_00002_0000000001_00002?",
3768+
"d0000000000_00000000000000000000_00002_0000000001_00002|"
3769+
};
3770+
expectedKeys = {
3771+
"d0000000000_00000000000000000000_00000_0000000000_00002|",
3772+
"d0000000000_00000000000000000000_00002_0000000001_00002|"
3773+
};
3774+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3775+
3776+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3777+
3778+
actualKeys = {
3779+
"d0000000000_00000000000000000000_00000_0000000002_00000|",
3780+
"d0000000000_00000000000000000000_00000_0000000003_00000|"
3781+
};
3782+
expectedKeys = {
3783+
"d0000000000_00000000000000000000_00000_0000000003_00000|"
3784+
};
3785+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3786+
3787+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3788+
3789+
actualKeys = {
3790+
"d0000000000_00000000000000000000_00000_0000000002_00000?",
3791+
"d0000000000_00000000000000000000_00000_0000000003_00000|",
3792+
"d0000000000_00000000000000000002_00000_0000000001_00000?",
3793+
};
3794+
expectedKeys = {
3795+
"d0000000000_00000000000000000000_00000_0000000003_00000|"
3796+
};
3797+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3798+
3799+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3800+
3801+
actualKeys = {
3802+
"d0000000000_00000000000000000000_00000_0000000002_00004?",
3803+
"d0000000000_00000000000000000000_00000_0000000002_00005?",
3804+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3805+
};
3806+
expectedKeys = {
3807+
"d0000000000_00000000000000000000_00000_0000000002_00005?",
3808+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3809+
};
3810+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3811+
3812+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3813+
3814+
actualKeys = {
3815+
"d0000000000_00000000000000000000_00000_0000000002_00004?",
3816+
"d0000000000_00000000000000000000_00000_0000000002_00005|",
3817+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3818+
};
3819+
expectedKeys = {
3820+
"d0000000000_00000000000000000000_00000_0000000002_00005|",
3821+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3822+
};
3823+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3824+
3825+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3826+
3827+
actualKeys = {
3828+
"d0000000000_00000000000000000000_00000_0000000002_00004?",
3829+
"d0000000000_00000000000000000000_00000_0000000002_00004|",
3830+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3831+
};
3832+
expectedKeys = {
3833+
"d0000000000_00000000000000000000_00000_0000000002_00004|",
3834+
"d0000000000_00000000000000000002_00000_0000000001_00002?"
3835+
};
3836+
filteredKeys = filterKeys(actualKeys, TPartitionId(0));
3837+
3838+
UNIT_ASSERT_EQUAL(filteredKeys, expectedKeys);
3839+
}
3840+
36853841
} // End of suite
36863842

36873843
} // namespace

0 commit comments

Comments
 (0)