Skip to content

Commit 2fd834e

Browse files
Enmk and ianton-ru
authored and committed
Merge pull request #953 from Altinity/feature/antalya-25.6.5/frontport_860
25.8 Antalya port of #860: Support different warehouses behind Iceberg REST catalog
1 parent 61c1d5c commit 2fd834e

File tree

11 files changed

+127
-19
lines changed

11 files changed

+127
-19
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,19 @@ void TableMetadata::setLocation(const std::string & location_)
8484
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
8585

8686
if (pos_to_path == std::string::npos)
87-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
88-
89-
pos_to_path = pos_to_bucket + pos_to_path;
87+
{ // empty path
88+
location_without_path = location_;
89+
path.clear();
90+
bucket = location_.substr(pos_to_bucket);
91+
}
92+
else
93+
{
94+
pos_to_path = pos_to_bucket + pos_to_path;
9095

91-
location_without_path = location_.substr(0, pos_to_path);
92-
path = location_.substr(pos_to_path + 1);
93-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
96+
location_without_path = location_.substr(0, pos_to_path);
97+
path = location_.substr(pos_to_path + 1);
98+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
99+
}
94100

95101
LOG_TEST(getLogger("TableMetadata"),
96102
"Parsed location without path: {}, path: {}",

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
421421
auto bucket_uri = getURIForBucket(bucket);
422422
if (!bucket_uri)
423423
{
424-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
424+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
425425
return *maybe_error;
426426

427427
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -626,7 +626,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
626626
if (auto uri = getURIForBucket(bucket); uri.has_value())
627627
request.overrideURI(std::move(*uri));
628628

629-
630629
bool found_new_endpoint = false;
631630
// if we found correct endpoint after 301 responses, update the cache for future requests
632631
SCOPE_EXIT(
@@ -929,12 +928,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
929928
}
930929

931930
// Do a list request because head requests don't have body in response
932-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
931+
// S3 Tables don't support ListObjects, so as a dirty workaround a GetObject request is used instead
932+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
933933
{
934-
ListObjectsV2Request req;
934+
GetObjectRequest req;
935935
req.SetBucket(bucket);
936-
req.SetMaxKeys(1);
937-
auto result = ListObjectsV2(req);
936+
req.SetKey(key);
937+
req.SetRange("bytes=0-1");
938+
auto result = GetObject(req);
939+
938940
if (result.IsSuccess())
939941
return std::nullopt;
940942
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ class Client : private Aws::S3::S3Client
279279

280280
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
281281
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
282-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
282+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
283283

284284
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
285285

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
158158
validateKey(key, uri);
159159
}
160160

161+
bool URI::isAWSRegion(std::string_view region)
162+
{
163+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
164+
static const std::unordered_set<std::string_view> regions = {
165+
"us-east-2",
166+
"us-east-1",
167+
"us-west-1",
168+
"us-west-2",
169+
"af-south-1",
170+
"ap-east-1",
171+
"ap-south-2",
172+
"ap-southeast-3",
173+
"ap-southeast-5",
174+
"ap-southeast-4",
175+
"ap-south-1",
176+
"ap-northeast-3",
177+
"ap-northeast-2",
178+
"ap-southeast-1",
179+
"ap-southeast-2",
180+
"ap-east-2",
181+
"ap-southeast-7",
182+
"ap-northeast-1",
183+
"ca-central-1",
184+
"ca-west-1",
185+
"eu-central-1",
186+
"eu-west-1",
187+
"eu-west-2",
188+
"eu-south-1",
189+
"eu-west-3",
190+
"eu-south-2",
191+
"eu-north-1",
192+
"eu-central-2",
193+
"il-central-1",
194+
"mx-central-1",
195+
"me-south-1",
196+
"me-central-1",
197+
"sa-east-1",
198+
"us-gov-east-1",
199+
"us-gov-west-1"
200+
};
201+
202+
/// 's3-us-west-2' is a legacy region format for S3 storage, equivalent to 'us-west-2'
203+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
204+
if (region.substr(0, 3) == "s3-")
205+
region = region.substr(3);
206+
207+
return regions.contains(region);
208+
}
209+
161210
void URI::addRegionToURI(const std::string &region)
162211
{
163212
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
213+
{
214+
if (pos > 0)
215+
{ /// Check if the region is already in the endpoint to avoid adding it a second time
216+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
217+
if (prev_pos == std::string::npos)
218+
prev_pos = 0;
219+
else
220+
++prev_pos;
221+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
222+
if (isAWSRegion(endpoint_region))
223+
return;
224+
}
164225
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
226+
}
165227
}
166228

167229
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ struct URI
4141

4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343
static void validateKey(const std::string & key, const Poco::URI & uri);
44+
45+
/// Returns true if 'region' string is an AWS S3 region
46+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
47+
static bool isAWSRegion(std::string_view region);
4448
};
4549

4650
}

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,10 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
316316
persistent_components,
317317
local_context,
318318
getProperFilePathFromMetadataInfo(
319-
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, persistent_components.table_location),
319+
snapshot->getValue<String>(f_manifest_list),
320+
configuration_ptr->getPathForRead().path,
321+
persistent_components.table_location,
322+
configuration_ptr->getNamespace()),
320323
log),
321324
relevant_snapshot_id,
322325
total_rows,

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ ManifestFileContent::ManifestFileContent(
150150
Int64 inherited_sequence_number,
151151
Int64 inherited_snapshot_id,
152152
const String & table_location,
153+
const String & common_namespace,
153154
DB::ContextPtr context,
154155
const String & path_to_manifest_file_)
155156
: path_to_manifest_file(path_to_manifest_file_)
@@ -286,7 +287,11 @@ ManifestFileContent::ManifestFileContent(
286287

287288
const auto file_path_key
288289
= manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>();
289-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
290+
const auto file_path = getProperFilePathFromMetadataInfo(
291+
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
292+
common_path,
293+
table_location,
294+
common_namespace);
290295

291296
/// NOTE: This is weird, because in manifest file partition looks like this:
292297
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class ManifestFileContent : public boost::noncopyable
115115
Int64 inherited_sequence_number,
116116
Int64 inherited_snapshot_id,
117117
const std::string & table_location,
118+
const std::string & common_namespace,
118119
DB::ContextPtr context,
119120
const String & path_to_manifest_file_);
120121

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ Iceberg::ManifestFilePtr getManifestFile(
103103
inherited_sequence_number,
104104
inherited_snapshot_id,
105105
persistent_table_components.table_location,
106+
configuration->getNamespace(),
106107
local_context,
107108
filename);
108109
};
@@ -160,7 +161,10 @@ ManifestFileCacheKeys getManifestList(
160161
const std::string file_path
161162
= manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
162163
const auto manifest_file_name = getProperFilePathFromMetadataInfo(
163-
file_path, configuration_ptr->getPathForRead().path, persistent_table_components.table_location);
164+
file_path,
165+
configuration_ptr->getPathForRead().path,
166+
persistent_table_components.table_location,
167+
configuration_ptr->getNamespace());
164168
Int64 added_sequence_number = 0;
165169
auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id);
166170
if (added_snapshot_id.isNull())

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra
166166
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
167167
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
168168
// Common path should end with "<table_name>" or "<table_name>/".
169-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
169+
std::string getProperFilePathFromMetadataInfo(
170+
std::string_view data_path,
171+
std::string_view common_path,
172+
std::string_view table_location,
173+
std::string_view common_namespace)
170174
{
171175
auto trim_backward_slash = [](std::string_view str) -> std::string_view
172176
{
@@ -231,7 +235,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
231235
}
232236
else
233237
{
234-
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
238+
/// Data files can have a different path
239+
pos = data_path.find("://");
240+
if (pos == std::string::npos)
241+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
242+
pos = data_path.find('/', pos + 3);
243+
if (pos == std::string::npos)
244+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
245+
if (data_path.substr(pos + 1).starts_with(common_namespace))
246+
{
247+
auto new_pos = data_path.find('/', pos + 1);
248+
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
249+
pos = new_pos;
250+
}
251+
return std::string(data_path.substr(pos));
235252
}
236253
}
237254

0 commit comments

Comments
 (0)