Skip to content

Commit c09470d

Browse files
alexey-milovidovzvonand
authored andcommitted
Merge pull request ClickHouse#84011 from ClickHouse/sync-disks
Implement AWS S3 authentication with an explicitly provided IAM role; implement OAuth for GCS.
1 parent 50016bc commit c09470d

File tree

102 files changed

+3531
-226
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+3531
-226
lines changed

src/Backups/BackupCoordinationReplicatedTables.cpp

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#include <Backups/BackupCoordinationReplicatedTables.h>
22
#include <Storages/MergeTree/MergeTreePartInfo.h>
33
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
4+
#if CLICKHOUSE_CLOUD
5+
#include <Storages/SharedMergeTree/SharedMergeTreeMutationEntry.h>
6+
#endif
47
#include <Common/Exception.h>
58

69
#include <boost/range/adaptor/map.hpp>
@@ -292,13 +295,6 @@ void BackupCoordinationReplicatedTables::prepare() const
292295
auto part_info = MergeTreePartInfo::fromPartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
293296
[[maybe_unused]] const auto & partition_id = part_info.getPartitionId();
294297

295-
auto & min_data_versions_by_partition = table_info.min_data_versions_by_partition;
296-
auto it2 = min_data_versions_by_partition.find(partition_id);
297-
if (it2 == min_data_versions_by_partition.end())
298-
min_data_versions_by_partition[partition_id] = part_info.getDataVersion();
299-
else
300-
it2->second = std::min(it2->second, part_info.getDataVersion());
301-
302298
table_info.covered_parts_finder->addPartInfo(std::move(part_info), part_replicas.replica_names[0]);
303299
}
304300

@@ -310,24 +306,6 @@ void BackupCoordinationReplicatedTables::prepare() const
310306
const auto & chosen_replica_name = *part_replicas.replica_names[chosen_index];
311307
table_info.part_names_by_replica_name[chosen_replica_name].push_back(part_name);
312308
}
313-
314-
/// Remove finished or unrelated mutations.
315-
std::unordered_map<String, String> unfinished_mutations;
316-
for (const auto & [mutation_id, mutation_entry_str] : table_info.mutations)
317-
{
318-
auto mutation_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_entry_str, mutation_id);
319-
std::map<String, Int64> new_block_numbers;
320-
for (const auto & [partition_id, block_number] : mutation_entry.block_numbers)
321-
{
322-
auto it = table_info.min_data_versions_by_partition.find(partition_id);
323-
if ((it != table_info.min_data_versions_by_partition.end()) && (it->second < block_number))
324-
new_block_numbers[partition_id] = block_number;
325-
}
326-
mutation_entry.block_numbers = std::move(new_block_numbers);
327-
if (!mutation_entry.block_numbers.empty())
328-
unfinished_mutations[mutation_id] = mutation_entry.toString();
329-
}
330-
table_info.mutations = unfinished_mutations;
331309
}
332310
catch (Exception & e)
333311
{

src/Backups/BackupCoordinationReplicatedTables.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ class BackupCoordinationReplicatedTables
104104
std::map<String /* part_name */, PartReplicas> replicas_by_part_name; /// Should be ordered because we need this map to be in the same order on every replica.
105105
mutable std::unordered_map<String /* replica_name> */, Strings> part_names_by_replica_name;
106106
std::unique_ptr<CoveredPartsFinder> covered_parts_finder;
107-
mutable std::unordered_map<String, Int64> min_data_versions_by_partition;
108107
mutable std::unordered_map<String, String> mutations;
109108
String replica_name_to_store_mutations;
110109
std::unordered_set<String> data_paths;

src/Backups/BackupEntriesCollector.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ namespace Setting
4545
extern const SettingsUInt64 backup_restore_keeper_retry_max_backoff_ms;
4646
extern const SettingsUInt64 backup_restore_keeper_max_retries;
4747
extern const SettingsSeconds lock_acquire_timeout;
48+
49+
/// Cloud only
50+
extern const SettingsBool cloud_mode;
4851
}
4952

