Skip to content

Commit 2186269

Browse files
committed
finishing
1 parent eda9305 commit 2186269

File tree

9 files changed

+106
-54
lines changed

9 files changed

+106
-54
lines changed

docs/en/engines/table-engines/special/hybrid.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ SET allow_experimental_hybrid_table = 1;
3232

3333
### Automatic Type Alignment
3434

35-
Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental `hybrid_table_auto_cast_columns = 1` (default, requires `allow_experimental_analyzer = 1`), the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. This prevents header mismatches without having to edit each query, and you can opt out by setting the flag to `0` if it causes issues in your setup.
35+
Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental `hybrid_table_auto_cast_columns = 1` **(enabled by default and requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues.
3636

3737
Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors.
3838

src/Analyzer/Passes/HybridCastsPass.cpp

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
#include <Core/Settings.h>
1818
#include <Core/SettingsEnums.h>
19-
#include <Core/Settings.h>
2019
#include <Common/Exception.h>
2120

2221
namespace DB
@@ -42,20 +41,6 @@ struct HybridCastTask
4241
};
4342

4443
// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
45-
//
46-
// It normalizes headers coming from different segments when table structure in some segments
47-
// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
48-
// but Int64 in an additional segment.
49-
//
50-
// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
51-
// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
52-
// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
53-
// (CANNOT_CONVERT_TYPE).
54-
//
55-
// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
56-
// from different segments would return different headers (with or without CAST), leading to errors
57-
// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
58-
// (THERE_IS_NO_COLUMN).
5944
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
6045
{
6146
public:
@@ -125,7 +110,7 @@ void collectHybridTables(const QueryTreeNodePtr & join_tree, std::unordered_map<
125110
{
126111
ColumnsDescription to_cast = distributed->getColumnsToCast();
127112
if (!to_cast.empty())
128-
cast_map.emplace(join_tree.get(), std::move(to_cast));
113+
cast_map.emplace(join_tree.get(), std::move(to_cast)); // repeated table_expression can overwrite
129114
}
130115
return;
131116
}

src/Analyzer/Passes/HybridCastsPass.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,20 @@ namespace DB
77
{
88

99
/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
10-
/// and reorders the SELECT list to match the schema order (needed because planner
11-
/// later aligns remote headers by position).
10+
///
11+
/// It normalizes headers coming from different segments when table structure in some segments
12+
/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
13+
/// but Int64 in an additional segment.
14+
///
15+
/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
16+
/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
17+
/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
18+
/// (CANNOT_CONVERT_TYPE).
19+
///
20+
/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
21+
/// from different segments would return different headers (with or without CAST), leading to errors
22+
/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
23+
/// (THERE_IS_NO_COLUMN).
1224
class HybridCastsPass : public IQueryTreePass
1325
{
1426
public:

src/Analyzer/QueryTreePassManager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
267267
manager.addPass(std::make_unique<ConvertLogicalExpressionToCNFPass>());
268268
manager.addPass(std::make_unique<RegexpFunctionRewritePass>());
269269

270-
manager.addPass(std::make_unique<HybridCastsPass>());
271-
272270
manager.addPass(std::make_unique<RewriteSumFunctionWithSumAndCountPass>());
273271
manager.addPass(std::make_unique<CountDistinctPass>());
274272
manager.addPass(std::make_unique<UniqToCountPass>());
@@ -312,6 +310,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
312310
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());
313311

314312
manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());
313+
314+
manager.addPass(std::make_unique<HybridCastsPass>());
315315
}
316316

317317
}

src/Storages/StorageDistributed.cpp

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
950950
const StorageSnapshotPtr & distributed_storage_snapshot,
951951
const StorageID & remote_storage_id,
952952
const ASTPtr & remote_table_function,
953-
const ASTPtr & additional_filter)
953+
const ASTPtr & additional_filter = nullptr)
954954
{
955955
auto & planner_context = query_info.planner_context;
956956
const auto & query_context = planner_context->getQueryContext();
@@ -1091,6 +1091,17 @@ void StorageDistributed::read(
10911091
return String{"<unknown_segment>"};
10921092
};
10931093

1094+
auto describe_base_target = [&]() -> String
1095+
{
1096+
if (remote_table_function_ptr)
1097+
return remote_table_function_ptr->formatForLogging();
1098+
if (!remote_database.empty())
1099+
return remote_database + "." + remote_table;
1100+
return remote_table;
1101+
};
1102+
1103+
String base_target = describe_base_target();
1104+
10941105
const bool log_hybrid_query_rewrites = (!segments.empty() || base_segment_predicate);
10951106

10961107
auto log_rewritten_query = [&](const String & target, const ASTPtr & ast)
@@ -1101,14 +1112,6 @@ void StorageDistributed::read(
11011112
LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging());
11021113
};
11031114

