Skip to content

Commit 1dbc9b7

Browse files
Enmk ianton-ru
authored and committed
Merge pull request #860 from Altinity/feature/different_folder_for_data_lake
Antalya 25.3: Support different warehouses behind Iceberg REST catalog
1 parent efd42b3 commit 1dbc9b7

File tree

10 files changed

+121
-19
lines changed

10 files changed

+121
-19
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_)
7070
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
7171

7272
if (pos_to_path == std::string::npos)
73-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
74-
75-
pos_to_path = pos_to_bucket + pos_to_path;
73+
{ // empty path
74+
location_without_path = location_;
75+
path.clear();
76+
bucket = location_.substr(pos_to_bucket);
77+
}
78+
else
79+
{
80+
pos_to_path = pos_to_bucket + pos_to_path;
7681

77-
location_without_path = location_.substr(0, pos_to_path);
78-
path = location_.substr(pos_to_path + 1);
79-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
82+
location_without_path = location_.substr(0, pos_to_path);
83+
path = location_.substr(pos_to_path + 1);
84+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
85+
}
8086

8187
LOG_TEST(getLogger("TableMetadata"),
8288
"Parsed location without path: {}, path: {}",

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
379379
auto bucket_uri = getURIForBucket(bucket);
380380
if (!bucket_uri)
381381
{
382-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
382+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
383383
return *maybe_error;
384384

385385
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -584,7 +584,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
584584
if (auto uri = getURIForBucket(bucket); uri.has_value())
585585
request.overrideURI(std::move(*uri));
586586

587-
588587
bool found_new_endpoint = false;
589588
// if we found correct endpoint after 301 responses, update the cache for future requests
590589
SCOPE_EXIT(
@@ -864,12 +863,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
864863
}
865864

866865
// Do a list request because head requests don't have body in response
867-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
866+
// S3 Tables don't support ListObjects, so as a dirty workaround this was changed to GetObject
867+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
868868
{
869-
ListObjectsV2Request req;
869+
GetObjectRequest req;
870870
req.SetBucket(bucket);
871-
req.SetMaxKeys(1);
872-
auto result = ListObjectsV2(req);
871+
req.SetKey(key);
872+
req.SetRange("bytes=0-1");
873+
auto result = GetObject(req);
874+
873875
if (result.IsSuccess())
874876
return std::nullopt;
875877
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client
269269

270270
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
271271
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
272-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
272+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
273273

274274
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
275275

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
158158
validateKey(key, uri);
159159
}
160160

161+
bool URI::isAWSRegion(std::string_view region)
162+
{
163+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
164+
static const std::unordered_set<std::string_view> regions = {
165+
"us-east-2",
166+
"us-east-1",
167+
"us-west-1",
168+
"us-west-2",
169+
"af-south-1",
170+
"ap-east-1",
171+
"ap-south-2",
172+
"ap-southeast-3",
173+
"ap-southeast-5",
174+
"ap-southeast-4",
175+
"ap-south-1",
176+
"ap-northeast-3",
177+
"ap-northeast-2",
178+
"ap-southeast-1",
179+
"ap-southeast-2",
180+
"ap-east-2",
181+
"ap-southeast-7",
182+
"ap-northeast-1",
183+
"ca-central-1",
184+
"ca-west-1",
185+
"eu-central-1",
186+
"eu-west-1",
187+
"eu-west-2",
188+
"eu-south-1",
189+
"eu-west-3",
190+
"eu-south-2",
191+
"eu-north-1",
192+
"eu-central-2",
193+
"il-central-1",
194+
"mx-central-1",
195+
"me-south-1",
196+
"me-central-1",
197+
"sa-east-1",
198+
"us-gov-east-1",
199+
"us-gov-west-1"
200+
};
201+
202+
/// 's3-us-west-2' is a legacy region format for S3 storage, equivalent to 'us-west-2'
203+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
204+
if (region.substr(0, 3) == "s3-")
205+
region = region.substr(3);
206+
207+
return regions.contains(region);
208+
}
209+
161210
void URI::addRegionToURI(const std::string &region)
162211
{
163212
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
213+
{
214+
if (pos > 0)
215+
{ /// Check if the region is already in the endpoint to avoid adding it a second time
216+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
217+
if (prev_pos == std::string::npos)
218+
prev_pos = 0;
219+
else
220+
++prev_pos;
221+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
222+
if (isAWSRegion(endpoint_region))
223+
return;
224+
}
164225
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
226+
}
165227
}
166228

167229
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ struct URI
4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343
static void validateKey(const std::string & key, const Poco::URI & uri);
4444

45+
/// Returns true if 'region' string is an AWS S3 region
46+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
47+
static bool isAWSRegion(std::string_view region);
48+
4549
private:
4650
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
4751
};

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ void IcebergMetadata::updateSnapshot(Poco::JSON::Object::Ptr metadata_object)
515515

516516
relevant_snapshot = IcebergSnapshot{
517517
getManifestList(getProperFilePathFromMetadataInfo(
518-
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPath(), table_location)),
518+
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())),
519519
relevant_snapshot_id, total_rows, total_bytes};
520520

