Skip to content

Commit de9deb2

Browse files
committed
introduce file already exists policy setting
1 parent 7a12b3a commit de9deb2

File tree

9 files changed

+204
-24
lines changed

9 files changed

+204
-24
lines changed

src/Core/Settings.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6883,6 +6883,12 @@ Maximum number of retries for exporting a merge tree part in an export partition
68836883
DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 180, R"(
68846884
Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination.
68856885
This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones.
6886+
)", 0) \
6887+
DECLARE(String, export_merge_tree_part_file_already_exists_policy, "NO_OP", R"(
6888+
Possible values:
6889+
- NO_OP - No-op if the file already exists - Default.
6890+
- ERROR - Throw an error if the file already exists.
6891+
- OVERWRITE - Overwrite the file
68866892
)", 0) \
68876893
\
68886894
/* ####################################################### */ \

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <Poco/JSON/Object.h>
66
#include <Poco/JSON/Array.h>
77
#include <Poco/JSON/Parser.h>
8+
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
89

910
namespace DB
1011
{
@@ -102,6 +103,7 @@ struct ExportReplicatedMergeTreePartitionManifest
102103
size_t max_threads;
103104
bool parallel_formatting;
104105
bool parquet_parallel_encoding;
106+
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
105107

106108
std::string toJsonString() const
107109
{
@@ -120,6 +122,7 @@ struct ExportReplicatedMergeTreePartitionManifest
120122
json.set("parallel_formatting", parallel_formatting);
121123
json.set("max_threads", max_threads);
122124
json.set("parquet_parallel_encoding", parquet_parallel_encoding);
125+
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
123126
json.set("create_time", create_time);
124127
json.set("max_retries", max_retries);
125128
json.set("ttl_seconds", ttl_seconds);
@@ -152,6 +155,18 @@ struct ExportReplicatedMergeTreePartitionManifest
152155
manifest.max_threads = json->getValue<size_t>("max_threads");
153156
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
154157
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
158+
159+
if (json->has("file_already_exists_policy"))
160+
{
161+
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
162+
if (file_already_exists_policy)
163+
{
164+
manifest.file_already_exists_policy = file_already_exists_policy.value();
165+
}
166+
167+
/// what to do if it's not a valid value?
168+
}
169+
155170
return manifest;
156171
}
157172
};

src/Storages/MergeTree/ExportList.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ struct ExportsListElement : private boost::noncopyable
4141
const StorageID destination_table_id;
4242
const UInt64 part_size;
4343
const String part_name;
44-
const String destination_file_path;
44+
String destination_file_path;
4545
UInt64 rows_read {0};
4646
UInt64 total_rows_to_read {0};
4747
UInt64 total_size_bytes_compressed {0};

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <QueryPipeline/QueryPipelineBuilder.h>
1313
#include "Common/Exception.h"
1414
#include <Common/ProfileEventsScope.h>
15+
#include "Storages/MergeTree/ExportList.h"
1516

1617
namespace ProfileEvents
1718
{
@@ -68,8 +69,19 @@ bool ExportPartTask::executeStep()
6869
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
6970
}
7071

72+
auto exports_list_entry = storage.getContext()->getExportsList().insert(
73+
getStorageID(),
74+
manifest.destination_storage_id,
75+
manifest.data_part->getBytesOnDisk(),
76+
manifest.data_part->name,
77+
"not_computed_yet",
78+
manifest.data_part->rows_count,
79+
manifest.data_part->getBytesOnDisk(),
80+
manifest.data_part->getBytesUncompressedOnDisk(),
81+
manifest.create_time,
82+
local_context);
83+
7184
SinkToStoragePtr sink;
72-
std::string destination_file_path;
7385

7486
try
7587
{
@@ -81,18 +93,44 @@ bool ExportPartTask::executeStep()
8193
sink = destination_storage->import(
8294
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
8395
block_with_partition_values,
84-
destination_file_path,
85-
manifest.overwrite_file_if_exists,
96+
(*exports_list_entry)->destination_file_path,
97+
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::OVERWRITE,
8698
context_copy);
8799
}
88100
catch (const Exception & e)
89101
{
90-
tryLogCurrentException(__PRETTY_FUNCTION__);
91102
if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS)
92103
{
93104
ProfileEvents::increment(ProfileEvents::PartsExportDuplicated);
105+
106+
/// File already exists and the policy is NO_OP, treat it as success.
107+
if (manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::NO_OP)
108+
{
109+
storage.writePartLog(
110+
PartLogElement::Type::EXPORT_PART,
111+
{},
112+
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
113+
manifest.data_part->name,
114+
manifest.data_part,
115+
{manifest.data_part},
116+
nullptr,
117+
nullptr,
118+
exports_list_entry.get());
119+
120+
std::lock_guard inner_lock(storage.export_manifests_mutex);
121+
storage.export_manifests.erase(manifest);
122+
123+
ProfileEvents::increment(ProfileEvents::PartsExports);
124+
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
125+
126+
if (manifest.completion_callback)
127+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_path));
128+
return false;
129+
}
94130
}
95131

132+
tryLogCurrentException(__PRETTY_FUNCTION__);
133+
96134
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
97135

98136
std::lock_guard inner_lock(storage.export_manifests_mutex);
@@ -139,17 +177,6 @@ bool ExportPartTask::executeStep()
139177
local_context,
140178
getLogger("ExportPartition"));
141179

