Skip to content

Commit 3d4922b

Browse files
Enmkianton-ru
authored and committed
Merge pull request #860 from Altinity/feature/different_folder_for_data_lake
Antalya 25.3: Support different warehouses behind Iceberg REST catalog
1 parent 809953b commit 3d4922b

File tree

10 files changed

+121
-19
lines changed

10 files changed

+121
-19
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_)
7070
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
7171

7272
if (pos_to_path == std::string::npos)
73-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
74-
75-
pos_to_path = pos_to_bucket + pos_to_path;
73+
{ // empty path
74+
location_without_path = location_;
75+
path.clear();
76+
bucket = location_.substr(pos_to_bucket);
77+
}
78+
else
79+
{
80+
pos_to_path = pos_to_bucket + pos_to_path;
7681

77-
location_without_path = location_.substr(0, pos_to_path);
78-
path = location_.substr(pos_to_path + 1);
79-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
82+
location_without_path = location_.substr(0, pos_to_path);
83+
path = location_.substr(pos_to_path + 1);
84+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
85+
}
8086

8187
LOG_TEST(getLogger("TableMetadata"),
8288
"Parsed location without path: {}, path: {}",

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
379379
auto bucket_uri = getURIForBucket(bucket);
380380
if (!bucket_uri)
381381
{
382-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
382+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
383383
return *maybe_error;
384384

385385
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -584,7 +584,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
584584
if (auto uri = getURIForBucket(bucket); uri.has_value())
585585
request.overrideURI(std::move(*uri));
586586

587-
588587
bool found_new_endpoint = false;
589588
// if we found correct endpoint after 301 responses, update the cache for future requests
590589
SCOPE_EXIT(
@@ -864,12 +863,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
864863
}
865864

866865
// Do a list request because head requests don't have body in response
867-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
866+
// S3 Tables don't support ListObjects, so as a dirty workaround a ranged GetObject request is used instead
867+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
868868
{
869-
ListObjectsV2Request req;
869+
GetObjectRequest req;
870870
req.SetBucket(bucket);
871-
req.SetMaxKeys(1);
872-
auto result = ListObjectsV2(req);
871+
req.SetKey(key);
872+
req.SetRange("bytes=0-1");
873+
auto result = GetObject(req);
874+
873875
if (result.IsSuccess())
874876
return std::nullopt;
875877
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client
269269

270270
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
271271
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
272-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
272+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
273273

274274
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
275275

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
158158
validateKey(key, uri);
159159
}
160160

161+
bool URI::isAWSRegion(std::string_view region)
162+
{
163+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
164+
static const std::unordered_set<std::string_view> regions = {
165+
"us-east-2",
166+
"us-east-1",
167+
"us-west-1",
168+
"us-west-2",
169+
"af-south-1",
170+
"ap-east-1",
171+
"ap-south-2",
172+
"ap-southeast-3",
173+
"ap-southeast-5",
174+
"ap-southeast-4",
175+
"ap-south-1",
176+
"ap-northeast-3",
177+
"ap-northeast-2",
178+
"ap-southeast-1",
179+
"ap-southeast-2",
180+
"ap-east-2",
181+
"ap-southeast-7",
182+
"ap-northeast-1",
183+
"ca-central-1",
184+
"ca-west-1",
185+
"eu-central-1",
186+
"eu-west-1",
187+
"eu-west-2",
188+
"eu-south-1",
189+
"eu-west-3",
190+
"eu-south-2",
191+
"eu-north-1",
192+
"eu-central-2",
193+
"il-central-1",
194+
"mx-central-1",
195+
"me-south-1",
196+
"me-central-1",
197+
"sa-east-1",
198+
"us-gov-east-1",
199+
"us-gov-west-1"
200+
};
201+
202+
/// 's3-us-west-2' is a legacy region format for S3 storage, equivalent to 'us-west-2'
203+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
204+
if (region.substr(0, 3) == "s3-")
205+
region = region.substr(3);
206+
207+
return regions.contains(region);
208+
}
209+
161210
void URI::addRegionToURI(const std::string &region)
162211
{
163212
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
213+
{
214+
if (pos > 0)
215+
{ /// Check if region is already in endpoint to avoid add it second time
216+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
217+
if (prev_pos == std::string::npos)
218+
prev_pos = 0;
219+
else
220+
++prev_pos;
221+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
222+
if (isAWSRegion(endpoint_region))
223+
return;
224+
}
164225
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
226+
}
165227
}
166228

167229
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ struct URI
4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343
static void validateKey(const std::string & key, const Poco::URI & uri);
4444

45+
/// Returns true if 'region' string is an AWS S3 region
46+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
47+
static bool isAWSRegion(std::string_view region);
48+
4549
private:
4650
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
4751
};

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
528528

529529
relevant_snapshot = IcebergSnapshot{
530530
getManifestList(local_context, getProperFilePathFromMetadataInfo(
531-
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location)),
531+
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())),
532532
relevant_snapshot_id, total_rows, total_bytes};
533533