1104-
String base_target;
1105-
if (remote_table_function_ptr)
1106-
base_target = remote_table_function_ptr->formatForLogging();
1107-
else if (!remote_database.empty())
1108-
base_target = remote_database + "." + remote_table;
1109-
else
1110-
base_target = remote_table;
1111-
11121115
if (settings[Setting::allow_experimental_analyzer])
11131116
{
11141117
StorageID remote_storage_id = StorageID::createEmpty();
@@ -2231,26 +2234,31 @@ void StorageDistributed::setHybridLayout(std::vector<HybridSegment> segments_)
22312234
segments = std::move(segments_);
22322235
log = getLogger("Hybrid (" + getStorageID().table_name + ")");
22332236

2234-
cached_columns_to_cast = getColumnsToCast();
2235-
if (!cached_columns_to_cast.empty() && log->is(Poco::Message::PRIO_DEBUG))
2236-
{
2237-
std::vector<String> cols;
2238-
for (const auto & col : cached_columns_to_cast.getAllPhysical())
2239-
cols.push_back(col.name + " -> " + col.type->getName());
2240-
LOG_DEBUG(log, "Hybrid auto-cast will apply to: {}", fmt::join(cols, ", "));
2241-
}
2242-
22432237
auto virtuals = createVirtuals();
22442238
// or _segment_index?
22452239
virtuals.addEphemeral("_table_index", std::make_shared<DataTypeUInt32>(), "Index of the table function in Hybrid (0 for main table, 1+ for additional segments)");
22462240
setVirtuals(virtuals);
22472241
}
22482242

2243+
void StorageDistributed::setCachedColumnsToCast(ColumnsDescription columns)
2244+
{
2245+
cached_columns_to_cast = std::move(columns);
2246+
if (!cached_columns_to_cast.empty() && log)
2247+
{
2248+
Names columns_with_types;
2249+
columns_with_types.reserve(cached_columns_to_cast.getAllPhysical().size());
2250+
for (const auto & col : cached_columns_to_cast.getAllPhysical())
2251+
columns_with_types.emplace_back(col.name + " " + col.type->getName());
2252+
LOG_DEBUG(log, "Hybrid auto-cast will apply to: [{}]", fmt::join(columns_with_types, ", "));
2253+
}
2254+
}
2255+
22492256
ColumnsDescription StorageDistributed::getColumnsToCast() const
22502257
{
22512258
return cached_columns_to_cast;
22522259
}
22532260

2261+
22542262
void registerStorageDistributed(StorageFactory & factory)
22552263
{
22562264
factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args)
@@ -2421,7 +2429,7 @@ void registerStorageHybrid(StorageFactory & factory)
24212429
{
24222430
throw Exception(
24232431
ErrorCodes::BAD_ARGUMENTS,
2424-
"Hybrid segment '{}' is missing column '{}' required by Hybrid schema",
2432+
"Hybrid segment {} is missing column '{}' required by Hybrid schema",
24252433
segment_name, column.name);
24262434
}
24272435

src/Storages/StorageDistributed.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class StorageDistributed final : public IStorage, WithContext
5858
ASTPtr table_function_ast;
5959
ASTPtr predicate_ast;
6060
std::optional<StorageID> storage_id; // For table identifiers instead of table functions
61-
ColumnsDescription actual_columns;
6261

6362
HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_)
6463
: table_function_ast(std::move(table_function_ast_))

tests/queries/0_stateless/03643_hybrid.reference

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,28 +174,52 @@ Read predicate-filtered data with analyzer enabled and prefer localhost replica
174174
21 Alice 2025-08-15 100
175175
22 Bob 2025-08-20 200
176176
23 Charlie 2025-08-25 300
177-
Check if the subqueries were recorded in query_log
177+
Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0)
178178
Row 1:
179179
──────
180180
type: QueryFinish
181181
is_initial_query2: 1
182182
tbl: ['_table_function.remote','db.test_tiered_watermark']
183-
qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null;
184-
log_comment: test_tiered_watermark
183+
qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null;
184+
log_comment: test_tiered_watermark1
185185

