Skip to content

Commit 5e5e9d7

Browse files
committed
better
1 parent ae6c714 commit 5e5e9d7

File tree

10 files changed

+112
-63
lines changed

10 files changed

+112
-63
lines changed

docs/en/engines/table-engines/special/hybrid.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ The Hybrid engine is experimental. Enable it per session (or in the user profile
3030
SET allow_experimental_hybrid_table = 1;
3131
```
3232

33+
### Automatic Type Alignment
34+
35+
Hybrid segments can evolve independently, so the same logical column may use different physical types. When you set the experimental `hybrid_table_auto_cast_columns = 1` (requires `allow_experimental_analyzer = 1`), the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. This prevents header mismatches without having to edit each query.
36+
3337
## Engine definition
3438

3539
```sql

src/Interpreters/ClusterProxy/SelectStreamFactory.cpp

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
#include <Interpreters/Cluster.h>
1616
#include <Interpreters/DatabaseCatalog.h>
1717
#include <Interpreters/AddDefaultDatabaseVisitor.h>
18-
#include <Interpreters/addTypeConversionToAST.h>
1918
#include <Interpreters/RequiredSourceColumnsVisitor.h>
2019
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
2120
#include <DataTypes/ObjectUtils.h>
2221
#include <Client/IConnections.h>
2322
#include <Parsers/ASTSelectQuery.h>
2423
#include <Parsers/ASTSetQuery.h>
25-
#include <Parsers/ASTIdentifier.h>
2624
#include <Processors/QueryPlan/QueryPlan.h>
2725
#include <Processors/QueryPlan/ReadFromRemote.h>
2826
#include <Processors/QueryPlan/ExpressionStep.h>
@@ -62,46 +60,6 @@ namespace FailPoints
6260
namespace ClusterProxy
6361
{
6462

65-
namespace
66-
{
67-
void applyHybridCastsToAST(
68-
ASTPtr & node,
69-
const ColumnsDescription * metadata_columns,
70-
const NameSet * columns_to_cast)
71-
{
72-
if (!metadata_columns || !columns_to_cast || columns_to_cast->empty() || !node)
73-
return;
74-
75-
if (auto * func = node->as<ASTFunction>(); func && func->name == "_CAST")
76-
return;
77-
78-
if (auto * identifier = node->as<ASTIdentifier>())
79-
{
80-
String candidate = identifier->name();
81-
String short_name = candidate;
82-
83-
auto dot_pos = candidate.rfind('.');
84-
if (dot_pos != String::npos && dot_pos + 1 < candidate.size())
85-
short_name = candidate.substr(dot_pos + 1);
86-
87-
if (columns_to_cast->contains(short_name))
88-
{
89-
if (auto expected_column_opt = metadata_columns->tryGetPhysical(short_name))
90-
{
91-
auto cast_ast = addTypeConversionToAST(node->clone(), expected_column_opt->type->getName());
92-
const auto & alias = identifier->alias.empty() ? short_name : identifier->alias;
93-
cast_ast->setAlias(alias);
94-
node = cast_ast;
95-
return;
96-
}
97-
}
98-
}
99-
100-
for (auto & child : node->children)
101-
applyHybridCastsToAST(child, metadata_columns, columns_to_cast);
102-
}
103-
}
104-
10563
/// select query has database, table and table function names as AST pointers
10664
/// Creates a copy of query, changes database, table and table function names.
10765
ASTPtr rewriteSelectQuery(
@@ -110,9 +68,7 @@ ASTPtr rewriteSelectQuery(
11068
const std::string & remote_database,
11169
const std::string & remote_table,
11270
ASTPtr table_function_ptr,
113-
ASTPtr additional_filter,
114-
const NameSet * columns_to_cast,
115-
const ColumnsDescription * metadata_columns)
71+
ASTPtr additional_filter)
11672
{
11773
auto modified_query_ast = query->clone();
11874

@@ -166,9 +122,6 @@ ASTPtr rewriteSelectQuery(
166122

167123
RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
168124
}
169-
170-
if (columns_to_cast && !columns_to_cast->empty() && metadata_columns)
171-
applyHybridCastsToAST(modified_query_ast, metadata_columns, columns_to_cast);
172125
}
173126

174127
/// To make local JOIN works, default database should be added to table names.

src/Interpreters/ClusterProxy/SelectStreamFactory.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
#include <Analyzer/IQueryTreeNode.h>
44
#include <Client/ConnectionPool.h>
55
#include <Core/QueryProcessingStage.h>
6-
#include <Core/Names.h>
76
#include <Interpreters/Cluster.h>
87
#include <Interpreters/StorageID.h>
98
#include <Parsers/IAST_fwd.h>
109
#include <Storages/IStorage_fwd.h>
11-
#include <Storages/ColumnsDescription.h>
1210
#include <Storages/StorageSnapshot.h>
1311

1412
namespace DB
@@ -44,9 +42,7 @@ ASTPtr rewriteSelectQuery(
4442
const std::string & remote_database,
4543
const std::string & remote_table,
4644
ASTPtr table_function_ptr = nullptr,
47-
ASTPtr additional_filter = nullptr,
48-
const NameSet * columns_to_cast = nullptr,
49-
const ColumnsDescription * metadata_columns = nullptr);
45+
ASTPtr additional_filter = nullptr);
5046

5147
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
5248
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;

src/Storages/StorageDistributed.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,9 @@ void StorageDistributed::read(
12291229
const auto * columns_to_cast_ptr = need_hybrid_casts ? &columns_to_cast_names : nullptr;
12301230
const auto * base_actual_columns_for_cast = !base_segment_columns.empty() ? &base_segment_columns : &metadata_columns;
12311231

1232+
if (need_hybrid_casts && !settings[Setting::allow_experimental_analyzer])
1233+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Setting 'hybrid_table_auto_cast_columns' is supported only with allow_experimental_analyzer=1");
1234+
12321235
auto describe_segment_target = [&](const HybridSegment & segment) -> String
12331236
{
12341237
if (segment.storage_id)
@@ -1319,9 +1322,7 @@ void StorageDistributed::read(
13191322
modified_query_info.query = ClusterProxy::rewriteSelectQuery(
13201323
local_context, modified_query_info.query,
13211324
remote_database, remote_table, remote_table_function_ptr,
1322-
base_segment_predicate,
1323-
columns_to_cast_ptr,
1324-
&metadata_columns);
1325+
base_segment_predicate);
13251326
log_rewritten_query(base_target, modified_query_info.query);
13261327

13271328
if (!segments.empty())
@@ -1336,18 +1337,14 @@ void StorageDistributed::read(
13361337
local_context, additional_query_info.query,
13371338
segment.storage_id->database_name, segment.storage_id->table_name,
13381339
nullptr,
1339-
segment.predicate_ast,
1340-
columns_to_cast_ptr,
1341-
&metadata_columns);
1340+
segment.predicate_ast);
13421341
}
13431342
else
13441343
{
13451344
additional_query_info.query = ClusterProxy::rewriteSelectQuery(
13461345
local_context, additional_query_info.query,
13471346
"", "", segment.table_function_ast,
1348-
segment.predicate_ast,
1349-
columns_to_cast_ptr,
1350-
&metadata_columns);
1347+
segment.predicate_ast);
13511348
}
13521349

