
Commit d13eab3

Merge branch 'antalya-25.8' into backports/antalya-25.8/87600
2 parents 126b9a7 + 25ed9aa commit d13eab3

File tree

15 files changed: +458, -35 lines changed

.github/actions/create_workflow_report/create_workflow_report.py

Lines changed: 1 addition & 2 deletions
```diff
@@ -229,10 +229,9 @@ def get_checks_known_fails(
     if len(known_fails) == 0:
         return pd.DataFrame()

-    query = f"""SELECT job_status, job_name, status as test_status, test_name, results_link
+    query = f"""SELECT job_name, status as test_status, test_name, results_link
     FROM (
         SELECT
-            argMax(check_status, check_start_time) as job_status,
             check_name as job_name,
             argMax(test_status, check_start_time) as status,
             test_name,
```

ci/jobs/scripts/functional_tests_results.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -356,7 +356,7 @@ def run(self):
             pass

         if not info:
-            info = f"Failed: {s.failed}, Passed: {s.success}, Skipped: {s.skipped}"
+            info = f"Failed: {s.failed}, Passed: {s.success}, Skipped: {s.skipped}, Broken: {s.broken}"

         result = Result.create_from(
             name="Tests",
```

ci/jobs/scripts/integration_tests_runner.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1380,7 +1380,7 @@ def run_normal_check(self):
                 for c in counters[state]
             ]
         failed_sum = len(counters["FAILED"]) + len(counters["ERROR"])
-        status_text = f"fail: {failed_sum}, passed: {len(counters['PASSED'])}"
+        status_text = f"fail: {failed_sum}, passed: {len(counters['PASSED'])}, broken: {len(counters['BROKEN'])}"

         if not counters or sum(len(counter) for counter in counters.values()) == 0:
             status_text = "No tests found for some reason! It's a bug"
```
Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
# ALTER TABLE EXPORT PART

## Overview

The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format.

**Key Characteristics:**

- **Experimental feature** - must be enabled via the `allow_experimental_export_merge_tree_part` setting
- **Asynchronous** - executes in the background and returns immediately
- **Ephemeral** - no automatic retry mechanism; a failed export must be retried manually
- **Idempotent** - safe to re-export the same part (skips by default if the file exists)
- **Preserves sort order** from the source table

## Syntax

```sql
ALTER TABLE [database.]table_name
EXPORT PART 'part_name'
TO TABLE [destination_database.]destination_table
SETTINGS allow_experimental_export_merge_tree_part = 1
[, setting_name = value, ...]
```

### Parameters

- **`table_name`**: The source MergeTree table containing the part to export
- **`part_name`**: The exact name of the data part to export (e.g., `'2020_1_1_0'`, `'all_1_1_0'`); existing part names can be listed with the query below
- **`destination_table`**: The target table for the export (typically an S3, Azure, or other object storage table)
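Part names can be looked up in `system.parts` before exporting. A quick sketch (it assumes the `mt_table` source table from the Examples section below):

```sql
-- List active parts of the source table, with their partition and row count
SELECT name, partition_id, rows
FROM system.parts
WHERE table = 'mt_table' AND active;
```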
## Requirements

Source and destination tables must be fully compatible:

1. **Identical schemas** - same columns, types, and order
2. **Matching partition keys** - partition expressions must be identical
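If either requirement is violated, the export is rejected during validation. A hypothetical sketch (the `bad_dest` table is invented here; the error messages match the validation checks in `MergeTreeData::exportPartToTable` from this commit):

```sql
-- Destination with a mismatched partition key is rejected up front
CREATE TABLE bad_dest (id UInt64, year UInt16)
ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive')
PARTITION BY id;  -- source below is partitioned by year

ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE bad_dest
SETTINGS allow_experimental_export_merge_tree_part = 1;
-- Expected error: BAD_ARGUMENTS "Tables have different partition key"
-- A schema mismatch fails analogously: INCOMPATIBLE_COLUMNS "Tables have different structure"
```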
## Settings

### `allow_experimental_export_merge_tree_part` (Required)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Must be set to `true` to enable this experimental feature.

### `export_merge_tree_part_overwrite_file_if_exists` (Optional)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: If set to `true`, an existing destination file is overwritten. Otherwise, the export fails with an exception.
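For example, forcing a re-export of a part whose destination file already exists might look like this (a sketch; it assumes the `mt_table` and `s3_table` tables from the Examples section below):

```sql
ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE s3_table
SETTINGS allow_experimental_export_merge_tree_part = 1,
         export_merge_tree_part_overwrite_file_if_exists = 1;
```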
## Examples

### Basic Export to S3

```sql
-- Create source and destination tables
CREATE TABLE mt_table (id UInt64, year UInt16)
ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();

CREATE TABLE s3_table (id UInt64, year UInt16)
ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive')
PARTITION BY year;

-- Insert and export
INSERT INTO mt_table VALUES (1, 2020), (2, 2020), (3, 2021);

ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE s3_table
SETTINGS allow_experimental_export_merge_tree_part = 1;

ALTER TABLE mt_table EXPORT PART '2021_2_2_0' TO TABLE s3_table
SETTINGS allow_experimental_export_merge_tree_part = 1;
```
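Once both exports finish, the destination table can be queried directly to verify the result (a sketch; since exports run asynchronously, the rows appear only after completion):

```sql
SELECT * FROM s3_table ORDER BY id;
-- Expected: (1, 2020), (2, 2020), (3, 2021)
```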
## Monitoring

### Active Exports

Active exports can be found in the `system.exports` table. Currently it shows only exports that are executing; pending and finished exports are not listed.
```sql
arthur :) select * from system.exports;

SELECT *
FROM system.exports

Query id: 2026718c-d249-4208-891b-a271f1f93407

Row 1:
──────
source_database: default
source_table: source_mt_table
destination_database: default
destination_table: destination_table
create_time: 2025-11-19 09:09:11
part_name: 20251016-365_1_1_0
destination_file_path: table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.parquet
elapsed: 2.04845441
rows_read: 1138688 -- 1.14 million
total_rows_to_read: 550961374 -- 550.96 million
total_size_bytes_compressed: 37619147120 -- 37.62 billion
total_size_bytes_uncompressed: 138166213721 -- 138.17 billion
bytes_read_uncompressed: 316892925 -- 316.89 million
memory_usage: 596006095 -- 596.01 million
peak_memory_usage: 601239033 -- 601.24 million
```
### Export History

Succeeded and failed exports can be queried in `system.part_log`. For now, it records only completion events (success or failure).
```sql
arthur :) select * from system.part_log where event_type='ExportPart' and table = 'replicated_source' order by event_time desc limit 1;

SELECT *
FROM system.part_log
WHERE (event_type = 'ExportPart') AND (`table` = 'replicated_source')
ORDER BY event_time DESC
LIMIT 1

Query id: ae1c1cd3-c20e-4f20-8b82-ed1f6af0237f

Row 1:
──────
hostname: arthur
query_id:
event_type: ExportPart
merge_reason: NotAMerge
merge_algorithm: Undecided
event_date: 2025-11-19
event_time: 2025-11-19 09:08:31
event_time_microseconds: 2025-11-19 09:08:31.974701
duration_ms: 4
database: default
table: replicated_source
table_uuid: 78471c67-24f4-4398-9df5-ad0a6c3daf41
part_name: 2021_0_0_0
partition_id: 2021
partition: 2021
part_type: Compact
disk_name: default
path_on_disk: year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.parquet
rows: 1
size_in_bytes: 272
merged_from: ['2021_0_0_0']
bytes_uncompressed: 86
read_rows: 1
read_bytes: 6
peak_memory_usage: 22
error: 0
exception:
ProfileEvents: {}
```
### Profile Events

- `PartsExports` - Successful exports
- `PartsExportFailures` - Failed exports
- `PartsExportDuplicated` - Part exports that failed because the target file already exists
- `PartsExportTotalMilliseconds` - Total time spent exporting parts
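These counters are exposed through `system.events` like any other profile event, so running totals can be checked with a query such as (a sketch):

```sql
SELECT event, value
FROM system.events
WHERE event LIKE 'PartsExport%';
```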
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
---
description: 'System table containing information about in-progress merge tree part exports'
keywords: ['system table', 'exports', 'merge tree', 'part']
slug: /operations/system-tables/exports
title: 'system.exports'
---

Contains information about in-progress merge tree part exports.

Columns:

- `source_database` ([String](/docs/en/sql-reference/data-types/string.md)) — Name of the source database.
- `source_table` ([String](/docs/en/sql-reference/data-types/string.md)) — Name of the source table.
- `destination_database` ([String](/docs/en/sql-reference/data-types/string.md)) — Name of the destination database.
- `destination_table` ([String](/docs/en/sql-reference/data-types/string.md)) — Name of the destination table.
- `create_time` ([DateTime](/docs/en/sql-reference/data-types/datetime.md)) — Date and time when the export command was received by the server.
- `part_name` ([String](/docs/en/sql-reference/data-types/string.md)) — Name of the part.
- `destination_file_path` ([String](/docs/en/sql-reference/data-types/string.md)) — Relative path of the file the part is being exported to.
- `elapsed` ([Float64](/docs/en/sql-reference/data-types/float.md)) — The time elapsed (in seconds) since the export started.
- `rows_read` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — The number of rows read from the exported part.
- `total_rows_to_read` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — The total number of rows to read from the exported part.
- `total_size_bytes_compressed` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — The total size of the compressed data in the exported part.
- `total_size_bytes_uncompressed` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — The total size of the uncompressed data in the exported part.
- `bytes_read_uncompressed` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — The number of uncompressed bytes read from the exported part.
- `memory_usage` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — Current memory usage in bytes for the export operation.
- `peak_memory_usage` ([UInt64](/docs/en/sql-reference/data-types/int-uint.md)) — Peak memory usage in bytes during the export operation.
**Example**

```sql
arthur :) select * from system.exports;

SELECT *
FROM system.exports

Query id: 2026718c-d249-4208-891b-a271f1f93407

Row 1:
──────
source_database: default
source_table: source_mt_table
destination_database: default
destination_table: destination_table
create_time: 2025-11-19 09:09:11
part_name: 20251016-365_1_1_0
destination_file_path: table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.parquet
elapsed: 2.04845441
rows_read: 1138688 -- 1.14 million
total_rows_to_read: 550961374 -- 550.96 million
total_size_bytes_compressed: 37619147120 -- 37.62 billion
total_size_bytes_uncompressed: 138166213721 -- 138.17 billion
bytes_read_uncompressed: 316892925 -- 316.89 million
memory_usage: 596006095 -- 596.01 million
peak_memory_usage: 601239033 -- 601.24 million
```
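Because the table exposes both `rows_read` and `total_rows_to_read`, a rough per-export progress estimate can be derived (a sketch):

```sql
SELECT
    source_table,
    part_name,
    round(elapsed, 1) AS elapsed_s,
    round(100 * rows_read / total_rows_to_read, 2) AS progress_pct
FROM system.exports;
```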

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 1 addition & 1 deletion
```diff
@@ -485,7 +485,7 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
     DB::StoredObject metadata_stored_object(metadata_path);
     auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
     String metadata_file;
-    readString(metadata_file, *read_buf);
+    readStringUntilEOF(metadata_file, *read_buf);

     Poco::JSON::Parser parser;
     Poco::Dynamic::Var result = parser.parse(metadata_file);
```

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 5 additions & 10 deletions
```diff
@@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo

 bool ExportPartTask::executeStep()
 {
-    auto metadata_snapshot = storage.getInMemoryMetadataPtr();
+    const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
+
     Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
-    StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);

     MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

@@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
     bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
     bool prefetch = false;

-    MergeTreeData::IMutationsSnapshot::Params params
-    {
-        .metadata_version = metadata_snapshot->getMetadataVersion(),
-        .min_part_metadata_version = manifest.data_part->getMetadataVersion(),
-    };
-
-    auto mutations_snapshot = storage.getMutationsSnapshot(params);
+    const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
+    auto mutations_snapshot = snapshot_data.mutations_snapshot;

     auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
         manifest.data_part,
@@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
         read_type,
         plan_for_part,
         storage,
-        storage_snapshot,
+        manifest.storage_snapshot,
         RangesInDataPart(manifest.data_part),
         alter_conversions,
         nullptr,
```

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 5 additions & 4 deletions
```diff
@@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
         return ast ? ast->formatWithSecretsOneLine() : "";
     };

-    auto src_snapshot = getInMemoryMetadataPtr();
-    auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
+    auto source_metadata_ptr = getInMemoryMetadataPtr();
+    auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();

-    if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
+    if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
         throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

-    if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
+    if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");

     auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
@@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
         transaction_id,
         query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
         format_settings,
+        getStorageSnapshot(source_metadata_ptr, query_context),
         completion_callback);

     std::lock_guard lock(export_manifests_mutex);
```

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 7 additions & 0 deletions
```diff
@@ -2,6 +2,7 @@

 #include <Interpreters/StorageID.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
+#include <Storages/StorageSnapshot.h>
 #include <QueryPipeline/QueryPipeline.h>
 #include <optional>

@@ -46,12 +47,14 @@ struct MergeTreePartExportManifest
         const String & transaction_id_,
         FileAlreadyExistsPolicy file_already_exists_policy_,
         const FormatSettings & format_settings_,
+        const StorageSnapshotPtr & storage_snapshot_,
         std::function<void(CompletionCallbackResult)> completion_callback_ = {})
         : destination_storage_id(destination_storage_id_),
         data_part(data_part_),
         transaction_id(transaction_id_),
         file_already_exists_policy(file_already_exists_policy_),
         format_settings(format_settings_),
+        storage_snapshot(storage_snapshot_),
         completion_callback(completion_callback_),
         create_time(time(nullptr)) {}

@@ -62,6 +65,10 @@ struct MergeTreePartExportManifest
     FileAlreadyExistsPolicy file_already_exists_policy;
     FormatSettings format_settings;

+    /// Storage snapshot captured at the time of query validation to prevent race conditions with mutations.
+    /// Otherwise the export could fail if the schema changes between validation and execution.
+    StorageSnapshotPtr storage_snapshot;
+
     std::function<void(CompletionCallbackResult)> completion_callback;

     time_t create_time;
```

src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp

Lines changed: 3 additions & 0 deletions
```diff
@@ -26,18 +26,21 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime)
     manifest1.partition_id = "2020";
     manifest1.destination_database = "db1";
     manifest1.destination_table = "table1";
+    manifest1.transaction_id = "tx1";
     manifest1.create_time = base_time + 300; // Latest

     ExportReplicatedMergeTreePartitionManifest manifest2;
     manifest2.partition_id = "2021";
     manifest2.destination_database = "db1";
     manifest2.destination_table = "table1";
+    manifest2.transaction_id = "tx2";
     manifest2.create_time = base_time + 100; // Middle

     ExportReplicatedMergeTreePartitionManifest manifest3;
     manifest3.partition_id = "2022";
     manifest3.destination_database = "db1";
     manifest3.destination_table = "table1";
+    manifest3.transaction_id = "tx3";
     manifest3.create_time = base_time; // Oldest

     ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}};
```
