Commit 807c012

Merge pull request #1156 from Altinity/mf_25.8_hybrid2
engine=Hybrid improvements
2 parents c0d280e + 8c0d982 · commit 807c012

15 files changed: +693 −127 lines

docs/en/engines/table-engines/special/tiered-distributed.md renamed to docs/en/engines/table-engines/special/hybrid.md

Lines changed: 24 additions & 10 deletions
@@ -1,6 +1,6 @@
 ---
-description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
-slug: /engines/table-engines/special/tiered-distributed
+description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.'
+slug: /engines/table-engines/special/hybrid
 title: 'Hybrid Table Engine'
 sidebar_label: 'Hybrid'
 sidebar_position: 11
@@ -9,7 +9,7 @@ sidebar_position: 11
 # Hybrid table engine

 `Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
-The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
+The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
 global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

 It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.
@@ -20,7 +20,21 @@ Typical use cases include:
 - Tiered storage, for example fresh data on a local cluster and historical data in S3.
 - Gradual roll-outs where only a subset of rows should be served from a new backend.

-By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
+By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
+
+## Enable the engine
+
+The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables:
+
+```sql
+SET allow_experimental_hybrid_table = 1;
+```
+
+### Automatic type alignment
+
+Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental `hybrid_table_auto_cast_columns = 1` **(enabled by default; requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. Set the flag to `0` to opt out if it causes issues.
+
+Segment schemas are cached when you create or attach a Hybrid table. If you later alter a segment (for example, change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast pass may miss the change and queries can still fail with header/type errors.
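
A quick illustration of that refresh step, as a hedged sketch: it reuses the `btc_blocks` table from the example further down, and the altered-segment scenario is hypothetical.

```sql
-- Hypothetical scenario: a segment's column type was altered after the
-- Hybrid table was created, so the cached segment headers are stale.
-- Re-attach the Hybrid table to rebuild them:
DETACH TABLE btc_blocks;
ATTACH TABLE btc_blocks;
```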

 ## Engine definition

@@ -39,14 +53,14 @@ You must pass at least two arguments – the first table function and its predic
 ### Arguments and behaviour

 - `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
-- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
-- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
+- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical (see the sketch after this list).
+- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
 - `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
-- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
+- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment lacks a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
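
A minimal definition sketch that follows these rules; the cluster, database, and table names are illustrative assumptions, not from this commit, and the S3 example below shows a complete runnable variant.

```sql
SET allow_experimental_hybrid_table = 1;

-- Two segments with mutually exclusive predicates behind one logical table.
-- The first argument must be a table function; writes are forwarded to it.
-- Assumes an events_local table exists locally to supply the schema (AS clause).
CREATE TABLE events ENGINE = Hybrid(
    cluster('hot_cluster', currentDatabase(), 'events_local'), event_date >= '2025-09-01',
    cluster('cold_cluster', currentDatabase(), 'events_local'), event_date < '2025-09-01'
) AS events_local;
```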

 ## Example: local cluster plus S3 historical tier

-The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
+The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

 ```sql
 -- Local MergeTree table that keeps current data
@@ -79,7 +93,7 @@ CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
     s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
 ) AS btc_blocks_local;

--- Writes target the first (remote) layer
+-- Writes target the first (remote) segment
 INSERT INTO btc_blocks
 SELECT *
 FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
@@ -103,4 +117,4 @@ GROUP BY date
 ORDER BY date;
 ```

-Because the predicates are applied inside every layer, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
+Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
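
Where segment types diverge and you prefer not to rely on the auto-cast pass, an explicit cast in the query keeps headers aligned. A hedged sketch — the `hash` column is hypothetical, standing in for the `FixedString(64)`-versus-`String` mismatch mentioned above:

```sql
-- Normalize a column that is FixedString(64) locally but String in Parquet
SELECT date, toString(hash) AS hash
FROM btc_blocks
WHERE date >= '2025-08-25'
LIMIT 10;
```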
src/Analyzer/Passes/HybridCastsPass.cpp (new file)

Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+#include <Analyzer/Passes/HybridCastsPass.h>
+
+#include <Analyzer/QueryTreeBuilder.h>
+#include <Analyzer/QueryTreePassManager.h>
+#include <Analyzer/Passes/QueryAnalysisPass.h>
+#include <Analyzer/Utils.h>
+#include <Analyzer/Resolve/IdentifierResolver.h>
+#include <Analyzer/QueryNode.h>
+#include <Analyzer/TableNode.h>
+#include <Analyzer/UnionNode.h>
+#include <Analyzer/FunctionNode.h>
+#include <Analyzer/ColumnNode.h>
+#include <Analyzer/InDepthQueryTreeVisitor.h>
+
+#include <Storages/IStorage.h>
+#include <Storages/StorageDistributed.h>
+
+#include <Core/Settings.h>
+#include <Core/SettingsEnums.h>
+#include <Common/Exception.h>
+
+namespace DB
+{
+
+namespace Setting
+{
+    extern const SettingsBool hybrid_table_auto_cast_columns;
+}
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+namespace
+{
+
+/// Collect Hybrid table expressions that require casts to normalize headers across segments.
+///
+/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function
+/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying
+/// StorageDistributed so cached casts can be picked up there as well.
+class HybridCastTablesCollector : public InDepthQueryTreeVisitor<HybridCastTablesCollector>
+{
+public:
+    explicit HybridCastTablesCollector(std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_)
+        : cast_map(cast_map_)
+    {}
+
+    static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; }
+
+    void visitImpl(QueryTreeNodePtr & node)
+    {
+        const auto * table = node->as<TableNode>();
+        if (!table)
+            return;
+
+        const auto * storage = table->getStorage().get();
+        if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
+        {
+            ColumnsDescription to_cast = distributed->getColumnsToCast();
+            if (!to_cast.empty())
+                cast_map.emplace(node.get(), std::move(to_cast)); // repeated table_expression can overwrite
+        }
+    }
+
+private:
+    std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
+};
+
+// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
+class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
+{
+public:
+    HybridCastVisitor(
+        const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
+        ContextPtr context_)
+        : cast_map(cast_map_)
+        , context(std::move(context_))
+    {}
+
+    bool shouldTraverseTopToBottom() const { return false; }
+
+    static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child)
+    {
+        /// Traverse all child nodes so casts also apply inside subqueries and UNION branches.
+        (void)child;
+        return true;
+    }
+
+    void visitImpl(QueryTreeNodePtr & node)
+    {
+        auto * column_node = node->as<ColumnNode>();
+        if (!column_node)
+            return;
+
+        auto column_source = column_node->getColumnSourceOrNull();
+        if (!column_source)
+            return;
+
+        auto it = cast_map.find(column_source.get());
+        if (it == cast_map.end())
+            return;
+
+        const auto & column_name = column_node->getColumnName();
+        auto expected_column_opt = it->second.tryGetPhysical(column_name);
+        if (!expected_column_opt)
+            return;
+
+        auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());
+
+        auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
+        const auto & alias = node->getAlias();
+        if (!alias.empty())
+            cast_node->setAlias(alias);
+        else
+            cast_node->setAlias(expected_column_opt->name);
+
+        node = cast_node;
+    }
+
+private:
+    const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
+    ContextPtr context;
+};
+
+
+} // namespace
+
+void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
+{
+    const auto & settings = context->getSettingsRef();
+    if (!settings[Setting::hybrid_table_auto_cast_columns])
+        return;
+
+    auto * query = query_tree_node->as<QueryNode>();
+    if (!query)
+        return;
+
+    std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
+    HybridCastTablesCollector collector(cast_map);
+    collector.visit(query_tree_node);
+    if (cast_map.empty())
+        return;
+
+    HybridCastVisitor visitor(cast_map, context);
+    visitor.visit(query_tree_node);
+}
+
+}
src/Analyzer/Passes/HybridCastsPass.h (new file)

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+#pragma once
+
+#include <Analyzer/IQueryTreePass.h>
+#include <Interpreters/Context_fwd.h>
+
+namespace DB
+{
+
+/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
+///
+/// It normalizes headers coming from different segments when table structure in some segments
+/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
+/// but Int64 in an additional segment.
+///
+/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
+/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
+/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
+/// (CANNOT_CONVERT_TYPE).
+///
+/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
+/// from different segments would return different headers (with or without CAST), leading to errors
+/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
+/// (THERE_IS_NO_COLUMN).
+class HybridCastsPass : public IQueryTreePass
+{
+public:
+    String getName() override { return "HybridCastsPass"; }
+    String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; }
+    void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
+};
+
+}

src/Analyzer/QueryTreePassManager.cpp

Lines changed: 3 additions & 0 deletions
@@ -48,6 +48,7 @@
 #include <Analyzer/Passes/SumIfToCountIfPass.h>
 #include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
 #include <Analyzer/Passes/UniqToCountPass.h>
+#include <Analyzer/Passes/HybridCastsPass.h>
 #include <Analyzer/Utils.h>

 namespace DB
@@ -309,6 +310,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
     manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

     manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());
+
+    manager.addPass(std::make_unique<HybridCastsPass>());
 }

 }

src/Core/Settings.cpp

Lines changed: 6 additions & 0 deletions
@@ -6926,6 +6926,12 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte
 - 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled.
 - 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled.
 )", EXPERIMENTAL) \
+    DECLARE(Bool, allow_experimental_hybrid_table, false, R"(
+Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, hybrid_table_auto_cast_columns, true, R"(
+Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types. Works only with the analyzer. Enabled by default; does nothing if (experimental) Hybrid tables are disabled. Disable it if it causes issues. Segment schemas are cached when the Hybrid table is created or attached; if a segment schema changes later, detach/attach or recreate the Hybrid table so the cached headers stay in sync.
+)", 0) \
     DECLARE(Bool, allow_experimental_codecs, false, R"(
 If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).
 )", EXPERIMENTAL) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 3 additions & 1 deletion
@@ -53,7 +53,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
         {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
         {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
-        {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
+        {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
+        {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Enabled by default."},
+        {"allow_experimental_hybrid_table", false, false, "New setting to allow the Hybrid table engine."}
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {

src/Databases/enableAllExperimentalSettings.cpp

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
     context->setSetting("allow_experimental_ytsaurus_table_engine", 1);
     context->setSetting("allow_experimental_ytsaurus_dictionary_source", 1);
     context->setSetting("allow_experimental_time_series_aggregate_functions", 1);
+    context->setSetting("allow_experimental_hybrid_table", 1);
     context->setSetting("allow_experimental_lightweight_update", 1);
     context->setSetting("allow_experimental_insert_into_iceberg", 1);
     context->setSetting("allow_experimental_iceberg_compaction", 1);
