Skip to content

Commit cf4d137

Browse files
authored
Merge pull request #1071 from Altinity/mf-25.8-tiered-distributed
engine=Hybrid
2 parents f3bd121 + b28b8ed commit cf4d137

File tree

15 files changed

+1185
-42
lines changed

15 files changed

+1185
-42
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
---
2+
description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
3+
slug: /engines/table-engines/special/tiered-distributed
4+
title: 'Hybrid Table Engine'
5+
sidebar_label: 'Hybrid'
6+
sidebar_position: 11
7+
---
8+
9+
# Hybrid table engine
10+
11+
`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
12+
The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
13+
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.
14+
15+
It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.
16+
17+
Typical use cases include:
18+
19+
- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
20+
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
21+
- Gradual roll-outs where only a subset of rows should be served from a new backend.
22+
23+
By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
24+
25+
## Engine definition
26+
27+
```sql
28+
CREATE TABLE [IF NOT EXISTS] [db.]table_name
29+
(
30+
column1 type1,
31+
column2 type2,
32+
...
33+
)
34+
ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
35+
```
36+
37+
You must pass at least two arguments – the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements.
38+
39+
### Arguments and behaviour
40+
41+
- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
42+
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
43+
- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
44+
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
45+
- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
46+
47+
## Example: local cluster plus S3 historical tier
48+
49+
The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
50+
51+
```sql
52+
-- Local MergeTree table that keeps current data
53+
CREATE OR REPLACE TABLE btc_blocks_local
54+
(
55+
`hash` FixedString(64),
56+
`version` Int64,
57+
`mediantime` DateTime64(9),
58+
`nonce` Int64,
59+
`bits` FixedString(8),
60+
`difficulty` Float64,
61+
`chainwork` FixedString(64),
62+
`size` Int64,
63+
`weight` Int64,
64+
`coinbase_param` String,
65+
`number` Int64,
66+
`transaction_count` Int64,
67+
`merkle_root` FixedString(64),
68+
`stripped_size` Int64,
69+
`timestamp` DateTime64(9),
70+
`date` Date
71+
)
72+
ENGINE = MergeTree
73+
ORDER BY (timestamp)
74+
PARTITION BY toYYYYMM(date);
75+
76+
-- Hybrid table that unions the local shard with historical data in S3
77+
CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
78+
remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
79+
s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
80+
) AS btc_blocks_local;
81+
82+
-- Writes target the first (remote) layer
83+
INSERT INTO btc_blocks
84+
SELECT *
85+
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
86+
WHERE date BETWEEN '2025-09-01' AND '2025-09-30';
87+
88+
-- Reads seamlessly combine both predicates
89+
SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3
90+
SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3)
91+
SELECT * FROM btc_blocks WHERE date IN ('2025-08-31','2025-09-01') -- data from both sources, single copy always
92+
93+
94+
-- Run analytic queries as usual
95+
SELECT
96+
date,
97+
count(),
98+
uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
99+
sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
100+
FROM btc_blocks
101+
WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
102+
GROUP BY date
103+
ORDER BY date;
104+
```
105+
106+
Because the predicates are applied inside every layer, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.

src/Interpreters/ClusterProxy/SelectStreamFactory.cpp

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ ASTPtr rewriteSelectQuery(
6767
const ASTPtr & query,
6868
const std::string & remote_database,
6969
const std::string & remote_table,
70-
ASTPtr table_function_ptr)
70+
ASTPtr table_function_ptr,
71+
ASTPtr additional_filter)
7172
{
7273
auto modified_query_ast = query->clone();
7374

@@ -80,8 +81,33 @@ ASTPtr rewriteSelectQuery(
8081

8182
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
8283
{
84+
// Apply additional filter if provided
85+
if (additional_filter)
86+
{
87+
if (select_query.where())
88+
{
89+
/// WHERE <old> AND <filter>
90+
select_query.setExpression(
91+
ASTSelectQuery::Expression::WHERE,
92+
makeASTFunction("and", select_query.where(), additional_filter->clone()));
93+
}
94+
else
95+
{
96+
/// No WHERE – simply set it
97+
select_query.setExpression(
98+
ASTSelectQuery::Expression::WHERE, additional_filter->clone());
99+
}
100+
}
101+
83102
if (table_function_ptr)
84-
select_query.addTableFunction(table_function_ptr);
103+
{
104+
select_query.addTableFunction(table_function_ptr->clone());
105+
106+
// Reset semantic table information for all column identifiers to prevent
107+
// RestoreQualifiedNamesVisitor from adding wrong table names
108+
ResetSemanticTableVisitor::Data data;
109+
ResetSemanticTableVisitor(data).visit(modified_query_ast);
110+
}
85111
else
86112
select_query.replaceDatabaseAndTable(remote_database, remote_table);
87113

@@ -93,6 +119,7 @@ ASTPtr rewriteSelectQuery(
93119
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
94120
data.remote_table.database = remote_database;
95121
data.remote_table.table = remote_table;
122+
96123
RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
97124
}
98125
}

src/Interpreters/ClusterProxy/SelectStreamFactory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ ASTPtr rewriteSelectQuery(
4141
const ASTPtr & query,
4242
const std::string & remote_database,
4343
const std::string & remote_table,
44-
ASTPtr table_function_ptr = nullptr);
44+
ASTPtr table_function_ptr = nullptr,
45+
ASTPtr additional_filter = nullptr);
4546

4647
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
4748
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <Parsers/ASTInsertQuery.h>
1616
#include <Planner/Utils.h>
1717
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
18+
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
1819
#include <Processors/QueryPlan/QueryPlan.h>
1920
#include <Processors/QueryPlan/ReadFromLocalReplica.h>
2021
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@@ -333,7 +334,8 @@ void executeQuery(
333334
const std::string & sharding_key_column_name,
334335
const DistributedSettings & distributed_settings,
335336
AdditionalShardFilterGenerator shard_filter_generator,
336-
bool is_remote_function)
337+
bool is_remote_function,
338+
std::span<const SelectQueryInfo> additional_query_infos)
337339
{
338340
const Settings & settings = context->getSettingsRef();
339341

@@ -361,6 +363,7 @@ void executeQuery(
361363
new_context->increaseDistributedDepth();
362364

363365
const size_t shards = cluster->getShardCount();
366+
const bool has_additional_query_infos = !additional_query_infos.empty();
364367

365368
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
366369
{
@@ -470,6 +473,29 @@ void executeQuery(
470473
plans.emplace_back(std::move(plan));
471474
}
472475

476+
if (has_additional_query_infos)
477+
{
478+
if (!header)
479+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local hybrid plan creation");
480+
481+
const Block & header_block = *header;
482+
for (const auto & additional_query_info : additional_query_infos)
483+
{
484+
auto additional_plan = createLocalPlan(
485+
additional_query_info.query,
486+
header_block,
487+
context,
488+
processed_stage,
489+
0, /// shard_num is not applicable for local hybrid plans
490+
1, /// shard_count is not applicable for local hybrid plans
491+
false,
492+
false,
493+
"");
494+
495+
plans.emplace_back(std::move(additional_plan));
496+
}
497+
}
498+
473499
if (plans.empty())
474500
return;
475501

@@ -485,6 +511,8 @@ void executeQuery(
485511
input_headers.emplace_back(plan->getCurrentHeader());
486512

487513
auto union_step = std::make_unique<UnionStep>(std::move(input_headers));
514+
if (has_additional_query_infos)
515+
union_step->setStepDescription("Hybrid");
488516
query_plan.unitePlans(std::move(union_step), std::move(plans));
489517
}
490518

src/Interpreters/ClusterProxy/executeQuery.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <Parsers/IAST_fwd.h>
66

77
#include <optional>
8+
#include <span>
89

910
namespace DB
1011
{
@@ -88,7 +89,8 @@ void executeQuery(
8889
const std::string & sharding_key_column_name,
8990
const DistributedSettings & distributed_settings,
9091
AdditionalShardFilterGenerator shard_filter_generator,
91-
bool is_remote_function);
92+
bool is_remote_function,
93+
std::span<const SelectQueryInfo> additional_query_infos = {});
9294

9395
std::optional<QueryPipeline> executeInsertSelectWithParallelReplicas(
9496
const ASTInsertQuery & query_ast,

src/Interpreters/TranslateQualifiedNamesVisitor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,4 +399,15 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, D
399399
}
400400
}
401401

402+
void ResetSemanticTableMatcher::visit(ASTPtr & ast, Data & data)
403+
{
404+
if (auto * t = ast->as<ASTIdentifier>())
405+
visit(*t, ast, data);
406+
}
407+
408+
void ResetSemanticTableMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &)
409+
{
410+
identifier.resetSemanticTable();
411+
}
412+
402413
}

src/Interpreters/TranslateQualifiedNamesVisitor.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,33 @@ struct RestoreQualifiedNamesMatcher
8080

8181
using RestoreQualifiedNamesVisitor = InDepthNodeVisitor<RestoreQualifiedNamesMatcher, true>;
8282

83+
84+
/// Reset semantic->table for all column identifiers in the AST.
85+
///
86+
/// PROBLEM DESCRIPTION:
87+
/// When an AST is passed through multiple query rewrites (e.g., in Hybrid -> remote),
88+
/// the semantic->table information attached to ASTIdentifier nodes can become stale and
89+
/// cause incorrect column qualification. This happens because:
90+
///
91+
/// 1. During initial parsing, semantic->table is populated with the original table name
92+
/// 2. When the query is rewritten (e.g., FROM clause changed from table to remote() function inside Hybrid),
93+
/// the AST structure is modified but semantic->table information is preserved
94+
/// 3. Subsequent visitors like RestoreQualifiedNamesVisitor called in remote() function over the same AST
95+
/// may use this stale semantic->table information to incorrectly qualify column names with the original table name
96+
///
97+
/// SOLUTION:
98+
/// This visitor clears semantic->table for all column identifiers, ensuring that subsequent
99+
/// visitors work with clean semantic information and don't apply stale table qualifications.
100+
struct ResetSemanticTableMatcher
101+
{
102+
// No data needed for this visitor
103+
struct Data {};
104+
105+
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
106+
static void visit(ASTPtr & ast, Data & data);
107+
static void visit(ASTIdentifier & identifier, ASTPtr &, Data & data);
108+
};
109+
110+
using ResetSemanticTableVisitor = InDepthNodeVisitor<ResetSemanticTableMatcher, true>;
111+
83112
}

src/Parsers/ASTIdentifier.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ void ASTIdentifier::restoreTable()
167167
}
168168
}
169169

170+
void ASTIdentifier::resetSemanticTable()
171+
{
172+
// Only reset semantic table for column identifiers (not table identifiers)
173+
if (semantic && !semantic->special)
174+
{
175+
semantic->table.clear();
176+
semantic->can_be_alias = true;
177+
semantic->membership = std::nullopt;
178+
}
179+
}
180+
170181
std::shared_ptr<ASTTableIdentifier> ASTIdentifier::createTable() const
171182
{
172183
if (name_parts.size() == 1) return std::make_shared<ASTTableIdentifier>(name_parts[0]);

src/Parsers/ASTIdentifier.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class ASTIdentifier : public ASTWithAlias
5252
void updateTreeHashImpl(SipHash & hash_state, bool ignore_alias) const override;
5353

5454
void restoreTable(); // TODO(ilezhankin): get rid of this
55+
void resetSemanticTable(); // Reset semantic to empty string (see ResetSemanticTableVisitor)
5556
std::shared_ptr<ASTTableIdentifier> createTable() const; // returns |nullptr| if identifier is not table.
5657

5758
String full_name;

src/Planner/PlannerActionsVisitor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ class ActionNodeNameHelper
348348
}
349349
default:
350350
{
351-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage());
351+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {} (node_type: {})", node->formatASTForErrorMessage(), static_cast<int>(node_type));
352352
}
353353
}
354354

0 commit comments

Comments
 (0)