Skip to content

Commit 305a438

Browse files
authored
stable-25-2: cherry-pick PR 25095, PR 25725, PR 26340 (#26484)
2 parents 6e95f2c + 09a77ff commit 305a438

File tree

7 files changed

+62
-41
lines changed

7 files changed

+62
-41
lines changed

ydb/core/formats/arrow/program/abstract.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,14 @@ class TFetchingCalculationPolicy: public IMemoryCalculationPolicy {
4848
return EStage::Fetching;
4949
}
5050
virtual ui64 GetReserveMemorySize(
51-
const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const override {
52-
if (limit) {
53-
return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount);
54-
} else {
55-
return std::max<ui64>(blobsSize, rawSize);
56-
}
51+
const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> /*limit*/, const ui32 /*recordsCount*/) const override {
52+
return std::max<ui64>(blobsSize, rawSize);
53+
// FIXME after further memory usage investigation
54+
// if (limit) {
55+
// return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount);
56+
// } else {
57+
// return std::max<ui64>(blobsSize, rawSize);
58+
// }
5759
}
5860
};
5961

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
488488
LIMIT 2
489489
)")
490490
.AddExpectedPlanOptions("KqpOlapFilter")
491-
.MutableLimitChecker().SetExpectedLimit(2)
492-
.SetExpectedResultCount(400); // FIXME (delete this line)
491+
.MutableLimitChecker().SetExpectedLimit(2);
493492
TestAggregations({ testCase });
494493
}
495494

@@ -504,8 +503,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
504503
LIMIT 2
505504
)")
506505
.AddExpectedPlanOptions("KqpOlapFilter")
507-
.MutableLimitChecker().SetExpectedLimit(2)
508-
.SetExpectedResultCount(400); // FIXME (delete this line)
506+
.MutableLimitChecker().SetExpectedLimit(2);
509507
TestAggregations({ testCase });
510508
}
511509

@@ -520,8 +518,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
520518
)")
521519
.AddExpectedPlanOptions("KqpOlapFilter")
522520
.AddExpectedPlanOptions("KqpOlapExtractMembers")
523-
.MutableLimitChecker().SetExpectedLimit(2)
524-
.SetExpectedResultCount(400); // FIXME (delete this line)
521+
.MutableLimitChecker().SetExpectedLimit(2);
525522
TestAggregations({ testCase });
526523
}
527524

