Skip to content

Commit d09fb0b

Browse files
authored
Merge pull request #1124 from Altinity/export_replicated_mt_partition_v2
2 parents 19d5821 + 6b9870a commit d09fb0b

File tree

64 files changed

+3520
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+3520
-235
lines changed

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ enum class AccessType : uint8_t
211211
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
212212
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
213213
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
214+
M(ALTER_EXPORT_PARTITION, "ALTER EXPORT PARTITION, EXPORT PARTITION", TABLE, ALTER_TABLE) \
214215
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
215216
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
216217
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11441144
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
11451145
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \
11461146
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
1147+
DECLARE(Bool, enable_experimental_export_merge_tree_partition_feature, false, "Enable export replicated merge tree partition feature. It is experimental and not yet ready for production use.", 0) \
11471148
// clang-format on
11481149

11491150
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6873,6 +6873,22 @@ Use roaring bitmap for iceberg positional deletes.
68736873
)", 0) \
68746874
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
68756875
Overwrite file if it already exists when exporting a merge tree part
6876+
)", 0) \
6877+
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
6878+
Ignore existing partition export and overwrite the zookeeper entry
6879+
)", 0) \
6880+
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
6881+
Maximum number of retries for exporting a merge tree part in an export partition task
6882+
)", 0) \
6883+
DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 180, R"(
6884+
Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination.
6885+
This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones.
6886+
)", 0) \
6887+
DECLARE(MergeTreePartExportFileAlreadyExistsPolicy, export_merge_tree_part_file_already_exists_policy, MergeTreePartExportFileAlreadyExistsPolicy::skip, R"(
6888+
Possible values:
6889+
- skip - Skip the file if it already exists.
6890+
- error - Throw an error if the file already exists.
6891+
- overwrite - Overwrite the file.
68766892
)", 0) \
68776893
DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"(
68786894
Timezone for Iceberg timestamptz field.

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class WriteBuffer;
8181
M(CLASS_NAME, LogsLevel) \
8282
M(CLASS_NAME, Map) \
8383
M(CLASS_NAME, MaxThreads) \
84+
M(CLASS_NAME, MergeTreePartExportFileAlreadyExistsPolicy) \
8485
M(CLASS_NAME, Milliseconds) \
8586
M(CLASS_NAME, MsgPackUUIDRepresentation) \
8687
M(CLASS_NAME, MySQLDataTypesSupport) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4949
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
5050
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
5151
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
52+
{"export_merge_tree_partition_force_export", false, false, "New setting."},
53+
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
54+
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
55+
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
5256
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
5357
});
5458
addSettingsChanges(settings_changes_history, "25.8",

src/Core/SettingsEnums.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,4 +370,6 @@ IMPLEMENT_SETTING_ENUM(
370370
{"manifest_list_entry", IcebergMetadataLogLevel::ManifestListEntry},
371371
{"manifest_file_metadata", IcebergMetadataLogLevel::ManifestFileMetadata},
372372
{"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}})
373+
374+
IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS);
373375
}

src/Core/SettingsEnums.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,4 +480,14 @@ enum class IcebergMetadataLogLevel : uint8_t
480480
};
481481

482482
DECLARE_SETTING_ENUM(IcebergMetadataLogLevel)
483+
484+
enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t
485+
{
486+
skip,
487+
error,
488+
overwrite,
489+
};
490+
491+
DECLARE_SETTING_ENUM(MergeTreePartExportFileAlreadyExistsPolicy)
492+
483493
}

src/Functions/generateSnowflakeID.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ uint64_t generateSnowflakeID()
154154
return fromSnowflakeId(snowflake_id);
155155
}
156156

157+
std::string generateSnowflakeIDString()
158+
{
159+
return std::to_string(generateSnowflakeID());
160+
}
161+
157162
class FunctionGenerateSnowflakeID : public IFunction
158163
{
159164
public:

src/Functions/generateSnowflakeID.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ namespace DB
77

88
uint64_t generateSnowflakeID();
99

10+
std::string generateSnowflakeIDString();
11+
1012
}

src/Interpreters/InterpreterAlterQuery.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
545545
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
546546
break;
547547
}
548+
case ASTAlterCommand::EXPORT_PARTITION:
549+
{
550+
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, database, table);
551+
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
552+
break;
553+
}
548554
case ASTAlterCommand::FETCH_PARTITION:
549555
{
550556
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);

0 commit comments

Comments
 (0)