13531350
log_rewritten_query(describe_segment_target(segment), additional_query_info.query);

tests/queries/0_stateless/03642_hybrid.reference renamed to tests/queries/0_stateless/03643_hybrid.reference

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
Hybrid creation requires allow_experimental_hybrid_table
12
Check Hybrid engine is registered
23
Hybrid
34
Ensure no leftovers before validation checks

tests/queries/0_stateless/03642_hybrid.sql renamed to tests/queries/0_stateless/03643_hybrid.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ DROP TABLE IF EXISTS test_tiered_watermark SYNC;
346346
DROP TABLE IF EXISTS test_tiered_watermark_after SYNC;
347347
DROP TABLE IF EXISTS test_tiered_watermark_before SYNC;
348348

349-
-- TODO:
349+
-- TODO: - addressed by 03644_hybrid_auto_cast.sql
350350
-- Code: 70. DB::Exception: Received from localhost:9000. DB::Exception: Conversion from AggregateFunction(sum, Decimal(38, 0)) to AggregateFunction(sum, UInt32) is not supported: while converting source column `sum(__table1.value)` to destination column `sum(__table1.value)`. (CANNOT_CONVERT_TYPE)
351351
-- SELECT sum(value) FROM test_tiered_watermark;
352352

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch)
2+
hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch)
3+
1
4+
hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast
5+
600
6+
1
7+
hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 manual cast
8+
600
9+
1
10+
hybrid_table_auto_cast_columns = 1, enable_analyzer = 1
11+
600
12+
1
13+
hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
SET allow_experimental_hybrid_table = 1,
2+
prefer_localhost_replica = 0;
3+
4+
DROP TABLE IF EXISTS test_tiered_watermark_after;
5+
DROP TABLE IF EXISTS test_tiered_watermark_before;
6+
DROP TABLE IF EXISTS test_tiered_watermark;
7+
8+
CREATE TABLE test_tiered_watermark_after
9+
(
10+
`id` UInt32,
11+
`name` String,
12+
`date` Date,
13+
`value` UInt64,
14+
`categories` Array(UInt32)
15+
)
16+
ENGINE = MergeTree()
17+
ORDER BY id;
18+
19+
CREATE TABLE test_tiered_watermark_before
20+
(
21+
`id` Int32,
22+
`name` Nullable(String),
23+
`date` Date,
24+
`value` Decimal128(0),
25+
`categories` Array(Int64)
26+
)
27+
ENGINE = MergeTree()
28+
ORDER BY id;
29+
30+
INSERT INTO test_tiered_watermark_after VALUES
31+
(11, 'Alice', '2025-08-15', 100, [100, 10]),
32+
(12, 'Bob', '2025-08-20', 200, [200, 20]),
33+
(13, 'Charlie', '2025-08-25', 300, [300, 30]),
34+
(14, 'David', '2025-09-05', 400, [400, 40]),
35+
(15, 'Eve', '2025-09-10', 500, [500, 50]),
36+
(16, 'Frank', '2025-09-15', 600, [600, 60]);
37+
38+
INSERT INTO test_tiered_watermark_before VALUES
39+
(21, 'Alice', '2025-08-15', 100, [100, 10]),
40+
(22, 'Bob', '2025-08-20', 200, [200, 20]),
41+
(23, 'Charlie', '2025-08-25', 300, [300, 30]),
42+
(24, 'David', '2025-09-05', 400, [400, 40]),
43+
(25, 'Eve', '2025-09-10', 500, [500, 50]),
44+
(26, 'Frank', '2025-09-15', 600, [600, 60]);
45+
46+
CREATE TABLE test_tiered_watermark
47+
ENGINE = Hybrid(
48+
remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_after'),
49+
date >= '2025-09-01',
50+
remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_before'),
51+
date < '2025-09-01'
52+
);
53+
54+
-- the problem
55+
SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch)';
56+
SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1;
57+
SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE }
58+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- { serverError THERE_IS_NO_COLUMN }
59+
60+
SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch)';
61+
SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0;
62+
SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE }
63+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- works w/o analyzer
64+
65+
-- workaround - explicit cast
66+
SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast';
67+
SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1;
68+
SELECT max(value::UInt32) FROM test_tiered_watermark;
69+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark;
70+
71+
SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 manual cast';
72+
SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0;
73+
SELECT max(value::UInt32) FROM test_tiered_watermark;
74+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark;
75+
76+
-- feature to add casts automatically
77+
SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 1';
78+
SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 1;
79+
SELECT max(value) FROM test_tiered_watermark;
80+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark;
81+
82+
SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required)';
83+
SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 0;
84+
SELECT max(value) FROM test_tiered_watermark; -- { serverError NOT_IMPLEMENTED }
85+
SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- { serverError NOT_IMPLEMENTED }

tests/queries/0_stateless/04182_hybrid_unqualified_table.reference renamed to tests/queries/0_stateless/03644_hybrid_unqualified_table.reference

File renamed without changes.

tests/queries/0_stateless/04182_hybrid_unqualified_table.sql renamed to tests/queries/0_stateless/03644_hybrid_unqualified_table.sql

File renamed without changes.

0 commit comments

Comments
 (0)