534534
if (!snapshot->has(f_schema_id))
@@ -710,7 +710,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
710710
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
711711
{
712712
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
713-
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location);
713+
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace());
714714
Int64 added_sequence_number = 0;
715715
if (format_version > 1)
716716
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
@@ -846,6 +846,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
846846
schema_processor,
847847
inherited_sequence_number,
848848
table_location,
849+
configuration_ptr->getNamespace(),
849850
local_context);
850851
};
851852

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ ManifestFileContent::ManifestFileContent(
128128
const IcebergSchemaProcessor & schema_processor,
129129
Int64 inherited_sequence_number,
130130
const String & table_location,
131+
const String & common_namespace,
131132
DB::ContextPtr context)
132133
{
133134
this->schema_id = schema_id_;
@@ -192,7 +193,11 @@ ManifestFileContent::ManifestFileContent(
192193
}
193194
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet<UInt64>());
194195

195-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
196+
const auto file_path = getProperFilePathFromMetadataInfo(
197+
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
198+
common_path,
199+
table_location,
200+
common_namespace);
196201

197202
/// NOTE: This is weird, because in manifest file partition looks like this:
198203
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class ManifestFileContent
9494
const DB::IcebergSchemaProcessor & schema_processor,
9595
Int64 inherited_sequence_number,
9696
const std::string & table_location,
97+
const std::string & common_namespace,
9798
DB::ContextPtr context);
9899

99100
const std::vector<ManifestFileEntry> & getFiles() const;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ using namespace DB;
2828
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
2929
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
3030
// Common path should end with "<table_name>" or "<table_name>/".
31-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
31+
std::string getProperFilePathFromMetadataInfo(
32+
std::string_view data_path,
33+
std::string_view common_path,
34+
std::string_view table_location,
35+
std::string_view common_namespace)
3236
{
3337
auto trim_backward_slash = [](std::string_view str) -> std::string_view
3438
{
@@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
8488
}
8589
else
8690
{
87-
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
91+
/// Data files can be located under a different path
92+
pos = data_path.find("://");
93+
if (pos == std::string::npos)
94+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
95+
pos = data_path.find("/", pos + 3);
96+
if (pos == std::string::npos)
97+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
98+
if (data_path.substr(pos + 1).starts_with(common_namespace))
99+
{
100+
auto new_pos = data_path.find("/", pos + 1);
101+
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
102+
pos = new_pos;
103+
}
104+
return std::string(data_path.substr(pos));
88105
}
89106
}
90107

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
namespace Iceberg
1111
{
1212

13-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
13+
std::string getProperFilePathFromMetadataInfo(
14+
std::string_view data_path,
15+
std::string_view common_path,
16+
std::string_view table_location,
17+
std::string_view common_namespace);
1418

1519
}
1620

0 commit comments

Comments
 (0)