5053
namespace ErrorCodes
@@ -112,7 +115,10 @@ BackupEntriesCollector::BackupEntriesCollector(
112115
context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
113116
, max_sleep_before_next_attempt_to_collect_metadata(
114117
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
115-
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
118+
, compare_collected_metadata(
119+
context->getConfigRef().getBool("backups.compare_collected_metadata",
120+
!context->getSettingsRef()[Setting::cloud_mode])) /// Collected metadata shouldn't be compared by default in our Cloud
121+
/// (because in the Cloud only Replicated databases are used)
116122
, log(getLogger("BackupEntriesCollector"))
117123
, zookeeper_retries_info(
118124
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],

src/Backups/BackupEntryWrappedWith.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class BackupEntryWrappedWith : public IBackupEntry
2323
bool isEncryptedByDisk() const override { return entry->isEncryptedByDisk(); }
2424
bool isFromFile() const override { return entry->isFromFile(); }
2525
bool isFromImmutableFile() const override { return entry->isFromImmutableFile(); }
26+
bool isFromRemoteFile() const override { return entry->isFromRemoteFile(); }
27+
String getEndpointURI() const override { return entry->getEndpointURI(); }
28+
String getNamespace() const override { return entry->getNamespace(); }
29+
String getRemotePath() const override { return entry->getRemotePath(); }
2630
String getFilePath() const override { return entry->getFilePath(); }
2731
DiskPtr getDisk() const override { return entry->getDisk(); }
2832

src/Backups/BackupFileInfo.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ BackupFileInfo buildFileInfoForBackupEntry(
124124
info.size = backup_entry->getSize();
125125
info.encrypted_by_disk = backup_entry->isEncryptedByDisk();
126126

127+
if (backup_entry->isFromRemoteFile())
128+
{
129+
info.object_key = backup_entry->getRemotePath();
130+
return info;
131+
}
132+
127133
/// We don't set `info.data_file_name` and `info.data_file_index` in this function because they're set during backup coordination
128134
/// (see the class BackupCoordinationFileInfos).
129135

src/Backups/BackupIO_AzureBlobStorage.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include <Poco/Util/AbstractConfiguration.h>
1515
#include <azure/storage/blobs/blob_options.hpp>
16+
#include <azure/core/context.hpp>
1617

1718
#include <filesystem>
1819

@@ -65,6 +66,16 @@ static bool compareAuthMethod (AzureBlobStorage::AuthMethod auth_method_a, Azure
6566
Azure::Core::Credentials::TokenRequestContext tokenRequestContext;
6667
return managed_identity_a->get()->GetToken(tokenRequestContext, {}).Token == managed_identity_b->get()->GetToken(tokenRequestContext, {}).Token;
6768
}
69+
70+
const auto * static_credential_a = std::get_if<std::shared_ptr<AzureBlobStorage::StaticCredential>>(&auth_method_a);
71+
const auto * static_credential_b = std::get_if<std::shared_ptr<AzureBlobStorage::StaticCredential>>(&auth_method_b);
72+
73+
if (static_credential_a && static_credential_b)
74+
{
75+
Azure::Core::Credentials::TokenRequestContext tokenRequestContext;
76+
auto az_context = Azure::Core::Context();
77+
return static_credential_a->get()->GetToken(tokenRequestContext, az_context).Token == static_credential_b->get()->GetToken(tokenRequestContext, az_context).Token;
78+
}
6879
}
6980
catch (const Azure::Core::Credentials::AuthenticationException & e)
7081
{
@@ -83,7 +94,7 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
8394
const WriteSettings & write_settings_,
8495
const ContextPtr & context_)
8596
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
86-
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
97+
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false, ""}
8798
, connection_params(connection_params_)
8899
, blob_path(blob_path_)
89100
{
@@ -95,8 +106,10 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
95106
connection_params.auth_method,
96107
std::move(client_ptr),
97108
std::move(settings_ptr),
109+
connection_params,
98110
connection_params.getContainer(),
99-
connection_params.getConnectionURL());
111+
connection_params.getConnectionURL(),
112+
/*common_key_prefix*/ "");
100113

101114
client = object_storage->getAzureBlobStorageClient();
102115
settings = object_storage->getSettings();
@@ -177,7 +190,7 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
177190
const ContextPtr & context_,
178191
bool attempt_to_create_container)
179192
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
180-
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
193+
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false, ""}
181194
, connection_params(connection_params_)
182195
, blob_path(blob_path_)
183196
{
@@ -192,8 +205,10 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
192205
connection_params.auth_method,
193206
std::move(client_ptr),
194207
std::move(settings_ptr),
208+
connection_params,
195209
connection_params.getContainer(),
196-
connection_params.getConnectionURL());
210+
connection_params.getConnectionURL(),
211+
/*common_key_prefix*/ "");
197212

198213

199214
client = object_storage->getAzureBlobStorageClient();

src/Backups/BackupIO_S3.cpp

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ namespace S3AuthSetting
4646
extern const S3AuthSettingsString server_side_encryption_customer_key_base64;
4747
extern const S3AuthSettingsBool use_environment_credentials;
4848
extern const S3AuthSettingsBool use_insecure_imds_request;
49+
50+
extern const S3AuthSettingsString role_arn;
51+
extern const S3AuthSettingsString role_session_name;
52+
extern const S3AuthSettingsString http_client;
53+
extern const S3AuthSettingsString service_account;
54+
extern const S3AuthSettingsString metadata_service;
55+
extern const S3AuthSettingsString request_token_path;
4956
}
5057