142-
auto exports_list_entry = storage.getContext()->getExportsList().insert(
143-
getStorageID(),
144-
manifest.destination_storage_id,
145-
manifest.data_part->getBytesOnDisk(),
146-
manifest.data_part->name,
147-
destination_file_path,
148-
manifest.data_part->rows_count,
149-
manifest.data_part->getBytesOnDisk(),
150-
manifest.data_part->getBytesUncompressedOnDisk(),
151-
manifest.create_time,
152-
local_context);
153180

154181
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
155182

@@ -198,7 +225,7 @@ bool ExportPartTask::executeStep()
198225
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
199226

200227
if (manifest.completion_callback)
201-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess(destination_file_path));
228+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_path));
202229
}
203230
catch (const Exception & e)
204231
{

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Interpreters/DatabaseCatalog.h>
55
#include "Common/ZooKeeper/Types.h"
66
#include "Storages/MergeTree/ExportPartitionUtils.h"
7+
#include "Storages/MergeTree/MergeTreePartExportManifest.h"
78

89

910
namespace DB
@@ -17,6 +18,7 @@ namespace
1718
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
1819
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
1920
context_copy->setSetting("max_threads", manifest.max_threads);
21+
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
2022
return context_copy;
2123
}
2224
}

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ namespace Setting
212212
extern const SettingsBool apply_patch_parts;
213213
extern const SettingsBool allow_experimental_export_merge_tree_part;
214214
extern const SettingsUInt64 min_bytes_to_use_direct_io;
215-
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
215+
extern const SettingsString export_merge_tree_part_file_already_exists_policy;
216216
extern const SettingsBool output_format_parallel_formatting;
217217
extern const SettingsBool output_format_parquet_parallel_encoding;
218218
}
@@ -6253,11 +6253,20 @@ void MergeTreeData::exportPartToTable(
62536253
part_name, getStorageID().getFullTableName());
62546254

62556255
{
6256+
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(Poco::toUpper(query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value));
6257+
if (!file_already_exists_policy)
6258+
{
6259+
throw Exception(
6260+
ErrorCodes::INVALID_SETTING_VALUE,
6261+
"Invalid value for setting 'export_merge_tree_part_file_already_exists_policy': {}",
6262+
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value);
6263+
}
6264+
62566265
MergeTreePartExportManifest manifest(
62576266
dest_storage->getStorageID(),
62586267
part,
62596268
transaction_id,
6260-
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
6269+
*file_already_exists_policy,
62616270
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
62626271
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
62636272
query_context->getSettingsRef()[Setting::max_threads],

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ class ExportPartTask;
1111

1212
struct MergeTreePartExportManifest
1313
{
14+
enum class FileAlreadyExistsPolicy
15+
{
16+
NO_OP,
17+
ERROR,
18+
OVERWRITE,
19+
};
20+
1421
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
1522

1623
struct CompletionCallbackResult
@@ -39,15 +46,15 @@ struct MergeTreePartExportManifest
3946
const StorageID & destination_storage_id_,
4047
const DataPartPtr & data_part_,
4148
const String & query_id_,
42-
bool overwrite_file_if_exists_,
49+
FileAlreadyExistsPolicy file_already_exists_policy_,
4350
bool parallel_formatting_,
4451
bool parquet_parallel_encoding_,
4552
std::size_t max_threads_,
4653
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
4754
: destination_storage_id(destination_storage_id_),
4855
data_part(data_part_),
4956
query_id(query_id_),
50-
overwrite_file_if_exists(overwrite_file_if_exists_),
57+
file_already_exists_policy(file_already_exists_policy_),
5158
parallel_formatting(parallel_formatting_),
5259
parquet_parallel_encoding(parquet_parallel_encoding_),
5360
max_threads(max_threads_),
@@ -58,7 +65,7 @@ struct MergeTreePartExportManifest
5865
DataPartPtr data_part;
5966
/// Used for killing the export.
6067
String query_id;
61-
bool overwrite_file_if_exists;
68+
FileAlreadyExistsPolicy file_already_exists_policy;
6269
bool parallel_formatting;
6370
/// parquet has a different setting for parallel formatting
6471
bool parquet_parallel_encoding;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ namespace Setting
198198
extern const SettingsBool output_format_parallel_formatting;
199199
extern const SettingsBool output_format_parquet_parallel_encoding;
200200
extern const SettingsMaxThreads max_threads;
201+
extern const SettingsString export_merge_tree_part_file_already_exists_policy;
201202
}
202203

203204
namespace MergeTreeSetting
@@ -300,6 +301,7 @@ namespace ErrorCodes
300301
extern const int FAULT_INJECTED;
301302
extern const int CANNOT_FORGET_PARTITION;
302303
extern const int TIMEOUT_EXCEEDED;
304+
extern const int INVALID_SETTING_VALUE;
303305
}
304306

305307
namespace ActionLocks
@@ -8159,6 +8161,18 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
81598161
manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting];
81608162
manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding];
81618163

8164+
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value);
8165+
8166+
if (!file_already_exists_policy)
8167+
{
8168+
throw Exception(
8169+
ErrorCodes::INVALID_SETTING_VALUE,
8170+
"Invalid value for setting 'export_merge_tree_part_file_already_exists_policy': {}",
8171+
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value);
8172+
}
8173+
8174+
manifest.file_already_exists_policy = file_already_exists_policy.value();
8175+
81628176
ops.emplace_back(zkutil::makeCreateRequest(
81638177
fs::path(partition_exports_path) / "metadata.json",
81648178
manifest.toJsonString(),

0 commit comments

Comments
 (0)