Skip to content

Commit 48e2293

Browse files
authored
Order by pk with limit final fix (#33610)
1 parent e233be5 commit 48e2293

File tree

4 files changed

+295
-50
lines changed

4 files changed

+295
-50
lines changed

ydb/core/formats/arrow/reader/position.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ class TSortableScanData {
181181
return CompareImpl(position, item, itemPosition, item.PositionAddress.size());
182182
}
183183

184+
std::partial_ordering ComparePrefix(const ui64 position, const TSortableScanData& item, const ui64 itemPosition, const ui32 prefixSize) const {
185+
return CompareImpl(position, item, itemPosition, prefixSize);
186+
}
187+
184188
std::partial_ordering ComparePartial(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const {
185189
return CompareImpl(position, item, itemPosition, std::min<ui32>(PositionAddress.size(), item.PositionAddress.size()));
186190
}
@@ -485,6 +489,12 @@ class TSortableBatchPosition {
485489
return ApplyOptionalReverseForCompareResult(directResult);
486490
}
487491

492+
std::partial_ordering ComparePrefix(const TSortableBatchPosition& item, const ui32 prefixSize) const {
493+
Y_ABORT_UNLESS(item.ReverseSort == ReverseSort);
494+
const auto directResult = Sorting->ComparePrefix(Position, *item.Sorting, item.GetPosition(), prefixSize);
495+
return ApplyOptionalReverseForCompareResult(directResult);
496+
}
497+
488498
std::partial_ordering ComparePartial(const TSortableBatchPosition& item) const {
489499
Y_ABORT_UNLESS(item.ReverseSort == ReverseSort);
490500
const auto directResult = Sorting->ComparePartial(Position, *item.Sorting, item.GetPosition());

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,62 +18,71 @@ bool TSyncPointLimitControl::DrainToLimit() {
1818
nextInHeap = TSourceIterator(Collection->GetNextSource());
1919
}
2020

21-
while (Iterators.size() && (!nextInHeap || !(Iterators.front() < *nextInHeap))) {
22-
if (!Iterators.front().IsFilled()) {
23-
return false;
24-
}
25-
std::pop_heap(Iterators.begin(), Iterators.end());
26-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "LimitIteratorNext")("source_id", Iterators.back().GetSourceIdx())(
27-
"fetched", FetchedCount)("limit", Limit)("iterators", Iterators.size());
28-
if (!Iterators.back().Next()) {
29-
Iterators.pop_back();
21+
while (FilledIterators.size() &&
22+
(!nextInHeap || FilledIterators.front().ComparePrefix(*nextInHeap, *PKPrefixSize) == std::partial_ordering::less) &&
23+
(!UnfilledIterators.size() || FilledIterators.front().ComparePrefix(UnfilledIterators.front(), *PKPrefixSize) == std::partial_ordering::less)) {
24+
25+
std::pop_heap(FilledIterators.begin(), FilledIterators.end());
26+
27+
if (!FilledIterators.back().Next()) {
28+
FilledIterators.pop_back();
3029
} else {
31-
std::push_heap(Iterators.begin(), Iterators.end());
32-
if (++FetchedCount >= Limit) {
33-
return true;
34-
}
30+
std::push_heap(FilledIterators.begin(), FilledIterators.end());
31+
}
32+
if (++FetchedCount >= Limit) {
33+
return true;
3534
}
3635
}
3736
return false;
3837
}
3938

39+
std::shared_ptr<NCommon::IDataSource> TSyncPointLimitControl::OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) {
40+
AFL_VERIFY(FetchedCount < Limit)("fetched", FetchedCount)("limit", Limit);
41+
UnfilledIterators.emplace_back(TSourceIterator(source));
42+
43+
return TBase::OnAddSource(source);
44+
}
45+
4046
ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
4147
const std::shared_ptr<NCommon::IDataSource>& source, TPlainReadData& /*reader*/) {
4248
if (FetchedCount >= Limit) {
4349
return ESourceAction::Finish;
4450
}
45-
const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey();
46-
const auto& g = source->GetStageResult().GetBatch();
47-
AFL_VERIFY(Iterators.size());
48-
if (Iterators.front().GetSourceIdx() != source->GetSourceIdx()) {
49-
for (auto it : Iterators) {
50-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
51+
52+
AFL_VERIFY(UnfilledIterators.size());
53+
54+
if (UnfilledIterators.front().GetSourceIdx() != source->GetSourceIdx()) {
55+
for (auto it : UnfilledIterators) {
56+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("UnfilledIterators", it.DebugString());
5157
}
52-
for (auto it : DebugOrder) {
53-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it.DebugString());
58+
for (auto it : FilledIterators) {
59+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("FilledIterators", it.DebugString());
5460
}
5561
for (auto it : SourcesSequentially) {
5662
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("SourcesSequentially", it->GetSourceIdx());
5763
}
58-
if (FindIf(Iterators, [&](const auto& item) {
64+
if (FindIf(UnfilledIterators, [&](const auto& item) {
5965
return item.GetSourceIdx() == source->GetSourceIdx();
60-
}) != Iterators.end()) {
61-
AFL_VERIFY(Iterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "portion is in heap")("front", Iterators.front().DebugString())
62-
("back", Iterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
63-
} else if (FindIf(DebugOrder, [&](const auto& item) {
66+
}) != UnfilledIterators.end()) {
67+
AFL_VERIFY(UnfilledIterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "portion is in UnfilledIterators")("front", UnfilledIterators.front().DebugString())
68+
("back", UnfilledIterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
69+
} else if (FindIf(FilledIterators, [&](const auto& item) {
6470
return item.GetSourceIdx() == source->GetSourceIdx();
65-
}) != Iterators.end()) {
66-
AFL_VERIFY(Iterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "known portion, not in heap")("front", Iterators.front().DebugString())
67-
("back", Iterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
71+
}) != FilledIterators.end()) {
72+
AFL_VERIFY(UnfilledIterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "portion is in FilledIterators")("front", UnfilledIterators.front().DebugString())
73+
("back", UnfilledIterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
6874
} else {
69-
AFL_VERIFY(Iterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "unknown portion")("front", Iterators.front().DebugString())
70-
("back", Iterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
75+
AFL_VERIFY(UnfilledIterators.front().GetSourceIdx() == source->GetSourceIdx())("issue #28037", "unknown portion")("front", UnfilledIterators.front().DebugString())
76+
("back", UnfilledIterators.back().DebugString())("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_idx", source->GetSourceIdx());
7177
}
7278
}
73-
std::pop_heap(Iterators.begin(), Iterators.end());
74-
if (!g || !g->GetRecordsCount()) {
75-
Iterators.pop_back();
76-
} else {
79+
80+
UnfilledIterators.pop_front();
81+
82+
const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey();
83+
const auto& g = source->GetStageResult().GetBatch();
84+
85+
if (g && g->GetRecordsCount()) {
7786
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> arrs;
7887
for (auto&& i : rk.fields()) {
7988
auto acc = g->GetAccessorByNameOptional(i->name());
@@ -90,9 +99,9 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
9099
}
91100
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoOnSourceCheckLimitFillIterator")("source_idx", source->GetSourceIdx())(
92101
"fetched", FetchedCount)("limit", Limit);
93-
Iterators.back() = TSourceIterator(arrs, source->GetStageResult().GetNotAppliedFilter(), source);
94-
AFL_VERIFY(Iterators.back().IsFilled());
95-
std::push_heap(Iterators.begin(), Iterators.end());
102+
FilledIterators.emplace_back(arrs, source->GetStageResult().GetNotAppliedFilter(), source);
103+
AFL_VERIFY(FilledIterators.back().IsFilled());
104+
std::push_heap(FilledIterators.begin(), FilledIterators.end());
96105
}
97106
if (DrainToLimit()) {
98107
Collection->Clear();

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,31 @@ class TSyncPointLimitControl: public ISyncPoint {
108108
}
109109

110110
bool operator<(const TSourceIterator& item) const {
111-
const auto cmp = SortableRecord->ComparePartial(*item.SortableRecord);
111+
const auto cmp = SortableRecord->Compare(*item.SortableRecord);
112112
if (cmp == std::partial_ordering::equivalent) {
113113
return item.Source->GetSourceIdx() < Source->GetSourceIdx();
114114
}
115115
return cmp == std::partial_ordering::greater;
116116
}
117+
118+
std::partial_ordering ComparePrefix(const TSourceIterator& item, const ui32 prefixSize) const {
119+
return SortableRecord->ComparePrefix(*item.SortableRecord, prefixSize);
120+
}
121+
117122
};
118123

119-
std::vector<TSourceIterator> Iterators;
120-
std::vector<TSourceIterator> DebugOrder;
124+
std::vector<TSourceIterator> FilledIterators;
125+
std::deque<TSourceIterator> UnfilledIterators;
121126

122127
virtual bool IsFinished() const override {
123128
return FetchedCount >= Limit || TBase::IsFinished();
124129
}
125130

126-
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
127-
AFL_VERIFY(FetchedCount < Limit)("fetched", FetchedCount)("limit", Limit);
128-
Iterators.emplace_back(TSourceIterator(source));
129-
DebugOrder.emplace_back(TSourceIterator(source));
130-
std::push_heap(Iterators.begin(), Iterators.end());
131-
return TBase::OnAddSource(source);
132-
}
131+
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override;
133132

134133
virtual void DoAbort() override {
135-
Iterators.clear();
134+
FilledIterators.clear();
135+
UnfilledIterators.clear();
136136
}
137137

138138
virtual ESourceAction OnSourceReady(const std::shared_ptr<NCommon::IDataSource>& source, TPlainReadData& reader) override;

0 commit comments

Comments
 (0)