Skip to content

Commit dddcffc

Browse files
committed
improve hash dispatch blocks
1) extend remote inserters to accelerate data transform 2) take advantage of low cardinality to improve groupby
1 parent ff781b0 commit dddcffc

10 files changed

+174
-94
lines changed

src/Interpreters/ASTRewriters/CollectRequiredColumnsAction.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,15 @@ void CollectRequiredColumnsAction::visit(const ASTIdentifier * ident_ast)
147147
.alias_name = ident_ast->tryGetAlias(),
148148
.type = col.type
149149
};
150+
/*
151+
LOG_TRACE(
152+
&Poco::Logger::get("CollectRequiredColumnsAction"),
153+
"add ident @ {}, full name:{}, short name:{}, alias:{}.",
154+
*best_pos,
155+
ident_ast->name(),
156+
ident_ast->shortName(),
157+
ident_ast->tryGetAlias());
158+
*/
150159
final_result.required_columns[*best_pos].push_back(column_metadta);
151160
found = true;
152161
added_names.insert(ident_ast->name());

src/Interpreters/ASTRewriters/IdentifierQualiferRemoveAction.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ ASTs IdentifiterQualiferRemoveAction::collectChildren(const ASTPtr & ast)
3232

3333
void IdentifiterQualiferRemoveAction::visit(const ASTPtr & ast)
3434
{
35+
if (!ast)
36+
return;
3537
if (const auto * function_ast = ast->as<ASTFunction>())
3638
{
3739
auto frame = frames.getTopFrame();

src/Interpreters/ASTRewriters/NestedJoinQueryRewriteAction.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ void NestedJoinQueryRewriteAction::visit(const ASTSelectQuery * select_ast)
160160
{
161161
auto ident = std::make_shared<ASTIdentifier>(col.splitedFullName());
162162
ident->alias = col.alias_name;
163-
if (ident->alias.empty())
164-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Alias name is expected for {}", col.full_name);
163+
//if (ident->alias.empty())
164+
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Alias is expected for {}", col.full_name);
165165
nested_select_expr_list->children.emplace_back(ident);
166166
}
167167
nested_select_ast->setExpression(ASTSelectQuery::Expression::SELECT, nested_select_expr_list);

src/Interpreters/ASTRewriters/NestedJoinQueryRewriteAction.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <Parsers/IAST_fwd.h>
1111
#include <base/types.h>
1212
#include <Poco/Logger.h>
13+
#include <Interpreters/DatabaseAndTableWithAlias.h>
1314
namespace DB
1415
{
1516
/**

src/Interpreters/ASTRewriters/StageQueryDistributedAggregationRewriteAction.cpp

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,12 @@ void StageQueryDistributedAggregationRewriteAction::visitSelectQueryWithAggregat
300300

301301
void StageQueryDistributedAggregationRewriteAction::visitSelectQueryWithGroupby(const ASTSelectQuery * select_ast)
302302
{
303+
LOG_TRACE(logger, "visitSelectQueryWithGroupby:{}", queryToString(*select_ast));
303304
auto frame = frames.getTopFrame();
305+
auto tables = getDatabaseAndTablesWithColumns(getTableExpressions(*select_ast), context, true, true);
306+
if (tables.size() != 1)
307+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tables size should be 1");
308+
304309
auto * rewrite_table_expr = frame->children_results[0]->as<ASTTableExpression>();
305310
if (!rewrite_table_expr)
306311
throw Exception(ErrorCodes::LOGICAL_ERROR, "ASTTableExpression is expected. return query is : {}", queryToString(frame->children_results[0]));
@@ -313,21 +318,36 @@ void StageQueryDistributedAggregationRewriteAction::visitSelectQueryWithGroupby(
313318

314319
rewrite_table_expr->subquery->as<ASTSubquery>()->setAlias(table_alias);
315320
}
316-
317-
auto tables = getDatabaseAndTablesWithColumns(getTableExpressions(*select_ast), context, true, true);
318-
if (tables.size() != 1)
319-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tables size should be 1");
321+
322+
if (isAllRequiredColumnsLowCardinality(select_ast->groupBy(), tables))
323+
{
324+
CollectQueryStoragesAction collect_storage_action(context);
325+
ASTDepthFirstVisitor<CollectQueryStoragesAction> collect_storage_visitor(collect_storage_action, frame->children_results[0]);
326+
auto storages = collect_storage_visitor.visit();
327+
/// If all columns used in the groupby clause are low cardinality, do not shuffle the data and
328+
/// run the groupby in the two-phase way.
329+
if (storages.size() > 1)
330+
visitSelectQueryWithAggregation(select_ast);
331+
return;
332+
}
320333

321334
CollectRequiredColumnsAction collect_columns_action(tables);
322335
ASTDepthFirstVisitor<CollectRequiredColumnsAction> collect_columns_visitor(collect_columns_action, select_ast->clone());
323336
auto required_columns = collect_columns_visitor.visit().required_columns;
324337

325-
auto insert_query = createShuffleInsert(
338+
ASTPtr insert_query = createShuffleInsert(
326339
TableFunctionShuffleAggregation::name,
327340
rewrite_table_expr,
328341
ColumnWithDetailNameAndType::toNamesAndTypesList(required_columns[0]),
329342
select_ast->groupBy());
330343

344+
auto * insert_query_ptr = insert_query->as<ASTInsertQuery>();
345+
auto * insert_select_ptr = insert_query_ptr->select->as<ASTSelectWithUnionQuery>()->list_of_selects->children[0]->as<ASTSelectQuery>();
346+
IdentifiterQualiferRemoveAction remove_qualifier_action;
347+
ASTDepthFirstVisitor<IdentifiterQualiferRemoveAction> remove_qualifier_visitor(remove_qualifier_action, select_ast->where());
348+
auto where_expr = remove_qualifier_visitor.visit();
349+
insert_select_ptr->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expr));
350+
331351
ASTs upstream_queries;
332352
frame->mergeChildrenUpstreamQueries();
333353
if (!frame->upstream_queries[0].empty())
@@ -341,6 +361,7 @@ void StageQueryDistributedAggregationRewriteAction::visitSelectQueryWithGroupby(
341361

342362
frame->result_ast = select_ast->clone();
343363
auto * result_select_ast = frame->result_ast->as<ASTSelectQuery>();
364+
result_select_ast->setExpression(ASTSelectQuery::Expression::WHERE, nullptr);
344365
ASTBuildUtil::updateSelectQueryTables(
345366
result_select_ast,
346367
ASTBuildUtil::createTablesInSelectQueryElement(table_function->as<ASTFunction>())->as<ASTTablesInSelectQueryElement>());
@@ -371,4 +392,23 @@ ASTPtr StageQueryDistributedAggregationRewriteAction::createShuffleInsert(
371392

372393
return ASTBuildUtil::createTableFunctionInsertSelectQuery(table_function, ASTBuildUtil::wrapSelectQuery(select_query));
373394
}
395+
396+
bool StageQueryDistributedAggregationRewriteAction::isAllRequiredColumnsLowCardinality(const ASTPtr & ast, const TablesWithColumns & tables)
397+
{
398+
CollectRequiredColumnsAction collect_columns_action(tables);
399+
ASTDepthFirstVisitor<CollectRequiredColumnsAction> collect_columns_visitor(collect_columns_action, ast);
400+
auto required_columns = collect_columns_visitor.visit().required_columns;
401+
for (auto & cols : required_columns)
402+
{
403+
for (auto & col : cols)
404+
{
405+
LOG_TRACE(logger, "check group by col. {} {}", col.full_name, col.type->getName());
406+
if (!col.type->lowCardinality())
407+
return false;
408+
}
409+
}
410+
LOG_TRACE(logger, "all columns are locaCardinality");
411+
return true;
412+
413+
}
374414
}

src/Interpreters/ASTRewriters/StageQueryDistributedAggregationRewriteAction.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <Parsers/ASTSubquery.h>
88
#include <Parsers/IAST.h>
99
#include <Parsers/IAST_fwd.h>
10+
#include <Interpreters/DatabaseAndTableWithAlias.h>
1011

1112
namespace DB
1213
{
@@ -67,5 +68,7 @@ class StageQueryDistributedAggregationRewriteAction : public EmptyASTDepthFirstV
6768

6869
ASTPtr createShuffleInsert(
6970
const String & table_function_name, ASTTableExpression * table_expr, const NamesAndTypesList & table_desc, ASTPtr groupby_clause);
71+
72+
bool isAllRequiredColumnsLowCardinality(const ASTPtr & ast, const TablesWithColumns & tables);
7073
};
7174
}

src/Processors/Transforms/StageQueryTransform.cpp

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,8 @@ ParallelStageBlockIOsTransform::ParallelStageBlockIOsTransform(
226226

227227
ParallelStageBlockIOsTransform::~ParallelStageBlockIOsTransform()
228228
{
229-
#if 0
230-
for (auto & task : background_tasks)
231-
{
232-
task->deactivate();
233-
}
234-
#else
235229
if (thread_pool)
236230
thread_pool->wait();
237-
#endif
238231
LOG_TRACE(logger, "run query({}) in elapsedMilliseconds:{}", queryToString(output_block_io.query), elapsed);
239232
}
240233

@@ -305,20 +298,11 @@ void ParallelStageBlockIOsTransform::startBackgroundTasks()
305298
queryToString(block_io.query),
306299
task_watch.elapsedMilliseconds());
307300
};
308-
#if 0
309-
auto & thread_pool = context->getSchedulePool();
310-
for (auto & block_io : input_block_ios)
311-
{
312-
background_tasks.emplace_back(thread_pool.createTask("BackgroundBlockIOTask", [build_task, &block_io](){ build_task(block_io);}));
313-
background_tasks.back()->activateAndSchedule();
314-
}
315-
#else
316301
thread_pool = std::make_unique<ThreadPool>(input_block_ios.size());
317302
for (auto & block : input_block_ios)
318303
{
319304
thread_pool->scheduleOrThrowOnError([&]() { build_task(block); });
320305
}
321-
#endif
322306
has_start_background_tasks = true;
323307
}
324308
}

src/Storages/DistributedShuffle/ShuffleBlockTable.cpp

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,18 @@ void ShuffleBlockTable::addChunk(Chunk && chunk)
3030
{
3131
if (chunk.hasRows()) [[likely]]
3232
{
33-
if (is_sink_finished)[[unlikely]]
34-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Try in insert into a sink finished table({}.{})", session_id, table_id);
35-
std::unique_lock lock(mutex);
36-
rows += chunk.getNumRows();
37-
chunks.emplace_back(std::move(chunk));
33+
{
34+
std::unique_lock lock(mutex);
35+
if (is_sink_finished) [[unlikely]]
36+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Try in insert into a sink finished table({}.{})", session_id, table_id);
37+
while (remained_rows > max_rows_limit)
38+
wait_consume_data.wait(lock);
39+
rows += chunk.getNumRows();
40+
remained_rows += chunk.getNumRows();
41+
if (remained_rows > max_rows)
42+
max_rows = remained_rows;
43+
chunks.emplace_back(std::move(chunk));
44+
}
3845
wait_more_data.notify_one();
3946
}
4047
else
@@ -46,30 +53,34 @@ void ShuffleBlockTable::addChunk(Chunk && chunk)
4653

4754
Chunk ShuffleBlockTable::popChunk()
4855
{
49-
std::unique_lock lock(mutex);
50-
while (chunks.empty())
56+
Chunk res;
5157
{
52-
if (!is_sink_finished)
53-
{
54-
wait_more_data.wait(lock, [&] { return is_sink_finished || !chunks.empty(); });
55-
}
56-
else
58+
std::unique_lock lock(mutex);
59+
while (chunks.empty())
5760
{
58-
break;
61+
if (!is_sink_finished)
62+
{
63+
wait_more_data.wait(lock, [&] { return is_sink_finished || !chunks.empty(); });
64+
}
65+
else
66+
{
67+
break;
68+
}
5969
}
60-
}
61-
//LOG_TRACE(logger, "{}.{} popChunk. isSinkFinished()={}, chunks.size()={}", session_id, table_id, is_sink_finished, chunks.size());
70+
//LOG_TRACE(logger, "{}.{} popChunk. isSinkFinished()={}, chunks.size()={}", session_id, table_id, is_sink_finished, chunks.size());
6271

63-
Chunk res;
64-
if (!chunks.empty()) [[likely]]
65-
{
66-
res.swap(chunks.front());
67-
chunks.pop_front();
68-
if (unlikely(!res.hasRows()))
72+
if (!chunks.empty()) [[likely]]
6973
{
70-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should not be empty. table({}.{})", session_id, table_id);
74+
res.swap(chunks.front());
75+
remained_rows -= res.getNumRows();
76+
chunks.pop_front();
77+
if (unlikely(!res.hasRows()))
78+
{
79+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should not be empty. table({}.{})", session_id, table_id);
80+
}
7181
}
7282
}
83+
wait_consume_data.notify_all();
7384
return res;
7485
}
7586

src/Storages/DistributedShuffle/ShuffleBlockTable.h

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,6 @@
1818
namespace DB
1919
{
2020
///
21-
/// How to clear all the data when a query session has finished ?
22-
/// The following measures were taken at current
23-
/// 1)Chunks in ShuffleBlockTable are read only once, so we use popChunkWithoutMutex() for loading a chunk.
24-
/// That ensures that all chunks are released after the loading finish.
25-
/// 2) When ShuffleBlockTable becomes empty, it will call ShuffleBlockSession::releaseTable() to
26-
/// release it-self.
27-
/// 3) When ShuffleBlockSession becomes empty, it will call ShuffleBlockTableManager::tryCloseSession() to
28-
/// release it-self.
29-
/// All above will ensure all datas are released in normal processing. But more need be considered, exceptions could
30-
/// happen during the processing which make the release actions not be called. Some measures may be token.
31-
/// 1) In TCPHandler, catch all exceptions , and make a session releasing action on all nodes
32-
/// 2) All sessions have a max TTL, make background routine to check timeout sessions and clear them.
33-
///
3421

3522
class ShuffleBlockTable
3623
{
@@ -47,7 +34,7 @@ class ShuffleBlockTable
4734

4835
~ShuffleBlockTable()
4936
{
50-
LOG_TRACE(logger, "close table {}.{}", session_id, table_id);
37+
LOG_TRACE(logger, "close table {}.{}. rows:{}. max_rows:{}", session_id, table_id, rows, max_rows);
5138
}
5239

5340
inline const Block & getHeader() const
@@ -79,8 +66,12 @@ class ShuffleBlockTable
7966
std::atomic<bool> is_sink_finished = false;
8067
std::list<Chunk> chunks;
8168
std::condition_variable wait_more_data;
69+
std::condition_variable wait_consume_data;
8270
Poco::Logger * logger = &Poco::Logger::get("ShuffleBlockTable");
8371
size_t rows = 0;
72+
size_t remained_rows = 0;
73+
size_t max_rows = 0;
74+
const static size_t max_rows_limit = 20000000;
8475
};
8576
using ShuffleBlockTablePtr = std::shared_ptr<ShuffleBlockTable>;
8677

0 commit comments

Comments
 (0)