Skip to content

Commit 88682bc

Browse files
authored
Merge pull request ClickHouse#78505 from korowa/fix-zero-streams-reading
Fix zero streams after max_streams_to_max_threads_ratio applied
2 parents d680e0c + be80a0e commit 88682bc

7 files changed

+64
-5
lines changed

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2652,8 +2652,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
26522652
else if (storage)
26532653
{
26542654
/// Table.
2655-
if (max_streams == 0)
2656-
max_streams = 1;
26572655

26582656
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
26592657
if (max_streams > 1 && !is_sync_remote)
@@ -2668,6 +2666,9 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
26682666
streams_with_ratio);
26692667
}
26702668

2669+
if (max_streams == 0)
2670+
max_streams = 1;
2671+
26712672
auto & prewhere_info = analysis_result.prewhere_info;
26722673

26732674
if (prewhere_info)

src/Planner/PlannerJoinTree.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -771,9 +771,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
771771
"Setting 'max_block_size' cannot be zero");
772772
}
773773

774-
if (max_streams == 0)
775-
max_streams = 1;
776-
777774
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads
778775
if (max_streams > 1 && !is_sync_remote)
779776
{
@@ -787,6 +784,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
787784
streams_with_ratio);
788785
}
789786

787+
if (max_streams == 0)
788+
max_streams = 1;
789+
790790
if (table_node)
791791
table_expression_query_info.table_expression_modifiers = table_node->getTableExpressionModifiers();
792792
else

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,20 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
11261126
return new_ranges;
11271127
};
11281128

1129+
if (num_streams > 1)
1130+
{
1131+
/// Reduce num_streams if requested value is unnecessarily large.
1132+
///
1133+
/// Additional increase of streams number in case of skewed parts, like it's
1134+
/// done in `spreadMarkRangesAmongStreams` won't affect overall performance
1135+
/// due to the single downstream `MergingSortedTransform`.
1136+
if (info.sum_marks < num_streams * info.min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams)
1137+
{
1138+
num_streams = std::max(
1139+
(info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read, parts_with_ranges.size());
1140+
}
1141+
}
1142+
11291143
const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1;
11301144
bool need_preliminary_merge = (parts_with_ranges.size() > settings[Setting::read_in_order_two_level_merge_threshold]);
11311145

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
49.5
2+
49.5
3+
49.5
4+
5+
0
6+
0
7+
0
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
DROP TABLE IF EXISTS 03402_data;
2+
3+
CREATE TABLE 03402_data (id UInt32) ENGINE = MergeTree ORDER BY id;
4+
INSERT INTO 03402_data SELECT * FROM numbers(100);
5+
6+
SELECT avg(id) FROM 03402_data SETTINGS max_threads = 4, max_streams_to_max_threads_ratio = 0;
7+
SELECT avg(id) FROM 03402_data SETTINGS max_threads = 0, max_streams_to_max_threads_ratio = 0;
8+
SELECT avg(id) FROM 03402_data SETTINGS max_threads = 2, max_streams_to_max_threads_ratio = 0.2;
9+
10+
SELECT '';
11+
12+
SELECT id FROM 03402_data ORDER BY id LIMIT 1 SETTINGS max_threads = 4, max_streams_to_max_threads_ratio = 0;
13+
SELECT id FROM 03402_data ORDER BY id LIMIT 1 SETTINGS max_threads = 0, max_streams_to_max_threads_ratio = 0;
14+
SELECT id FROM 03402_data ORDER BY id LIMIT 1 SETTINGS max_threads = 2, max_streams_to_max_threads_ratio = 0.2;
15+
16+
DROP TABLE 03402_data;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
1
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
DROP TABLE IF EXISTS 03403_data;
2+
CREATE TABLE 03403_data(id UInt32, val String) ENGINE = MergeTree ORDER BY id AS SELECT 1, 'test';
3+
4+
SELECT *
5+
FROM 03403_data
6+
ORDER BY id
7+
FORMAT Null
8+
SETTINGS max_threads = 1024,
9+
max_streams_to_max_threads_ratio = 10000000;
10+
11+
SYSTEM FLUSH LOGS query_log;
12+
13+
SELECT memory_usage < 10_000_000
14+
FROM system.query_log
15+
WHERE Settings['max_streams_to_max_threads_ratio'] = '10000000'
16+
AND query like '%FROM 03403_data%'
17+
AND type = 'QueryFinish'
18+
AND current_database = currentDatabase();
19+
20+
DROP TABLE 03403_data;

0 commit comments

Comments
 (0)