Skip to content

Commit ff781b0

Browse files
committed
fixed: session close
1 parent 082f80d commit ff781b0

File tree

9 files changed

+164
-73
lines changed

9 files changed

+164
-73
lines changed

src/Interpreters/ASTRewriters/ASTBuildUtil.h

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@ class ASTBuildUtil
1616
static String toTableStructureDescription(const ColumnWithDetailNameAndTypes & columns);
1717

1818
/**
19-
* @brief Create a Shuffle Table Function object
19+
* Create a Shuffle Table Function object
2020
*
21-
* @param function_name which shuffle function to use. see TableFunctionShuffle.h
22-
* @param session_id session_od
23-
* @param cluster_name cluster name
24-
* @param table_id table_id
25-
* @param columns describe the table structure. etc. 'x int, y string'
26-
* @param hash_expression_list hash expression list for shuffle hashing. etc. 'x, y'
27-
* @param alias table alias
28-
* @return ASTPtr ASTFunction
21+
* - function_name which shuffle function to use. see TableFunctionShuffle.h
22+
* - session_id session_od
23+
* - cluster_name cluster name
24+
* - table_id table_id
25+
* - columns describe the table structure. etc. 'x int, y string'
26+
* - hash_expression_list hash expression list for shuffle hashing. etc. 'x, y'
27+
* - alias table alias
2928
*/
3029
static ASTPtr createShuffleTableFunction(
3130
const String & function_name,
@@ -37,32 +36,32 @@ class ASTBuildUtil
3736
const String & alias = "");
3837

3938
/**
40-
* @brief Create a Table Function Insert Select Query object
39+
* Create a Table Function Insert Select Query object
4140
*
42-
* @param table_function must be a ASTFunction
43-
* @param select_query must be a ASTSelectWithUnionQuery
44-
* @return ASTPtr it's a ASTInsertQuery
41+
* - table_function must be a ASTFunction
42+
* - select_query must be a ASTSelectWithUnionQuery
43+
* return ASTPtr it's a ASTInsertQuery
4544
*/
4645
static ASTPtr createTableFunctionInsertSelectQuery(ASTPtr table_function, ASTPtr select_query);
4746

4847
/**
49-
* @brief Create a ASTSelectWithUnionQuery with a ASTSelectQuery
48+
* Create a ASTSelectWithUnionQuery with a ASTSelectQuery
5049
*
51-
* @param select_query must be a ASTSelectQuery
52-
* @return ASTPtr it's a ASTSelectWithUnionQuery
50+
* - select_query must be a ASTSelectQuery
51+
* return ASTPtr it's a ASTSelectWithUnionQuery
5352
*/
5453
static ASTPtr wrapSelectQuery(const ASTSelectQuery * select_query);
5554

5655
/**
57-
* @brief Create a Select Expression object
56+
* Create a Select Expression object
5857
*
59-
* @param names_and_types Use the names to build the select expression
60-
* @return ASTPtr
58+
* - names_and_types Use the names to build the select expression
59+
* return ASTPtr
6160
*/
6261
static ASTPtr createSelectExpression(const NamesAndTypesList & names_and_types);
6362

6463
/**
65-
* @brief Update ASTSelectQuery::TABLES ASTTableExpressions
64+
* Update ASTSelectQuery::TABLES ASTTableExpressions
6665
*/
6766
static void updateSelectQueryTables(ASTSelectQuery * select_query, const ASTTableExpression * table_expr_);
6867

