Commit 8df92c7

Merge branch 'antalya-25.8' of github.com:Altinity/ClickHouse into backports/antalya-25.8/88341_disable_catalogs_in_system_tables
2 parents: 057da6d + c24224b

28 files changed: +611, -172 lines

docs/en/engines/table-engines/mergetree-family/part_export.md

Lines changed: 112 additions & 3 deletions

@@ -2,7 +2,7 @@
 
 ## Overview
 
-The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format.
+The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. A commit file is shipped to the same destination directory containing all data files exported within that transaction.
 
 **Key Characteristics:**
 - **Experimental feature** - must be enabled via `allow_experimental_export_merge_tree_part` setting
@@ -48,6 +48,18 @@ Source and destination tables must be 100% compatible:
 - **Default**: `false`
 - **Description**: If set to `true`, it will overwrite the file. Otherwise, fails with exception.
 
+### `export_merge_tree_part_max_bytes_per_file` (Optional)
+
+- **Type**: `UInt64`
+- **Default**: `0`
+- **Description**: Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care.
+
+### `export_merge_tree_part_max_rows_per_file` (Optional)
+
+- **Type**: `UInt64`
+- **Default**: `0`
+- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care.
+
 ## Examples
 
 ### Basic Export to S3
@@ -93,7 +105,7 @@ destination_database: default
 destination_table: destination_table
 create_time: 2025-11-19 09:09:11
 part_name: 20251016-365_1_1_0
-destination_file_path: table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.parquet
+destination_file_paths: ['table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.1.parquet']
 elapsed: 2.04845441
 rows_read: 1138688 -- 1.14 million
 total_rows_to_read: 550961374 -- 550.96 million
@@ -138,7 +150,8 @@ partition_id: 2021
 partition: 2021
 part_type: Compact
 disk_name: default
-path_on_disk: year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.parquet
+path_on_disk:
+remote_file_paths ['year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.1.parquet']
 rows: 1
 size_in_bytes: 272
 merged_from: ['2021_0_0_0']
@@ -158,3 +171,99 @@ ProfileEvents: {}
 - `PartsExportDuplicated` - Number of part exports that failed because target already exists.
 - `PartsExportTotalMilliseconds` - Total time
 
+### Split large files
+
+```sql
+alter table big_table export part '2025_0_32_3' to table replicated_big_destination SETTINGS export_merge_tree_part_max_bytes_per_file=10000000, output_format_parquet_row_group_size_bytes=5000000;
+
+arthur :) select * from system.exports;
+
+SELECT *
+FROM system.exports
+
+Query id: d78d9ce5-cfbc-4957-b7dd-bc8129811634
+
+Row 1:
+──────
+source_database: default
+source_table: big_table
+destination_database: default
+destination_table: replicated_big_destination
+create_time: 2025-12-15 13:12:48
+part_name: 2025_0_32_3
+destination_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet']
+elapsed: 14.360427274
+rows_read: 10256384 -- 10.26 million
+total_rows_to_read: 10485760 -- 10.49 million
+total_size_bytes_compressed: 83779395 -- 83.78 million
+total_size_bytes_uncompressed: 10611691600 -- 10.61 billion
+bytes_read_uncompressed: 10440998912 -- 10.44 billion
+memory_usage: 89795477 -- 89.80 million
+peak_memory_usage: 107362133 -- 107.36 million
+
+1 row in set. Elapsed: 0.014 sec.
+
+arthur :) select * from system.part_log where event_type = 'ExportPart' order by event_time desc limit 1 format Vertical;
+
+SELECT *
+FROM system.part_log
+WHERE event_type = 'ExportPart'
+ORDER BY event_time DESC
+LIMIT 1
+FORMAT Vertical
+
+Query id: 95128b01-b751-4726-8e3e-320728ac6af7
+
+Row 1:
+──────
+hostname: arthur
+query_id:
+event_type: ExportPart
+merge_reason: NotAMerge
+merge_algorithm: Undecided
+event_date: 2025-12-15
+event_time: 2025-12-15 13:13:03
+event_time_microseconds: 2025-12-15 13:13:03.197492
+duration_ms: 14673
+database: default
+table: big_table
+table_uuid: a3eeeea0-295c-41a3-84ef-6b5463dbbe8c
+part_name: 2025_0_32_3
+partition_id: 2025
+partition: 2025
+part_type: Wide
+disk_name: default
+path_on_disk: ./store/a3e/a3eeeea0-295c-41a3-84ef-6b5463dbbe8c/2025_0_32_3/
+remote_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet']
+rows: 10485760 -- 10.49 million
+size_in_bytes: 83779395 -- 83.78 million
+merged_from: ['2025_0_32_3']
+bytes_uncompressed: 10611691600 -- 10.61 billion
+read_rows: 10485760 -- 10.49 million
+read_bytes: 10674503680 -- 10.67 billion
+peak_memory_usage: 107362133 -- 107.36 million
+error: 0
+exception:
+ProfileEvents: {}
+
+1 row in set. Elapsed: 0.044 sec.
+
+arthur :) select _path, formatReadableSize(_size) as _size from s3(s3_conn, filename='**', format=One);
+
+SELECT
+    _path,
+    formatReadableSize(_size) AS _size
+FROM s3(s3_conn, filename = '**', format = One)
+
+Query id: c48ae709-f590-4d1b-8158-191f8d628966
+
+┌─_path────────────────────────────────────────────────────────────────────────────────┬─_size─────┐
+1. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet │ 17.36 MiB │
+2. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet │ 17.32 MiB │
+3. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet │ 5.04 MiB  │
+4. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet │ 17.40 MiB │
+5. │ test/replicated_big/year=2025/commit_2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7    │ 320.00 B  │
+└──────────────────────────────────────────────────────────────────────────────────────┴───────────┘
+
+5 rows in set. Elapsed: 0.072 sec.
+```
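
Note: the documented example above splits by bytes; the same mechanism can be driven by row count via `export_merge_tree_part_max_rows_per_file`. A minimal sketch, assuming a hypothetical source table `events`, a part named '2025_0_10_2', and a pre-created compatible destination table `events_export` (these names are illustrative and not part of this commit):

```sql
-- Hypothetical names; requires the experimental feature to be enabled first.
SET allow_experimental_export_merge_tree_part = 1;

ALTER TABLE events
    EXPORT PART '2025_0_10_2' TO TABLE events_export
    SETTINGS export_merge_tree_part_max_rows_per_file = 1000000; -- soft cap: roughly 1M rows per output file
```

As with the byte-based limit, this is a soft threshold: the actual per-file size depends on the output format granularity and the chunk size of the source reads.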

src/Core/Settings.cpp

Lines changed: 8 additions & 0 deletions

@@ -6899,6 +6899,14 @@ Possible values:
 - `` (empty value) - use session timezone
 
 Default value is `UTC`.
+)", 0) \
+    DECLARE(UInt64, export_merge_tree_part_max_bytes_per_file, 0, R"(
+Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit.
+This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
+)", 0) \
+    DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
+Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
+This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
 )", 0) \
 \
     /* ####################################################### */ \
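
Since both settings are registered through `DECLARE`, they should surface in `system.settings` like any other query-level setting. An illustrative check, not part of this commit:

```sql
-- Both settings default to 0, i.e. no per-file limit.
SELECT name, value, description
FROM system.settings
WHERE name LIKE 'export_merge_tree_part_max_%_per_file';
```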

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions

@@ -111,6 +111,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     {"os_threads_nice_value_query", 0, 0, "New setting."},
     {"os_threads_nice_value_materialized_view", 0, 0, "New setting."},
     {"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."},
+    {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
+    {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {

src/Interpreters/PartLog.cpp

Lines changed: 7 additions & 0 deletions

@@ -134,6 +134,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
         {"part_type", std::make_shared<DataTypeString>(), "The type of the part. Possible values: Wide and Compact."},
         {"disk_name", std::make_shared<DataTypeString>(), "The disk name data part lies on."},
         {"path_on_disk", std::make_shared<DataTypeString>(), "Absolute path to the folder with data part files."},
+        {"remote_file_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "In case of an export operation to remote storages, the file paths a given export generated"},
 
         {"rows", std::make_shared<DataTypeUInt64>(), "The number of rows in the data part."},
         {"size_in_bytes", std::make_shared<DataTypeUInt64>(), "Size of the data part on disk in bytes."},
@@ -187,6 +188,12 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
     columns[i++]->insert(disk_name);
     columns[i++]->insert(path_on_disk);
 
+    Array remote_file_paths_array;
+    remote_file_paths_array.reserve(remote_file_paths.size());
+    for (const auto & remote_file_path : remote_file_paths)
+        remote_file_paths_array.push_back(remote_file_path);
+    columns[i++]->insert(remote_file_paths_array);
+
     columns[i++]->insert(rows);
     columns[i++]->insert(bytes_compressed_on_disk);
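
With the new `remote_file_paths` Array(String) column in `system.part_log`, the files produced by an export can be listed per part. An illustrative query, not part of this commit:

```sql
-- One output row per file written by recent ExportPart operations.
SELECT event_time, part_name, arrayJoin(remote_file_paths) AS remote_file
FROM system.part_log
WHERE event_type = 'ExportPart'
ORDER BY event_time DESC
LIMIT 10;
```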

src/Interpreters/PartLog.h

Lines changed: 1 addition & 0 deletions

@@ -71,6 +71,7 @@ struct PartLogElement
     String partition;
     String disk_name;
     String path_on_disk;
+    std::vector<String> remote_file_paths;
 
     MergeTreeDataPartType part_type;

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 13 additions & 4 deletions

@@ -62,14 +62,14 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
 struct ExportReplicatedMergeTreePartitionProcessedPartEntry
 {
     String part_name;
-    String path_in_destination;
+    std::vector<String> paths_in_destination;
     String finished_by;
 
     std::string toJsonString() const
     {
         Poco::JSON::Object json;
         json.set("part_name", part_name);
-        json.set("path_in_destination", path_in_destination);
+        json.set("paths_in_destination", paths_in_destination);
         json.set("finished_by", finished_by);
         std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
         oss.exceptions(std::ios::failbit);
@@ -86,7 +86,11 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
         ExportReplicatedMergeTreePartitionProcessedPartEntry entry;
 
         entry.part_name = json->getValue<String>("part_name");
-        entry.path_in_destination = json->getValue<String>("path_in_destination");
+
+        const auto paths_in_destination_array = json->getArray("paths_in_destination");
+        for (size_t i = 0; i < paths_in_destination_array->size(); ++i)
+            entry.paths_in_destination.emplace_back(paths_in_destination_array->getElement<String>(static_cast<unsigned int>(i)));
+
         entry.finished_by = json->getValue<String>("finished_by");
 
         return entry;
@@ -108,6 +112,8 @@ struct ExportReplicatedMergeTreePartitionManifest
     size_t max_threads;
     bool parallel_formatting;
     bool parquet_parallel_encoding;
+    size_t max_bytes_per_file;
+    size_t max_rows_per_file;
     MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
 
     std::string toJsonString() const
@@ -127,6 +133,8 @@ struct ExportReplicatedMergeTreePartitionManifest
         json.set("parallel_formatting", parallel_formatting);
         json.set("max_threads", max_threads);
         json.set("parquet_parallel_encoding", parquet_parallel_encoding);
+        json.set("max_bytes_per_file", max_bytes_per_file);
+        json.set("max_rows_per_file", max_rows_per_file);
         json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
         json.set("create_time", create_time);
         json.set("max_retries", max_retries);
@@ -160,7 +168,8 @@ struct ExportReplicatedMergeTreePartitionManifest
         manifest.max_threads = json->getValue<size_t>("max_threads");
         manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
         manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
-
+        manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
+        manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
         if (json->has("file_already_exists_policy"))
         {
             const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));

src/Storages/IStorage.h

Lines changed: 3 additions & 1 deletion

@@ -466,8 +466,10 @@ It is currently only implemented in StorageObjectStorage.
     virtual SinkToStoragePtr import(
         const std::string & /* file_name */,
         Block & /* block_with_partition_values */,
-        std::string & /* destination_file_path */,
+        const std::function<void(const std::string &)> & /* new_file_path_callback */,
         bool /* overwrite_if_exists */,
+        std::size_t /* max_bytes_per_file */,
+        std::size_t /* max_rows_per_file */,
         const std::optional<FormatSettings> & /* format_settings */,
         ContextPtr /* context */)
     {

src/Storages/MergeTree/ExportList.cpp

Lines changed: 11 additions & 6 deletions

@@ -8,7 +8,7 @@ ExportsListElement::ExportsListElement(
     const StorageID & destination_table_id_,
     UInt64 part_size_,
    const String & part_name_,
-    const String & target_file_name_,
+    const std::vector<String> & destination_file_paths_,
     UInt64 total_rows_to_read_,
     UInt64 total_size_bytes_compressed_,
     UInt64 total_size_bytes_uncompressed_,
@@ -18,7 +18,7 @@ ExportsListElement::ExportsListElement(
     , destination_table_id(destination_table_id_)
     , part_size(part_size_)
     , part_name(part_name_)
-    , destination_file_path(target_file_name_)
+    , destination_file_paths(destination_file_paths_)
     , total_rows_to_read(total_rows_to_read_)
     , total_size_bytes_compressed(total_size_bytes_compressed_)
     , total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
@@ -40,16 +40,21 @@ ExportInfo ExportsListElement::getInfo() const
     res.destination_database = destination_table_id.database_name;
     res.destination_table = destination_table_id.table_name;
     res.part_name = part_name;
-    res.destination_file_path = destination_file_path;
-    res.rows_read = rows_read;
+
+    {
+        std::shared_lock lock(destination_file_paths_mutex);
+        res.destination_file_paths = destination_file_paths;
+    }
+
+    res.rows_read = rows_read.load(std::memory_order_relaxed);
     res.total_rows_to_read = total_rows_to_read;
     res.total_size_bytes_compressed = total_size_bytes_compressed;
     res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
-    res.bytes_read_uncompressed = bytes_read_uncompressed;
+    res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
     res.memory_usage = getMemoryUsage();
     res.peak_memory_usage = getPeakMemoryUsage();
     res.create_time = create_time;
-    res.elapsed = elapsed;
+    res.elapsed = watch.elapsedSeconds();
     return res;
 }
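
Because `getInfo()` now reads `destination_file_paths` under `destination_file_paths_mutex`, presumably so new paths can be appended while the export is still running, `system.exports` can be polled to see how many files an in-flight export has produced so far. An illustrative query, not part of this commit:

```sql
-- Progress view of currently running part exports.
SELECT part_name, length(destination_file_paths) AS files_written, rows_read, elapsed
FROM system.exports;
```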

src/Storages/MergeTree/ExportList.h

Lines changed: 9 additions & 6 deletions

@@ -7,6 +7,7 @@
 #include <Common/ThreadStatus.h>
 #include <Poco/URI.h>
 #include <boost/noncopyable.hpp>
+#include <shared_mutex>
 
 namespace CurrentMetrics
 {
@@ -23,7 +24,7 @@ struct ExportInfo
     String destination_database;
     String destination_table;
     String part_name;
-    String destination_file_path;
+    std::vector<String> destination_file_paths;
     UInt64 rows_read;
     UInt64 total_rows_to_read;
     UInt64 total_size_bytes_compressed;
@@ -41,24 +42,26 @@ struct ExportsListElement : private boost::noncopyable
     const StorageID destination_table_id;
     const UInt64 part_size;
     const String part_name;
-    String destination_file_path;
-    UInt64 rows_read {0};
+
+    /// see destination_file_paths_mutex
+    std::vector<String> destination_file_paths;
+    std::atomic<UInt64> rows_read {0};
     UInt64 total_rows_to_read {0};
     UInt64 total_size_bytes_compressed {0};
     UInt64 total_size_bytes_uncompressed {0};
-    UInt64 bytes_read_uncompressed {0};
+    std::atomic<UInt64> bytes_read_uncompressed {0};
     time_t create_time {0};
-    Float64 elapsed {0};
 
     Stopwatch watch;
     ThreadGroupPtr thread_group;
+    mutable std::shared_mutex destination_file_paths_mutex;
 
     ExportsListElement(
         const StorageID & source_table_id_,
         const StorageID & destination_table_id_,
         UInt64 part_size_,
         const String & part_name_,
-        const String & destination_file_path_,
+        const std::vector<String> & destination_file_paths_,
         UInt64 total_rows_to_read_,
         UInt64 total_size_bytes_compressed_,
         UInt64 total_size_bytes_uncompressed_,
