Commit 02ebff1

Merge branch 'antalya-25.8' into bugfix/antalya-25.8/fix_in_for_cluster_request

2 parents 09d938f + 8b8535b

51 files changed, +1323 −387 lines

Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
# ALTER TABLE EXPORT PARTITION

## Overview

The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replicated*MergeTree tables to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. The feature coordinates part export operations across all replicas using ZooKeeper.

Each MergeTree part becomes a separate file with the following naming convention: `<table_directory>/<partitioning>/<data_part_name>_<merge_tree_part_checksum>.<format>`. To ensure atomicity, a commit file containing the relative paths of all exported parts is also shipped. A data file should only be considered part of the dataset if a commit file references it. The commit file is named using the following convention: `<table_directory>/commit_<partition_id>_<transaction_id>`.

The set of parts to export is based on the list of parts visible to the replica that received the export command. Other replicas assist in the export if they hold those parts locally; otherwise, they ignore them.

Partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table issues a request to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`.

The same partition cannot be exported to the same destination more than once. There are two ways to override this behavior: enable the `export_merge_tree_partition_force_export` setting, or wait for the task entry to expire.

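For instance, to force a re-export of a partition that was already exported to the same destination, the setting can be passed inline; a minimal sketch reusing the `rmt_table` and `s3_table` tables created in the Examples section below:

```sql
-- Hypothetical re-export of an already exported partition before its manifest expires.
ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table
SETTINGS export_merge_tree_partition_force_export = 1;
```
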
The export task can be killed by issuing the kill command: `KILL EXPORT PARTITION <where predicate for system.replicated_partition_exports>`.

The task is persistent: it is resumed after crashes, restarts, and other failures.

## Syntax

```sql
ALTER TABLE [database.]table_name
EXPORT PARTITION ID 'partition_id'
TO TABLE [destination_database.]destination_table
[SETTINGS setting_name = value, ...]
```

### Parameters

- **`table_name`**: The source Replicated*MergeTree table containing the partition to export
- **`partition_id`**: The partition identifier to export (e.g., `'2020'`, `'2021'`)
- **`destination_table`**: The target table for the export (typically an S3, Azure, or other object storage table)

## Settings

### Server Settings

#### `enable_experimental_export_merge_tree_partition_feature` (Required)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Enables the replicated merge tree partition export feature. It is experimental and not yet ready for production use.

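To confirm the server-level flag is active, one option is to look it up at runtime; a sketch that assumes the setting is exposed through `system.server_settings`:

```sql
-- Hypothetical lookup of the experimental feature flag; assumes it is listed in system.server_settings.
SELECT name, value
FROM system.server_settings
WHERE name = 'enable_experimental_export_merge_tree_partition_feature';
```
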
### Query Settings

#### `export_merge_tree_partition_force_export` (Optional)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires.

#### `export_merge_tree_partition_max_retries` (Optional)

- **Type**: `UInt64`
- **Default**: `3`
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If the limit is exceeded, the entire task fails.

#### `export_merge_tree_partition_manifest_ttl` (Optional)

- **Type**: `UInt64`
- **Default**: `180` (seconds)
- **Description**: Determines how long the manifest lives in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones.

#### `export_merge_tree_part_file_already_exists_policy` (Optional)

- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy`
- **Default**: `skip`
- **Description**: Policy for handling files that already exist during export. Possible values:
  - `skip` - Skip the file if it already exists
  - `error` - Throw an error if the file already exists
  - `overwrite` - Overwrite the file

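Query-level settings can be combined in a single statement; a minimal sketch, again using the example tables defined below:

```sql
-- Hypothetical export that tunes several query-level settings at once.
ALTER TABLE rmt_table EXPORT PARTITION ID '2021' TO TABLE s3_table
SETTINGS
    export_merge_tree_partition_max_retries = 5,
    export_merge_tree_partition_manifest_ttl = 600,
    export_merge_tree_part_file_already_exists_policy = 'overwrite';
```
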
## Examples

### Basic Export to S3

```sql
CREATE TABLE rmt_table (id UInt64, year UInt16)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'replica1')
PARTITION BY year ORDER BY tuple();

CREATE TABLE s3_table (id UInt64, year UInt16)
ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive')
PARTITION BY year;

INSERT INTO rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021);

ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table;
```

## Killing Exports

You can cancel in-progress partition exports using the `KILL EXPORT PARTITION` command:

```sql
KILL EXPORT PARTITION
WHERE partition_id = '2020'
AND source_table = 'rmt_table'
AND destination_table = 's3_table'
```

The `WHERE` clause filters exports from the `system.replicated_partition_exports` table. You can use any columns from that table in the filter.

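Because any column can appear in the filter, broader cancellations are possible as well; a sketch that cancels every export of one partition regardless of destination:

```sql
-- Hypothetical: cancel all exports of partition '2021' from this source table.
KILL EXPORT PARTITION
WHERE source_table = 'rmt_table' AND partition_id = '2021'
```
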
## Monitoring

### Active and Completed Exports

Monitor partition exports using the `system.replicated_partition_exports` table:

```sql
arthur :) select * from system.replicated_partition_exports Format Vertical;

SELECT *
FROM system.replicated_partition_exports
FORMAT Vertical

Query id: 9efc271a-a501-44d1-834f-bc4d20156164

Row 1:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:21:51
partition_id: 2022
transaction_id: 7397746091717128192
source_replica: r1
parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
parts_count: 3
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0

Row 2:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:20:35
partition_id: 2021
transaction_id: 7397745772618674176
source_replica: r1
parts: ['2021_0_0_0']
parts_count: 1
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0

2 rows in set. Elapsed: 0.019 sec.

arthur :)
```

Status values include:
- `PENDING` - Export is queued / in progress
- `COMPLETED` - Export finished successfully
- `FAILED` - Export failed
- `KILLED` - Export was cancelled

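To focus on tasks that did not finish cleanly, the status column can be used as a filter; a minimal sketch:

```sql
-- Hypothetical check for exports that failed or were cancelled.
SELECT partition_id, source_replica, last_exception, exception_count
FROM system.replicated_partition_exports
WHERE status IN ('FAILED', 'KILLED');
```
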
## Related Features

- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)

docs/en/engines/table-engines/special/tiered-distributed.md renamed to docs/en/engines/table-engines/special/hybrid.md

Lines changed: 24 additions & 10 deletions
@@ -1,6 +1,6 @@
 ---
-description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
-slug: /engines/table-engines/special/tiered-distributed
+description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.'
+slug: /engines/table-engines/special/hybrid
 title: 'Hybrid Table Engine'
 sidebar_label: 'Hybrid'
 sidebar_position: 11
@@ -9,7 +9,7 @@ sidebar_position: 11
 # Hybrid table engine

 `Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
-The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
+The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
 global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

 It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.
@@ -20,7 +20,21 @@ Typical use cases include:
 - Tiered storage, for example fresh data on a local cluster and historical data in S3.
 - Gradual roll-outs where only a subset of rows should be served from a new backend.

-By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
+By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
+
+## Enable the engine
+
+The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables:
+
+```sql
+SET allow_experimental_hybrid_table = 1;
+```
+
+### Automatic Type Alignment
+
+Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental setting `hybrid_table_auto_cast_columns = 1` **(enabled by default; requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues.
+
+Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example, change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors.
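
The opt-out and the refresh step described above boil down to ordinary statements; a minimal sketch, assuming a Hybrid table named `btc_blocks` as in the example further down:

```sql
-- Disable automatic casts if they cause issues (setting documented above).
SET hybrid_table_auto_cast_columns = 0;

-- Refresh the cached segment headers after changing a segment's schema.
DETACH TABLE btc_blocks;
ATTACH TABLE btc_blocks;
```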

 ## Engine definition

@@ -39,14 +53,14 @@ You must pass at least two arguments – the first table function and its predic
 ### Arguments and behaviour

 - `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
-- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
-- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
+- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
+- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
 - `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
-- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
+- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.

 ## Example: local cluster plus S3 historical tier

-The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
+The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

 ```sql
 -- Local MergeTree table that keeps current data
@@ -79,7 +93,7 @@ CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
     s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
 ) AS btc_blocks_local;

--- Writes target the first (remote) layer
+-- Writes target the first (remote) segment
 INSERT INTO btc_blocks
 SELECT *
 FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
@@ -103,4 +117,4 @@ GROUP BY date
 ORDER BY date;
 ```

-Because the predicates are applied inside every layer, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
+Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
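
As a concrete illustration of such a cast, the ingestion query can coerce the Parquet `String` column on the fly; a minimal sketch, assuming the block hash column is named `hash` and that one segment declares it as `FixedString(64)`:

```sql
-- Hypothetical ingestion-time cast: align the Parquet String column with a
-- segment that stores the value as FixedString(64).
INSERT INTO btc_blocks
SELECT * REPLACE (CAST(hash AS FixedString(64)) AS hash)
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
WHERE date < '2025-09-01';
```
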
Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
#include <Analyzer/Passes/HybridCastsPass.h>

#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Resolve/IdentifierResolver.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>

#include <Storages/IStorage.h>
#include <Storages/StorageDistributed.h>

#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <Common/Exception.h>

namespace DB
{

namespace Setting
{
    extern const SettingsBool hybrid_table_auto_cast_columns;
}

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace
{

/// Collect Hybrid table expressions that require casts to normalize headers across segments.
///
/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function
/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying
/// StorageDistributed so cached casts can be picked up there as well.
class HybridCastTablesCollector : public InDepthQueryTreeVisitor<HybridCastTablesCollector>
{
public:
    explicit HybridCastTablesCollector(std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_)
        : cast_map(cast_map_)
    {}

    static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; }

    void visitImpl(QueryTreeNodePtr & node)
    {
        const auto * table = node->as<TableNode>();
        if (!table)
            return;

        const auto * storage = table->getStorage().get();
        if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
        {
            ColumnsDescription to_cast = distributed->getColumnsToCast();
            if (!to_cast.empty())
                cast_map.emplace(node.get(), std::move(to_cast)); // repeated table_expression can overwrite
        }
    }

private:
    std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
};

// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
{
public:
    HybridCastVisitor(
        const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
        ContextPtr context_)
        : cast_map(cast_map_)
        , context(std::move(context_))
    {}

    bool shouldTraverseTopToBottom() const { return false; }

    static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child)
    {
        /// Traverse all child nodes so casts also apply inside subqueries and UNION branches.
        (void)child;
        return true;
    }

    void visitImpl(QueryTreeNodePtr & node)
    {
        auto * column_node = node->as<ColumnNode>();
        if (!column_node)
            return;

        auto column_source = column_node->getColumnSourceOrNull();
        if (!column_source)
            return;

        auto it = cast_map.find(column_source.get());
        if (it == cast_map.end())
            return;

        const auto & column_name = column_node->getColumnName();
        auto expected_column_opt = it->second.tryGetPhysical(column_name);
        if (!expected_column_opt)
            return;

        auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());

        auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
        const auto & alias = node->getAlias();
        if (!alias.empty())
            cast_node->setAlias(alias);
        else
            cast_node->setAlias(expected_column_opt->name);

        node = cast_node;
    }

private:
    const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
    ContextPtr context;
};


} // namespace

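/// Entry point of the pass: does nothing unless hybrid_table_auto_cast_columns is enabled.
/// First collects Distributed tables that report columns to cast (Hybrid segments),
/// then rewrites references to those columns into CAST(column, expected_type).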
void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
    const auto & settings = context->getSettingsRef();
    if (!settings[Setting::hybrid_table_auto_cast_columns])
        return;

    auto * query = query_tree_node->as<QueryNode>();
    if (!query)
        return;

    std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
    HybridCastTablesCollector collector(cast_map);
    collector.visit(query_tree_node);
    if (cast_map.empty())
        return;

    HybridCastVisitor visitor(cast_map, context);
    visitor.visit(query_tree_node);
}

}