5158
namespace S3RequestSetting
@@ -69,6 +76,8 @@ namespace
6976
const S3::URI & s3_uri,
7077
const String & access_key_id,
7178
const String & secret_access_key,
79+
String role_arn,
80+
String role_session_name,
7281
const S3Settings & settings,
7382
const ContextPtr & context)
7483
{
@@ -84,6 +93,12 @@ namespace
8493
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
8594
const Settings & local_settings = context->getSettingsRef();
8695

96+
if (role_arn.empty())
97+
{
98+
role_arn = settings.auth_settings[S3AuthSetting::role_arn];
99+
role_session_name = settings.auth_settings[S3AuthSetting::role_session_name];
100+
}
101+
87102
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
88103
settings.auth_settings[S3AuthSetting::region],
89104
context->getRemoteHostFilter(),
@@ -108,6 +123,11 @@ namespace
108123
client_configuration.http_max_field_name_size = request_settings[S3RequestSetting::http_max_field_name_size];
109124
client_configuration.http_max_field_value_size = request_settings[S3RequestSetting::http_max_field_value_size];
110125

126+
client_configuration.http_client = settings.auth_settings[S3AuthSetting::http_client];
127+
client_configuration.service_account = settings.auth_settings[S3AuthSetting::service_account];
128+
client_configuration.metadata_service = settings.auth_settings[S3AuthSetting::metadata_service];
129+
client_configuration.request_token_path = settings.auth_settings[S3AuthSetting::request_token_path];
130+
111131
S3::ClientSettings client_settings{
112132
.use_virtual_addressing = s3_uri.is_virtual_hosted_style,
113133
.disable_checksum = local_settings[Setting::s3_disable_checksum],
@@ -128,7 +148,10 @@ namespace
128148
settings.auth_settings[S3AuthSetting::use_environment_credentials],
129149
settings.auth_settings[S3AuthSetting::use_insecure_imds_request],
130150
settings.auth_settings[S3AuthSetting::expiration_window_seconds],
131-
settings.auth_settings[S3AuthSetting::no_sign_request]
151+
settings.auth_settings[S3AuthSetting::no_sign_request],
152+
std::move(role_arn),
153+
std::move(role_session_name),
154+
/*sts_endpoint_override=*/""
132155
});
133156
}
134157

@@ -150,14 +173,16 @@ BackupReaderS3::BackupReaderS3(
150173
const S3::URI & s3_uri_,
151174
const String & access_key_id_,
152175
const String & secret_access_key_,
176+
const String & role_arn,
177+
const String & role_session_name,
153178
bool allow_s3_native_copy,
154179
const ReadSettings & read_settings_,
155180
const WriteSettings & write_settings_,
156181
const ContextPtr & context_,
157182
bool is_internal_backup)
158183
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderS3"))
159184
, s3_uri(s3_uri_)
160-
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
185+
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false, ""}
161186
{
162187
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());
163188

@@ -170,7 +195,7 @@ BackupReaderS3::BackupReaderS3(
170195
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
171196
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;
172197

173-
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
198+
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, role_arn, role_session_name, s3_settings, context_);
174199

175200
if (auto blob_storage_system_log = context_->getBlobStorageLog())
176201
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
@@ -247,6 +272,8 @@ BackupWriterS3::BackupWriterS3(
247272
const S3::URI & s3_uri_,
248273
const String & access_key_id_,
249274
const String & secret_access_key_,
275+
const String & role_arn,
276+
const String & role_session_name,
250277
bool allow_s3_native_copy,
251278
const String & storage_class_name,
252279
const ReadSettings & read_settings_,
@@ -255,7 +282,7 @@ BackupWriterS3::BackupWriterS3(
255282
bool is_internal_backup)
256283
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
257284
, s3_uri(s3_uri_)
258-
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
285+
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false, ""}
259286
, s3_capabilities(getCapabilitiesFromConfig(context_->getConfigRef(), "s3"))
260287
{
261288
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());
@@ -270,7 +297,8 @@ BackupWriterS3::BackupWriterS3(
270297
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;
271298
s3_settings.request_settings[S3RequestSetting::storage_class_name] = storage_class_name;
272299

273-
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
300+
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, role_arn, role_session_name, s3_settings, context_);
301+
274302
if (auto blob_storage_system_log = context_->getBlobStorageLog())
275303
{
276304
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);

src/Backups/BackupIO_S3.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class BackupReaderS3 : public BackupReaderDefault
2323
const S3::URI & s3_uri_,
2424
const String & access_key_id_,
2525
const String & secret_access_key_,
26+
const String & role_arn,
27+
const String & role_session_name,
2628
bool allow_s3_native_copy,
2729
const ReadSettings & read_settings_,
2830
const WriteSettings & write_settings_,
@@ -54,6 +56,8 @@ class BackupWriterS3 : public BackupWriterDefault
5456
const S3::URI & s3_uri_,
5557
const String & access_key_id_,
5658
const String & secret_access_key_,
59+
const String & role_arn,
60+
const String & role_session_name,
5761
bool allow_s3_native_copy,
5862
const String & storage_class_name,
5963
const ReadSettings & read_settings_,

0 commit comments

Comments
 (0)