Skip to content

Commit f15b21f

Browse files
authored
Merge pull request #1060 from Altinity/frontport/antalya-25.8/iceberg_rest_warehouses
25.8 Antalya ports: Improvement for Iceberg REST catalog
2 parents 43433f2 + 34560d2 commit f15b21f

File tree

13 files changed

+157
-22
lines changed

13 files changed

+157
-22
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,19 @@ void TableMetadata::setLocation(const std::string & location_)
8484
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
8585

8686
if (pos_to_path == std::string::npos)
87-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
88-
89-
pos_to_path = pos_to_bucket + pos_to_path;
87+
{ // empty path
88+
location_without_path = location_;
89+
path.clear();
90+
bucket = location_.substr(pos_to_bucket);
91+
}
92+
else
93+
{
94+
pos_to_path = pos_to_bucket + pos_to_path;
9095

91-
location_without_path = location_.substr(0, pos_to_path);
92-
path = location_.substr(pos_to_path + 1);
93-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
96+
location_without_path = location_.substr(0, pos_to_path);
97+
path = location_.substr(pos_to_path + 1);
98+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
99+
}
94100

95101
LOG_TEST(getLogger("TableMetadata"),
96102
"Parsed location without path: {}, path: {}",

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
272272
{
273273
const auto & context = getContext();
274274

275-
Poco::URI url(base_url / endpoint);
275+
Poco::URI url(base_url / endpoint, false);
276276
if (!params.empty())
277277
url.setQueryParameters(params);
278278

@@ -511,7 +511,9 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
511511
for (size_t i = 0; i < identifiers_object->size(); ++i)
512512
{
513513
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
514-
const auto table_name = current_table_json->get("name").extract<String>();
514+
const auto table_name_raw = current_table_json->get("name").extract<String>();
515+
std::string table_name;
516+
Poco::URI::encode(table_name_raw, "/", table_name);
515517

516518
tables.push_back(base_namespace + "." + table_name);
517519
if (limit && tables.size() >= limit)
@@ -700,7 +702,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
700702
};
701703
}
702704

703-
Poco::URI url(endpoint);
705+
Poco::URI url(endpoint, false);
704706
auto wb = DB::BuilderRWBufferFromHTTP(url)
705707
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
706708
.withMethod(method)

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
465465
auto bucket_uri = getURIForBucket(bucket);
466466
if (!bucket_uri)
467467
{
468-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
468+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
469469
return *maybe_error;
470470

471471
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -670,7 +670,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
670670
if (auto uri = getURIForBucket(bucket); uri.has_value())
671671
request.overrideURI(std::move(*uri));
672672

673-
674673
bool found_new_endpoint = false;
675674
// if we found correct endpoint after 301 responses, update the cache for future requests
676675
SCOPE_EXIT(
@@ -973,12 +972,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
973972
}
974973

975974
// Do a list request because head requests don't have body in response
976-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
975+
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
976+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
977977
{
978-
ListObjectsV2Request req;
978+
GetObjectRequest req;
979979
req.SetBucket(bucket);
980-
req.SetMaxKeys(1);
981-
auto result = ListObjectsV2(req);
980+
req.SetKey(key);
981+
req.SetRange("bytes=0-1");
982+
auto result = GetObject(req);
983+
982984
if (result.IsSuccess())
983985
return std::nullopt;
984986
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ class Client : private Aws::S3::S3Client
285285

286286
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
287287
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
288-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
288+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
289289

290290
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
291291

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
158158
validateKey(key, uri);
159159
}
160160

161+
bool URI::isAWSRegion(std::string_view region)
162+
{
163+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
164+
static const std::unordered_set<std::string_view> regions = {
165+
"us-east-2",
166+
"us-east-1",
167+
"us-west-1",
168+
"us-west-2",
169+
"af-south-1",
170+
"ap-east-1",
171+
"ap-south-2",
172+
"ap-southeast-3",
173+
"ap-southeast-5",
174+
"ap-southeast-4",
175+
"ap-south-1",
176+
"ap-northeast-3",
177+
"ap-northeast-2",
178+
"ap-southeast-1",
179+
"ap-southeast-2",
180+
"ap-east-2",
181+
"ap-southeast-7",
182+
"ap-northeast-1",
183+
"ca-central-1",
184+
"ca-west-1",
185+
"eu-central-1",
186+
"eu-west-1",
187+
"eu-west-2",
188+
"eu-south-1",
189+
"eu-west-3",
190+
"eu-south-2",
191+
"eu-north-1",
192+
"eu-central-2",
193+
"il-central-1",
194+
"mx-central-1",
195+
"me-south-1",
196+
"me-central-1",
197+
"sa-east-1",
198+
"us-gov-east-1",
199+
"us-gov-west-1"
200+
};
201+
202+
/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
203+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
204+
if (region.substr(0, 3) == "s3-")
205+
region = region.substr(3);
206+
207+
return regions.contains(region);
208+
}
209+
161210
void URI::addRegionToURI(const std::string &region)
162211
{
163212
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
213+
{
214+
if (pos > 0)
215+
{ /// Check if region is already in endpoint to avoid add it second time
216+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
217+
if (prev_pos == std::string::npos)
218+
prev_pos = 0;
219+
else
220+
++prev_pos;
221+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
222+
if (isAWSRegion(endpoint_region))
223+
return;
224+
}
164225
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
226+
}
165227
}
166228

167229
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ struct URI
4141

4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343
static void validateKey(const std::string & key, const Poco::URI & uri);
44+
45+
/// Returns true if 'region' string is an AWS S3 region
46+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
47+
static bool isAWSRegion(std::string_view region);
4448
};
4549

4650
}

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,10 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
316316
persistent_components,
317317
local_context,
318318
getProperFilePathFromMetadataInfo(
319-
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, persistent_components.table_location),
319+
snapshot->getValue<String>(f_manifest_list),
320+
configuration_ptr->getPathForRead().path,
321+
persistent_components.table_location,
322+
configuration_ptr->getNamespace()),
320323
log),
321324
relevant_snapshot_id,
322325
total_rows,

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ ManifestFileContent::ManifestFileContent(
150150
Int64 inherited_sequence_number,
151151
Int64 inherited_snapshot_id,
152152
const String & table_location,
153+
const String & common_namespace,
153154
DB::ContextPtr context,
154155
const String & path_to_manifest_file_)
155156
: path_to_manifest_file(path_to_manifest_file_)
@@ -286,7 +287,11 @@ ManifestFileContent::ManifestFileContent(
286287

287288
const auto file_path_key
288289
= manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>();
289-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
290+
const auto file_path = getProperFilePathFromMetadataInfo(
291+
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
292+
common_path,
293+
table_location,
294+
common_namespace);
290295

291296
/// NOTE: This is weird, because in manifest file partition looks like this:
292297
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class ManifestFileContent : public boost::noncopyable
115115
Int64 inherited_sequence_number,
116116
Int64 inherited_snapshot_id,
117117
const std::string & table_location,
118+
const std::string & common_namespace,
118119
DB::ContextPtr context,
119120
const String & path_to_manifest_file_);
120121

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ Iceberg::ManifestFilePtr getManifestFile(
103103
inherited_sequence_number,
104104
inherited_snapshot_id,
105105
persistent_table_components.table_location,
106+
configuration->getNamespace(),
106107
local_context,
107108
filename);
108109
};
@@ -160,7 +161,10 @@ ManifestFileCacheKeys getManifestList(
160161
const std::string file_path
161162
= manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
162163
const auto manifest_file_name = getProperFilePathFromMetadataInfo(
163-
file_path, configuration_ptr->getPathForRead().path, persistent_table_components.table_location);
164+
file_path,
165+
configuration_ptr->getPathForRead().path,
166+
persistent_table_components.table_location,
167+
configuration_ptr->getNamespace());
164168
Int64 added_sequence_number = 0;
165169
auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id);
166170
if (added_snapshot_id.isNull())

0 commit comments

Comments
 (0)