186186
Row 2:
187187
──────
188188
type: QueryFinish
189189
is_initial_query2: 0
190190
tbl: ['db.test_tiered_watermark_after']
191191
qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `__table1`.`id` DESC
192-
log_comment: test_tiered_watermark
192+
log_comment: test_tiered_watermark1
193193

194194
Row 3:
195195
──────
196196
type: QueryFinish
197197
is_initial_query2: 0
198198
tbl: ['db.test_tiered_watermark_before']
199199
qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY `__table1`.`id` DESC
200-
log_comment: test_tiered_watermark
200+
log_comment: test_tiered_watermark1
201+
Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1)
202+
Row 1:
203+
──────
204+
type: QueryFinish
205+
is_initial_query2: 1
206+
tbl: ['_table_function.remote','db.test_tiered_watermark']
207+
qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null;
208+
log_comment: test_tiered_watermark2
209+
210+
Row 2:
211+
──────
212+
type: QueryFinish
213+
is_initial_query2: 0
214+
tbl: ['db.test_tiered_watermark_after']
215+
qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `id` DESC
216+
log_comment: test_tiered_watermark2
217+
218+
Row 3:
219+
──────
220+
type: QueryFinish
221+
is_initial_query2: 0
222+
tbl: ['db.test_tiered_watermark_before']
223+
qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY _CAST(`__table1`.`id`, 'UInt32') DESC
224+
log_comment: test_tiered_watermark2
201225
Clean up predicate filtering tables

tests/queries/0_stateless/03643_hybrid.sql

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,19 +303,19 @@ SELECT count() FROM test_tiered_watermark_before WHERE id = 17;
303303

304304

305305
SELECT 'Read predicate-filtered data with analyzer disabled and no localhost preference';
306-
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 0, hybrid_table_auto_cast_columns = 0;
306+
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 0;
307307
SELECT 'Read predicate-filtered data with analyzer enabled and no localhost preference';
308-
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, hybrid_table_auto_cast_columns = 0;
308+
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0;
309309
SELECT 'Read predicate-filtered data with analyzer disabled and prefer localhost replica';
310310
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 1;
311311
SELECT 'Read predicate-filtered data with analyzer enabled and prefer localhost replica';
312-
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 1, hybrid_table_auto_cast_columns = 0;
312+
SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 1;
313313

314314
-- other combinations of settings work, but give a bit different content in the query_log
315315
-- See the problem around is_initial_query described in https://github.com/Altinity/ClickHouse/issues/1077
316-
SELECT 'Check if the subqueries were recorded in query_log';
316+
SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0)';
317317

318-
SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, hybrid_table_auto_cast_columns = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null;
318+
SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null;
319319
SYSTEM FLUSH LOGS;
320320
SELECT
321321
type,
@@ -331,7 +331,31 @@ WHERE
331331
FROM system.query_log
332332
WHERE
333333
event_time > now() - 300
334-
and log_comment = 'test_tiered_watermark'
334+
and log_comment = 'test_tiered_watermark1'
335+
and current_database = currentDatabase()
336+
and query_id = initial_query_id )
337+
ORDER BY tbl, event_time_microseconds
338+
FORMAT Vertical;
339+
340+
SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1)';
341+
342+
SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null;
343+
SYSTEM FLUSH LOGS;
344+
SELECT
345+
type,
346+
query_id = initial_query_id AS is_initial_query2,
347+
arraySort(arrayMap(x -> replaceAll(x, currentDatabase(), 'db'), tables)) as tbl,
348+
replaceAll(query, currentDatabase(), 'db') as qry,
349+
log_comment
350+
FROM system.query_log
351+
WHERE
352+
event_time > now() - 300 AND type = 'QueryFinish' AND
353+
initial_query_id IN (
354+
SELECT initial_query_id
355+
FROM system.query_log
356+
WHERE
357+
event_time > now() - 300
358+
and log_comment = 'test_tiered_watermark2'
335359
and current_database = currentDatabase()
336360
and query_id = initial_query_id )
337361
ORDER BY tbl, event_time_microseconds

tests/queries/0_stateless/03644_hybrid_auto_cast.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,5 @@ SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_t
8181

8282
SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required)';
8383
SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 0;
84-
SELECT max(value) FROM test_tiered_watermark; -- { serverError NOT_IMPLEMENTED }
85-
SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- { serverError NOT_IMPLEMENTED }
84+
SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE }
85+

0 commit comments

Comments
 (0)