src/Interpreters/ASTRewriters/IdentRenameRewriteAction.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,7 @@ ASTs IdentifierRenameAction::collectChildren(const ASTPtr & ast)
5858
{
5959
return {};
6060
}
61-
else
62-
{
63-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknow ast type {}. {}", ast->getID(), queryToString(ast));
64-
}
61+
return {};
6562
}
6663
void IdentifierRenameAction::beforeVisitChildren(const ASTPtr & ast)
6764
{

src/Interpreters/ASTRewriters/IdentifierQualiferRemoveAction.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <Parsers/ASTLiteral.h>
77
#include <Parsers/ASTIdentifier.h>
88
#include <Parsers/queryToString.h>
9+
#include "Parsers/ASTExpressionList.h"
910

1011
namespace DB
1112
{
@@ -22,6 +23,10 @@ ASTs IdentifiterQualiferRemoveAction::collectChildren(const ASTPtr & ast)
2223
{
2324
children = function_ast->arguments->children;
2425
}
26+
else if (const auto * expr_list_ast = ast->as<ASTExpressionList>())
27+
{
28+
children = expr_list_ast->children;
29+
}
2530
return children;
2631
}
2732

@@ -49,6 +54,13 @@ void IdentifiterQualiferRemoveAction::visit(const ASTPtr & ast)
4954
auto * result_ast = frame->result_ast->as<ASTIdentifier>();
5055
result_ast->alias = ident_ast->tryGetAlias();
5156
}
57+
else if (const auto * expr_list_ast = ast->as<ASTExpressionList>())
58+
{
59+
auto frame = frames.getTopFrame();
60+
frame->result_ast = std::make_shared<ASTExpressionList>();
61+
auto * result_ast = frame->result_ast->as<ASTExpressionList>();
62+
result_ast->children = frame->children_results;
63+
}
5264
else
5365
{
5466
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid ast({}): {}", ast->getID(), queryToString(ast));

src/Interpreters/ASTRewriters/StageQueryDistributedAggregationRewriteAction.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ StageQueryDistributedAggregationRewriteAction::StageQueryDistributedAggregationR
3838

3939
void StageQueryDistributedAggregationRewriteAction::beforeVisitChildren(const ASTPtr & ast)
4040
{
41+
LOG_TRACE(logger, "{} push frame. {}:{}", __LINE__, ast->getID(), queryToString(ast));
4142
frames.pushFrame(ast);
4243
}
4344

@@ -59,6 +60,7 @@ ASTs StageQueryDistributedAggregationRewriteAction::collectChildren(const ASTPtr
5960
{
6061
if (!ast)
6162
return {};
63+
LOG_TRACE(logger, "{} collectChildren. {}:{}", __LINE__, ast->getID(), queryToString(ast));
6264
ASTs children;
6365
if (const auto * union_select_ast = ast->as<ASTSelectWithUnionQuery>())
6466
{
@@ -153,6 +155,7 @@ void StageQueryDistributedAggregationRewriteAction::visit(const ASTSubquery * su
153155

154156
void StageQueryDistributedAggregationRewriteAction::visit(const ASTSelectQuery * select_ast)
155157
{
158+
LOG_TRACE(logger, "{} frame size={}, select ast={}", __LINE__, frames.size(), queryToString(*select_ast));
156159
auto frame = frames.getTopFrame();
157160
if (frame->children_results.empty()) // join query
158161
{
@@ -165,12 +168,13 @@ void StageQueryDistributedAggregationRewriteAction::visit(const ASTSelectQuery *
165168
throw Exception(ErrorCodes::LOGICAL_ERROR, "ASTStageQuery is expected. return query is : {}", queryToString(rewrite_ast));
166169
auto * return_select_ast = stage_query->current_query->as<ASTSelectQuery>();
167170
if (!return_select_ast)
168-
throw Exception(ErrorCodes::LOGICAL_ERROR, "ASTSelectQuery is expected. return query is : {}", queryToString(stage_query->current_query));
171+
throw Exception(ErrorCodes::LOGICAL_ERROR, "ASTSelectQuery is expected. return query is :(id={}) {}", stage_query->current_query->getID(), queryToString(stage_query->current_query));
169172

170173

171174
ASTs upstream_queries;
172175
upstream_queries.insert(upstream_queries.end(), stage_query->upstream_queries.begin(), stage_query->upstream_queries.end());
173176
frame->upstream_queries = std::vector<ASTs>{upstream_queries};
177+
//frame->children_results.emplace_back(stage_query->current_query);
174178
frame->result_ast = stage_query->current_query;
175179
}
176180
else
@@ -207,6 +211,7 @@ void StageQueryDistributedAggregationRewriteAction::visit(const ASTSelectQuery *
207211

208212
if (frames.size() == 1)
209213
{
214+
LOG_TRACE(logger, "{} top select ast:{}", __LINE__, queryToString(*select_ast));
210215
frame->mergeChildrenUpstreamQueries();
211216
frame->result_ast = ASTStageQuery::make(frame->result_ast, frame->upstream_queries[0]);
212217
}
@@ -318,7 +323,7 @@ void StageQueryDistributedAggregationRewriteAction::visitSelectQueryWithGroupby(
318323
auto required_columns = collect_columns_visitor.visit().required_columns;
319324

320325
auto insert_query = createShuffleInsert(
321-
TableFunctionLocalShuffle::name,
326+
TableFunctionShuffleAggregation::name,
322327
rewrite_table_expr,
323328
ColumnWithDetailNameAndType::toNamesAndTypesList(required_columns[0]),
324329
select_ast->groupBy());

src/Interpreters/ASTRewriters/StageQueryDistributedJoinRewriteAction.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <TableFunctions/TableFunctionShuffle.h>
2323
#include <Common/logger_useful.h>
2424
#include <Common/Exception.h>
25+
#include <Parsers/ASTFunction.h>
26+
#include <Storages/StorageDictionary.h>
2527
#include <Interpreters/JoinedTables.h>
2628

2729
namespace DB
@@ -213,10 +215,11 @@ void StageQueryDistributedJoinRewriteAction::visit(const ASTSelectQuery * select
213215

214216
void StageQueryDistributedJoinRewriteAction::visitSelectQueryWithAggregation(const ASTSelectQuery * select_ast)
215217
{
218+
LOG_TRACE(logger, "{} visitSelectQueryWithAggregation:{}", __LINE__, queryToString(*select_ast));
216219
auto frame = frames.getTopFrame();
217-
StageQueryDistributedAggregationRewriteAction distributed_join_action(context, id_generator);
218-
ASTDepthFirstVisitor<StageQueryDistributedAggregationRewriteAction> distributed_join_visitor(distributed_join_action, select_ast->clone());
219-
auto rewrite_ast = distributed_join_visitor.visit();
220+
StageQueryDistributedAggregationRewriteAction distributed_agg_action(context, id_generator);
221+
ASTDepthFirstVisitor<StageQueryDistributedAggregationRewriteAction> distributed_agg_visitor(distributed_agg_action, select_ast->clone());
222+
auto rewrite_ast = distributed_agg_visitor.visit();
220223
auto * stage_query = rewrite_ast->as<ASTStageQuery>();
221224
if (!stage_query)
222225
throw Exception(ErrorCodes::LOGICAL_ERROR, "ASTStageQuery is expected. return query is : {}", queryToString(rewrite_ast));
@@ -245,11 +248,6 @@ void StageQueryDistributedJoinRewriteAction::visitSelectQueryOnJoin(const ASTSel
245248
ASTBuildUtil::createTablesInSelectQueryElement(
246249
frame->children_results[1]->as<ASTTableExpression>(), result_ast->join()->table_join)
247250
->as<ASTTablesInSelectQueryElement>());
248-
if (frames.size() == 1)
249-
{
250-
frame->mergeChildrenUpstreamQueries();
251-
frame->result_ast = ASTStageQuery::make(frame->result_ast, frame->upstream_queries[0]);
252-
}
253251
}
254252
else
255253
{
@@ -371,18 +369,20 @@ bool StageQueryDistributedJoinRewriteAnalyzer::isApplicableJoinType()
371369
else
372370
return false;// using clause
373371

372+
#if 1
374373
// if it is a special storage, return false
375374
const auto * join_ast = from_query->join();
376375
const auto & table_to_join = join_ast->table_expression->as<ASTTableExpression &>();
377376
if (table_to_join.database_and_table_name)
378377
{
379378
auto joined_table_id = context->resolveStorageID(table_to_join.database_and_table_name);
380379
StoragePtr storage = DatabaseCatalog::instance().tryGetTable(joined_table_id, context);
381-
if (storage)
380+
if (std::dynamic_pointer_cast<StorageDictionary>(storage))
382381
{
383382
return false;
384383
}
385384
}
385+
#endif
386386

387387
return true;
388388
}
@@ -510,8 +510,20 @@ bool StageQueryDistributedJoinRewriteAnalyzer::collectHashKeysOnAnd(
510510
auto * func = ast->as<ASTFunction>();
511511
for (auto & arg : func->arguments->children)
512512
{
513-
if (!collectHashKeysOnEqual(arg, keys_list, alias_columns))
514-
return false;
513+
auto * arg_func = arg->as<ASTFunction>();
514+
if (arg_func)
515+
{
516+
if (arg_func->name == "equals")
517+
{
518+
if (!collectHashKeysOnEqual(arg, keys_list, alias_columns))
519+
return false;
520+
}
521+
else if (arg_func->name == "and")
522+
{
523+
if (!collectHashKeysOnAnd(arg, keys_list, alias_columns))
524+
return false;
525+
}
526+
}
515527
}
516528
return true;
517529
}

src/Interpreters/InterpreterStageQuery.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,14 @@ std::optional<std::list<std::pair<DistributedTask, String>>> InterpreterStageQue
233233
}
234234
return res;
235235
}
236-
236+
auto * union_select_query = insert_query->select->as<ASTSelectWithUnionQuery>();
237+
for (auto & child : union_select_query->list_of_selects->children)
238+
{
239+
auto * select_query = child->as<ASTSelectQuery>();
240+
if (select_query->limitBy() || select_query->limitByLength() || select_query->limitLength() || select_query->limitOffset()
241+
|| select_query->limitByOffset())
242+
return {};
243+
}
237244
auto storages = getSelectStorages(insert_query->select);
238245
bool has_groupby = ASTAnalyzeUtil::hasGroupByRecursively(from_query);
239246
bool has_agg = ASTAnalyzeUtil::hasAggregationColumnRecursively(from_query);
@@ -366,6 +373,20 @@ std::optional<std::list<std::pair<DistributedTask, String>>> InterpreterStageQue
366373
auto storages = getSelectStorages(from_query);
367374
bool has_groupby = ASTAnalyzeUtil::hasGroupByRecursively(from_query);
368375
bool has_agg = ASTAnalyzeUtil::hasAggregationColumnRecursively(from_query);
376+
377+
// if the query has order by or limit, run in single node
378+
auto * union_select_query = from_query->as<ASTSelectWithUnionQuery>();
379+
for (auto & child : union_select_query->list_of_selects->children)
380+
{
381+
auto * select_query = child->as<ASTSelectQuery>();
382+
if (select_query->orderBy() || select_query->limitBy() || select_query->limitByLength() || select_query->limitLength()
383+
|| select_query->limitOffset() || select_query->limitByOffset())
384+
{
385+
LOG_TRACE(logger, "query has order by or limit. [{}] {}", child->getID(), queryToString(child));
386+
return {};
387+
}
388+
}
389+
369390
if (storages.size() == 2)
370391
{
371392
for (const auto & storage : storages)

0 commit comments

Comments
 (0)