521521
if (!snapshot->has(f_schema_id))
@@ -678,7 +678,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(const String & filename)
678678
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
679679
{
680680
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
681-
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location);
681+
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace());
682682
Int64 added_sequence_number = 0;
683683
if (format_version > 1)
684684
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
@@ -805,6 +805,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
805805
schema_processor,
806806
inherited_sequence_number,
807807
table_location,
808+
configuration_ptr->getNamespace(),
808809
getContext());
809810
};
810811

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ ManifestFileContent::ManifestFileContent(
127127
const IcebergSchemaProcessor & schema_processor,
128128
Int64 inherited_sequence_number,
129129
const String & table_location,
130+
const String & common_namespace,
130131
DB::ContextPtr context)
131132
{
132133
this->schema_id = schema_id_;
@@ -191,7 +192,11 @@ ManifestFileContent::ManifestFileContent(
191192
}
192193
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet<UInt64>());
193194

194-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
195+
const auto file_path = getProperFilePathFromMetadataInfo(
196+
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
197+
common_path,
198+
table_location,
199+
common_namespace);
195200

196201
/// NOTE: This is weird, because in manifest file partition looks like this:
197202
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class ManifestFileContent
9494
const DB::IcebergSchemaProcessor & schema_processor,
9595
Int64 inherited_sequence_number,
9696
const std::string & table_location,
97+
const std::string & common_namespace,
9798
DB::ContextPtr context);
9899

99100
const std::vector<ManifestFileEntry> & getFiles() const;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ using namespace DB;
2828
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
2929
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
3030
// Common path should end with "<table_name>" or "<table_name>/".
31-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
31+
std::string getProperFilePathFromMetadataInfo(
32+
std::string_view data_path,
33+
std::string_view common_path,
34+
std::string_view table_location,
35+
std::string_view common_namespace)
3236
{
3337
auto trim_backward_slash = [](std::string_view str) -> std::string_view
3438
{
@@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
8488
}
8589
else
8690
{
87-
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
91+
/// Data files can have different path
92+
pos = data_path.find("://");
93+
if (pos == std::string::npos)
94+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
95+
pos = data_path.find("/", pos + 3);
96+
if (pos == std::string::npos)
97+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
98+
if (data_path.substr(pos + 1).starts_with(common_namespace))
99+
{
100+
auto new_pos = data_path.find("/", pos + 1);
101+
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
102+
pos = new_pos;
103+
}
104+
return std::string(data_path.substr(pos));
88105
}
89106
}
90107

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
namespace Iceberg
1111
{
1212

13-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
13+
std::string getProperFilePathFromMetadataInfo(
14+
std::string_view data_path,
15+
std::string_view common_path,
16+
std::string_view table_location,
17+
std::string_view common_namespace);
1418

1519
}
1620

0 commit comments

Comments
 (0)