Skip to content

Commit c6eee2c

Browse files
committed
update partition export
1 parent c4e0888 commit c6eee2c

File tree

6 files changed

+32
-25
lines changed

6 files changed

+32
-25
lines changed

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
6262
struct ExportReplicatedMergeTreePartitionProcessedPartEntry
6363
{
6464
String part_name;
65-
String path_in_destination;
65+
std::vector<String> paths_in_destination;
6666
String finished_by;
6767

6868
std::string toJsonString() const
6969
{
7070
Poco::JSON::Object json;
7171
json.set("part_name", part_name);
72-
json.set("path_in_destination", path_in_destination);
72+
json.set("paths_in_destination", paths_in_destination);
7373
json.set("finished_by", finished_by);
7474
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
7575
oss.exceptions(std::ios::failbit);
@@ -86,7 +86,11 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
8686
ExportReplicatedMergeTreePartitionProcessedPartEntry entry;
8787

8888
entry.part_name = json->getValue<String>("part_name");
89-
entry.path_in_destination = json->getValue<String>("path_in_destination");
89+
90+
const auto paths_in_destination_array = json->getArray("paths_in_destination");
91+
for (size_t i = 0; i < paths_in_destination_array->size(); ++i)
92+
entry.paths_in_destination.emplace_back(paths_in_destination_array->getElement<String>(static_cast<unsigned int>(i)));
93+
9094
entry.finished_by = json->getValue<String>("finished_by");
9195

9296
return entry;

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ bool ExportPartTask::executeStep()
196196
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, (*exports_list_entry)->watch.elapsedMilliseconds());
197197

198198
if (manifest.completion_callback)
199-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths.front()));
199+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths));
200200
}
201201
catch (const Exception & e)
202202
{
@@ -226,12 +226,7 @@ bool ExportPartTask::executeStep()
226226

227227
if (manifest.completion_callback)
228228
{
229-
if ((*exports_list_entry)->destination_file_paths.empty())
230-
{
231-
throw Exception(ErrorCodes::LOGICAL_ERROR, "No destination file paths found for part {}", manifest.data_part->name);
232-
}
233-
234-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths.front()));
229+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths));
235230
}
236231

237232
return false;

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ void ExportPartitionTaskScheduler::handlePartExportCompletion(
175175

176176
if (result.success)
177177
{
178-
handlePartExportSuccess(manifest, destination_storage, processing_parts_path, processed_part_path, part_name, export_path, zk, result.relative_path_in_destination_storage);
178+
handlePartExportSuccess(manifest, destination_storage, processing_parts_path, processed_part_path, part_name, export_path, zk, result.relative_paths_in_destination_storage);
179179
}
180180
else
181181
{
@@ -191,12 +191,17 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
191191
const std::string & part_name,
192192
const std::filesystem::path & export_path,
193193
const zkutil::ZooKeeperPtr & zk,
194-
const String & relative_path_in_destination_storage
194+
const std::vector<String> & relative_paths_in_destination_storage
195195
)
196196
{
197-
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully", relative_path_in_destination_storage);
197+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully, paths size: {}", part_name, relative_paths_in_destination_storage.size());
198198

199-
if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_path_in_destination_storage, zk))
199+
for (const auto & relative_path_in_destination_storage : relative_paths_in_destination_storage)
200+
{
201+
LOG_INFO(storage.log, "ExportPartition scheduler task: {}", relative_path_in_destination_storage);
202+
}
203+
204+
if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_paths_in_destination_storage, zk))
200205
{
201206
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to move part to processed, will not commit export partition");
202207
return;
@@ -323,7 +328,7 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
323328
const std::filesystem::path & processing_parts_path,
324329
const std::filesystem::path & processed_part_path,
325330
const std::string & part_name,
326-
const String & relative_path_in_destination_storage,
331+
const std::vector<String> & relative_paths_in_destination_storage,
327332
const zkutil::ZooKeeperPtr & zk
328333
)
329334
{
@@ -348,7 +353,7 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
348353

349354
ExportReplicatedMergeTreePartitionProcessedPartEntry processed_part_entry;
350355
processed_part_entry.part_name = part_name;
351-
processed_part_entry.path_in_destination = relative_path_in_destination_storage;
356+
processed_part_entry.paths_in_destination = relative_paths_in_destination_storage;
352357
processed_part_entry.finished_by = storage.replica_name;
353358

354359
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name, -1));

src/Storages/MergeTree/ExportPartitionTaskScheduler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ExportPartitionTaskScheduler
3737
const std::string & part_name,
3838
const std::filesystem::path & export_path,
3939
const zkutil::ZooKeeperPtr & zk,
40-
const String & relative_path_in_destination_storage
40+
const std::vector<String> & relative_paths_in_destination_storage
4141
);
4242

4343
void handlePartExportFailure(
@@ -53,7 +53,7 @@ class ExportPartitionTaskScheduler
5353
const std::filesystem::path & processing_parts_path,
5454
const std::filesystem::path & processed_part_path,
5555
const std::string & part_name,
56-
const String & relative_path_in_destination_storage,
56+
const std::vector<String> & relative_paths_in_destination_storage,
5757
const zkutil::ZooKeeperPtr & zk
5858
);
5959

src/Storages/MergeTree/ExportPartitionUtils.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ namespace ExportPartitionUtils
5555

5656
const auto processed_part_entry = ExportReplicatedMergeTreePartitionProcessedPartEntry::fromJsonString(responses[i].data);
5757

58-
exported_paths.emplace_back(processed_part_entry.path_in_destination);
58+
for (const auto & path_in_destination : processed_part_entry.paths_in_destination)
59+
{
60+
exported_paths.emplace_back(path_in_destination);
61+
}
5962
}
6063

6164
return exported_paths;

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,22 @@ struct MergeTreePartExportManifest
2323
struct CompletionCallbackResult
2424
{
2525
private:
26-
CompletionCallbackResult(bool success_, const String & relative_path_in_destination_storage_, std::optional<Exception> exception_)
27-
: success(success_), relative_path_in_destination_storage(relative_path_in_destination_storage_), exception(std::move(exception_)) {}
26+
CompletionCallbackResult(bool success_, const std::vector<String> & relative_paths_in_destination_storage_, std::optional<Exception> exception_)
27+
: success(success_), relative_paths_in_destination_storage(relative_paths_in_destination_storage_), exception(std::move(exception_)) {}
2828
public:
2929

30-
static CompletionCallbackResult createSuccess(const String & relative_path_in_destination_storage_)
30+
static CompletionCallbackResult createSuccess(const std::vector<String> & relative_paths_in_destination_storage_)
3131
{
32-
return CompletionCallbackResult(true, relative_path_in_destination_storage_, std::nullopt);
32+
return CompletionCallbackResult(true, relative_paths_in_destination_storage_, std::nullopt);
3333
}
3434

3535
static CompletionCallbackResult createFailure(Exception exception_)
3636
{
37-
return CompletionCallbackResult(false, "", std::move(exception_));
37+
return CompletionCallbackResult(false, {}, std::move(exception_));
3838
}
3939

4040
bool success = false;
41-
String relative_path_in_destination_storage;
41+
std::vector<String> relative_paths_in_destination_storage;
4242
std::optional<Exception> exception;
4343
};
4444

0 commit comments

Comments
 (0)