Skip to content

Commit eda9305

Browse files
committed
moving the rewrite to a analyzer pass, seem to work much better
1 parent 113cfe6 commit eda9305

File tree

7 files changed

+251
-201
lines changed

7 files changed

+251
-201
lines changed

docs/en/engines/table-engines/special/hybrid.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ You must pass at least two arguments – the first table function and its predic
5656
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
5757
- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
5858
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
59-
- Align schemas across the segments. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
59+
- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
6060

6161
## Example: local cluster plus S3 historical tier
6262

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
#include <Analyzer/Passes/HybridCastsPass.h>
2+
3+
#include <Analyzer/QueryTreeBuilder.h>
4+
#include <Analyzer/QueryTreePassManager.h>
5+
#include <Analyzer/Passes/QueryAnalysisPass.h>
6+
#include <Analyzer/Utils.h>
7+
#include <Analyzer/Resolve/IdentifierResolver.h>
8+
#include <Analyzer/QueryNode.h>
9+
#include <Analyzer/TableNode.h>
10+
#include <Analyzer/FunctionNode.h>
11+
#include <Analyzer/ColumnNode.h>
12+
#include <Analyzer/InDepthQueryTreeVisitor.h>
13+
14+
#include <Storages/IStorage.h>
15+
#include <Storages/StorageDistributed.h>
16+
17+
#include <Core/Settings.h>
18+
#include <Core/SettingsEnums.h>
19+
#include <Core/Settings.h>
20+
#include <Common/Exception.h>
21+
22+
namespace DB
23+
{
24+
25+
namespace Setting
26+
{
27+
extern const SettingsBool hybrid_table_auto_cast_columns;
28+
}
29+
30+
namespace ErrorCodes
31+
{
32+
extern const int LOGICAL_ERROR;
33+
}
34+
35+
namespace
36+
{
37+
38+
struct HybridCastTask
39+
{
40+
QueryTreeNodePtr table_expression;
41+
ColumnsDescription cast_schema;
42+
};
43+
44+
// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
45+
//
46+
// It normalizes headers coming from different segments when table structure in some segments
47+
// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
48+
// but Int64 in an additional segment.
49+
//
50+
// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
51+
// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
52+
// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
53+
// (CANNOT_CONVERT_TYPE).
54+
//
55+
// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
56+
// from different segments would return different headers (with or without CAST), leading to errors
57+
// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
58+
// (THERE_IS_NO_COLUMN).
59+
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
60+
{
61+
public:
62+
HybridCastVisitor(
63+
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
64+
ContextPtr context_)
65+
: cast_map(cast_map_)
66+
, context(std::move(context_))
67+
{}
68+
69+
bool shouldTraverseTopToBottom() const { return false; }
70+
71+
static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child)
72+
{
73+
auto child_type = child->getNodeType();
74+
return !(child_type == QueryTreeNodeType::QUERY || child_type == QueryTreeNodeType::UNION);
75+
}
76+
77+
void visitImpl(QueryTreeNodePtr & node)
78+
{
79+
auto * column_node = node->as<ColumnNode>();
80+
if (!column_node)
81+
return;
82+
83+
auto column_source = column_node->getColumnSourceOrNull();
84+
if (!column_source)
85+
return;
86+
87+
auto it = cast_map.find(column_source.get());
88+
if (it == cast_map.end())
89+
return;
90+
91+
const auto & column_name = column_node->getColumnName();
92+
auto expected_column_opt = it->second.tryGetPhysical(column_name);
93+
if (!expected_column_opt)
94+
return;
95+
96+
auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());
97+
column_clone->setColumnType(expected_column_opt->type);
98+
99+
auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
100+
const auto & alias = node->getAlias();
101+
if (!alias.empty())
102+
cast_node->setAlias(alias);
103+
else
104+
cast_node->setAlias(expected_column_opt->name);
105+
106+
node = cast_node;
107+
}
108+
109+
private:
110+
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
111+
ContextPtr context;
112+
};
113+
114+
115+
} // namespace
116+
117+
void collectHybridTables(const QueryTreeNodePtr & join_tree, std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map)
118+
{
119+
if (!join_tree)
120+
return;
121+
if (const auto * table = join_tree->as<TableNode>())
122+
{
123+
const auto * storage = table->getStorage().get();
124+
if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
125+
{
126+
ColumnsDescription to_cast = distributed->getColumnsToCast();
127+
if (!to_cast.empty())
128+
cast_map.emplace(join_tree.get(), std::move(to_cast));
129+
}
130+
return;
131+
}
132+
if (const auto * func = join_tree->as<FunctionNode>())
133+
{
134+
for (auto & child : func->getArguments().getNodes())
135+
collectHybridTables(child, cast_map);
136+
return;
137+
}
138+
if (const auto * query = join_tree->as<QueryNode>())
139+
{
140+
collectHybridTables(query->getJoinTree(), cast_map);
141+
}
142+
if (const auto * union_node = join_tree->as<UnionNode>())
143+
{
144+
for (auto & child : union_node->getQueries().getNodes())
145+
collectHybridTables(child, cast_map);
146+
}
147+
}
148+
149+
void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
150+
{
151+
const auto & settings = context->getSettingsRef();
152+
if (!settings[Setting::hybrid_table_auto_cast_columns])
153+
return;
154+
155+
auto * query = query_tree_node->as<QueryNode>();
156+
if (!query)
157+
return;
158+
159+
std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
160+
collectHybridTables(query->getJoinTree(), cast_map);
161+
if (cast_map.empty())
162+
return;
163+
164+
HybridCastVisitor visitor(cast_map, context);
165+
visitor.visit(query_tree_node);
166+
}
167+
168+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#pragma once
2+
3+
#include <Analyzer/IQueryTreePass.h>
4+
#include <Interpreters/Context_fwd.h>
5+
6+
namespace DB
7+
{
8+
9+
/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
10+
/// and reorders the SELECT list to match the schema order (needed because planner
11+
/// later aligns remote headers by position).
12+
class HybridCastsPass : public IQueryTreePass
13+
{
14+
public:
15+
String getName() override { return "HybridCastsPass"; }
16+
String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; }
17+
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
18+
};
19+
20+
}

src/Analyzer/QueryTreePassManager.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include <Analyzer/Passes/SumIfToCountIfPass.h>
4949
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
5050
#include <Analyzer/Passes/UniqToCountPass.h>
51+
#include <Analyzer/Passes/HybridCastsPass.h>
5152
#include <Analyzer/Utils.h>
5253

5354
namespace DB
@@ -266,6 +267,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
266267
manager.addPass(std::make_unique<ConvertLogicalExpressionToCNFPass>());
267268
manager.addPass(std::make_unique<RegexpFunctionRewritePass>());
268269

270+
manager.addPass(std::make_unique<HybridCastsPass>());
271+
269272
manager.addPass(std::make_unique<RewriteSumFunctionWithSumAndCountPass>());
270273
manager.addPass(std::make_unique<CountDistinctPass>());
271274
manager.addPass(std::make_unique<UniqToCountPass>());

0 commit comments

Comments
 (0)