@@ -537,8 +534,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
537534
)")
538535
.AddExpectedPlanOptions("KqpOlapFilter")
539536
.AddExpectedPlanOptions("KqpOlapExtractMembers")
540-
.MutableLimitChecker().SetExpectedLimit(2)
541-
.SetExpectedResultCount(400); // FIXME (delete this line)
537+
.MutableLimitChecker().SetExpectedLimit(2);
542538
TestAggregations({ testCase });
543539
}
544540

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class TReadMetadataBase {
2424

2525
private:
2626
YDB_ACCESSOR_DEF(TString, ScanIdentifier);
27+
YDB_ACCESSOR_DEF(bool, FakeSort);
2728
std::optional<ui64> FilteredCountLimit;
2829
std::optional<ui64> RequestedLimit;
2930
const ESorting Sorting = ESorting::ASC; // Sorting inside returned batches

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ TConclusionStatus TScanHead::Start() {
2121

2222
TScanHead::TScanHead(std::unique_ptr<NCommon::ISourcesConstructor>&& sourcesConstructor, const std::shared_ptr<TSpecialReadContext>& context)
2323
: Context(context) {
24+
auto readMetadataContext = context->GetReadMetadata();
2425
if (auto script = Context->GetSourcesAggregationScript()) {
2526
SourcesCollection =
26-
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), Context->GetReadMetadata()->GetLimitRobustOptional());
27+
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), readMetadataContext->GetLimitRobustOptional());
2728
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
2829
SyncPoints.emplace_back(std::make_shared<TSyncPointResultsAggregationControl>(
2930
SourcesCollection, Context->GetSourcesAggregationScript(), Context->GetRestoreResultScript(), SyncPoints.size(), context));
30-
} else if (Context->GetReadMetadata()->IsSorted()) {
31-
if (Context->GetReadMetadata()->HasLimit()) {
31+
} else if (readMetadataContext->IsSorted()) {
32+
if (readMetadataContext->HasLimit() && !readMetadataContext->GetFakeSort()) {
3233
auto collection = std::make_shared<TScanWithLimitCollection>(Context, std::move(sourcesConstructor));
3334
SourcesCollection = collection;
3435
SyncPoints.emplace_back(std::make_shared<TSyncPointLimitControl>(
@@ -39,7 +40,7 @@ TScanHead::TScanHead(std::unique_ptr<NCommon::ISourcesConstructor>&& sourcesCons
3940
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
4041
} else {
4142
SourcesCollection =
42-
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), Context->GetReadMetadata()->GetLimitRobustOptional());
43+
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), readMetadataContext->GetLimitRobustOptional());
4344
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
4445
}
4546
for (ui32 i = 0; i + 1 < SyncPoints.size(); ++i) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@ bool TSyncPointLimitControl::DrainToLimit() {
1717
if (Collection->GetNextSource()) {
1818
nextInHeap = TSourceIterator(Collection->GetNextSource());
1919
}
20-
if (Iterators.empty() || (nextInHeap && Iterators.front() < *nextInHeap)) {
21-
return false;
22-
}
2320

24-
while (Iterators.size()) {
21+
while (Iterators.size() && (!nextInHeap || !(Iterators.front() < *nextInHeap))) {
2522
if (!Iterators.front().IsFilled()) {
2623
return false;
2724
}

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
4646
return TReadMetadataBase::ESorting::NONE;
4747
}
4848
}();
49-
/* FIXME: #22992 */
50-
const auto useLimit = request.HasItemsLimit() && (sorting == ERequestSorting::NONE || !deduplicationEnabled);
51-
TScannerConstructorContext context(snapshot, useLimit ? request.GetItemsLimit() : 0, sorting);
49+
TScannerConstructorContext context(snapshot, request.HasItemsLimit() ? request.GetItemsLimit() : 0, sorting);
5250
const auto scanId = request.GetScanId();
5351
const ui64 txId = request.GetTxId();
5452
const ui32 scanGen = request.GetGeneration();
@@ -150,6 +148,9 @@ void TTxScan::Complete(const TActorContext& ctx) {
150148
}
151149
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
152150
if (newRange.IsSuccess()) {
151+
if (!request.HasReverse() && deduplicationEnabled) {
152+
(*newRange)->SetFakeSort(true);
153+
}
153154
readMetadataRange = TValidator::CheckNotNull(newRange.DetachResult());
154155
} else {
155156
return SendError("cannot build metadata", newRange.GetErrorMessage(), ctx);

ydb/tests/olap/order_by_with_limit.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import yatest.common
44
import ydb
5+
import random
56

67
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
78
from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -12,9 +13,11 @@
1213

1314
class TestOrderBy(object):
1415
test_name = "order_by"
16+
n = 200
1517

1618
@classmethod
1719
def setup_class(cls):
20+
random.seed(0xBEDA)
1821
ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
1922
logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
2023
config = KikimrConfigGenerator(
@@ -33,20 +36,21 @@ def setup_class(cls):
3336
def write_data(self, table: str):
3437
column_types = ydb.BulkUpsertColumns()
3538
column_types.add_column("id", ydb.PrimitiveType.Uint64)
39+
column_types.add_column("value", ydb.PrimitiveType.Uint64)
3640

37-
for i in range(100):
38-
data = [
39-
{
40-
"id": i,
41-
}
42-
]
43-
44-
data.extend({"id": i * 100000 + j} for j in range(100))
41+
# [2, 4, 6, 8, 10, ...]
42+
# [3, 6, 9, 12, 15, ...]
43+
# [4, 8, 12, 16, 20, ...]
44+
# [5, 10, 15, 20, 25, ...]
45+
# ...
46+
data = [[{"id": j, "value": j} for j in range(1, self.n) if j % i == 0] for i in range(2, self.n)]
47+
random.shuffle(data)
4548

49+
for row in data:
4650
self.ydb_client.bulk_upsert(
4751
table,
4852
column_types,
49-
data,
53+
row,
5054
)
5155

5256
def test(self):
@@ -57,6 +61,7 @@ def test(self):
5761
f"""
5862
CREATE TABLE `{table_path}` (
5963
id Uint64 NOT NULL,
64+
value Uint64 NOT NULL,
6065
PRIMARY KEY(id),
6166
)
6267
WITH (
@@ -68,12 +73,30 @@ def test(self):
6873

6974
self.write_data(table_path)
7075

71-
result_sets = self.ydb_client.query(
72-
f"""
73-
select id from `{table_path}` order by id limit 10
74-
"""
75-
)
76+
for i in range(100):
77+
limit = random.randint(1, self.n)
78+
offset = random.randint(1, self.n)
79+
is_desc = random.randint(0, 1)
80+
is_le = random.randint(0, 1)
81+
order = ["asc", "desc"]
82+
condition = [">", "<"]
83+
data = list(range(2, self.n))
84+
if is_le:
85+
answer = [i for i in data if i < offset]
86+
else:
87+
answer = [i for i in data if i > offset]
88+
if is_desc:
89+
answer = answer[::-1]
90+
answer = answer[:limit]
91+
92+
result_sets = self.ydb_client.query(
93+
f"""
94+
select id from `{table_path}`
95+
where value {condition[is_le]} {offset}
96+
order by id {order[is_desc]} limit {limit}
97+
"""
98+
)
7699

77-
keys = [row['id'] for result_set in result_sets for row in result_set.rows]
100+
keys = [row['id'] for result_set in result_sets for row in result_set.rows]
78101

79-
assert keys == [i for i in range(10)], keys
102+
assert keys == answer, keys

0 commit comments

Comments
 (0)