Skip to content

Commit 021ab4f

Browse files
committed
still ugly but seems to be working
1 parent ae5bf79 commit 021ab4f

File tree

10 files changed

+140
-28
lines changed

10 files changed

+140
-28
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ Plan getPlan(
147147
std::unordered_map<String, std::shared_ptr<ManifestFilePlan>> manifest_files;
148148
for (const auto & snapshot : snapshots_info)
149149
{
150-
auto [manifest_list_storage, key_in_storage] = resolveObjectStorageForPath("", snapshot.manifest_list_path, object_storage, secondary_storages, context);
150+
auto [manifest_list_storage, key_in_storage] = resolveObjectStorageForPath(persistent_table_components.table_location, snapshot.manifest_list_path, object_storage, secondary_storages, context);
151151

152152
auto manifest_list
153153
= getManifestList(manifest_list_storage, configuration, persistent_table_components, context, key_in_storage, snapshot.manifest_list_path, log);
@@ -163,10 +163,10 @@ Plan getPlan(
163163
persistent_table_components,
164164
context,
165165
log,
166-
manifest_file.manifest_file_path,
167166
manifest_file.manifest_file_absolute_path,
168167
manifest_file.added_sequence_number,
169-
manifest_file.added_snapshot_id);
168+
manifest_file.added_snapshot_id,
169+
secondary_storages);
170170

171171
if (!manifest_files.contains(manifest_file.manifest_file_absolute_path))
172172
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,29 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
5656
}
5757
}
5858

59+
/// Builds the object info for a data file whose storage and key have already been resolved by the caller
/// (e.g. a file that may live outside the table location).
///
/// @param data_manifest_file_entry_ Manifest entry describing the data file. Its `file_path` is forwarded to
///        the `PathWithMetadata` base; `file_path_key`, `schema_id` and `added_sequence_number` are copied
///        into this object.
/// @param resolved_storage Object storage forwarded to the `PathWithMetadata` base.
/// @param resolved_key Key forwarded to the `PathWithMetadata` base as the first (path) argument.
/// @throws Exception with ErrorCodes::NOT_IMPLEMENTED if position deletes are present and the data file
///         format is not Parquet (compared case-insensitively).
IcebergDataObjectInfo::IcebergDataObjectInfo(
    Iceberg::ManifestFileEntry data_manifest_file_entry_,
    ObjectStoragePtr resolved_storage,
    const String & resolved_key)
    : PathWithMetadata(resolved_key, std::nullopt, data_manifest_file_entry_.file_path, resolved_storage)
    , data_object_file_path_key(data_manifest_file_entry_.file_path_key)
    , underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
    , sequence_number(data_manifest_file_entry_.added_sequence_number)
{
    /// Upper-cases a copy of the string. Characters go through `unsigned char` because calling
    /// `::toupper` with a negative `char` value is undefined behaviour.
    auto to_upper_copy = [](String str)
    {
        std::transform(
            str.begin(),
            str.end(),
            str.begin(),
            [](unsigned char ch) { return static_cast<char>(::toupper(ch)); });
        return str;
    };

    /// NOTE(review): this constructor adds no position deletes itself; if `position_deletes_objects`
    /// starts out empty (its declaration is not visible here), this check cannot fire at construction
    /// time - confirm whether it should instead run when a position delete is attached.
    if (!position_deletes_objects.empty())
    {
        /// The normalized (upper-cased) format is also what gets reported in the error message.
        const String normalized_format = to_upper_copy(data_manifest_file_entry_.file_format);
        if (normalized_format != "PARQUET")
        {
            throw Exception(
                ErrorCodes::NOT_IMPLEMENTED,
                "Position deletes are only supported for data files of Parquet format in Iceberg, but got {}",
                normalized_format);
        }
    }
}
81+
5982
std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransformer(
6083
ObjectStoragePtr object_storage,
6184
const SharedHeader & header,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
2121
/// It is used to filter position deletes objects by data file path.
2222
/// It is also used to create a filter for the data object in the position delete transform.
2323
explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);
24+
25+
/// Constructor with resolved storage and key for files that may be outside table location
26+
explicit IcebergDataObjectInfo(
27+
Iceberg::ManifestFileEntry data_manifest_file_entry_,
28+
ObjectStoragePtr resolved_storage,
29+
const String & resolved_key);
2430

2531
std::shared_ptr<ISimpleTransform> getPositionDeleteTransformer(
2632
ObjectStoragePtr object_storage,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
4343
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
4444
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
45+
#include <Storages/ObjectStorage/Utils.h>
4546

4647
#include <Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h>
4748

@@ -124,16 +125,17 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
124125
++manifest_file_index;
125126
continue;
126127
}
128+
std::map<String, ObjectStoragePtr> empty_secondary_storages;
127129
current_manifest_file_content = Iceberg::getManifestFile(
128130
object_storage,
129131
configuration.lock(),
130132
persistent_components,
131133
local_context,
132134
log,
133-
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_path,
134135
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
135136
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
136-
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id);
137+
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
138+
empty_secondary_storages);
137139
internal_data_index = 0;
138140
}
139141
auto files = files_generator(current_manifest_file_content);
@@ -237,6 +239,7 @@ IcebergIterator::IcebergIterator(
237239
PersistentTableComponents persistent_components_)
238240
: filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
239241
, object_storage(std::move(object_storage_))
242+
, local_context(local_context_)
240243
, data_files_iterator(
241244
object_storage,
242245
local_context_,
@@ -327,7 +330,13 @@ ObjectInfoPtr IcebergIterator::next(size_t)
327330
Iceberg::ManifestFileEntry manifest_file_entry;
328331
if (blocking_queue.pop(manifest_file_entry))
329332
{
330-
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry);
333+
// Resolve the data file path to get the correct storage and key
334+
std::map<String, ObjectStoragePtr> empty_secondary_storages;
335+
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
336+
persistent_components.table_location, manifest_file_entry.file_path, object_storage, empty_secondary_storages, local_context);
337+
338+
// Create IcebergDataObjectInfo with resolved storage
339+
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);
331340
for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false))
332341
{
333342
object_info->addPositionDeleteObject(position_delete);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class IcebergIterator : public IObjectIterator
100100
private:
101101
std::unique_ptr<ActionsDAG> filter_dag;
102102
ObjectStoragePtr object_storage;
103+
ContextPtr local_context;
103104
Iceberg::SingleThreadIcebergKeysIterator data_files_iterator;
104105
Iceberg::SingleThreadIcebergKeysIterator deletes_iterator;
105106
ConcurrentBoundedQueue<Iceberg::ManifestFileEntry> blocking_queue;

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
497497

498498
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
499499

500-
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath("", snapshot->getValue<String>(f_manifest_list), object_storage, secondary_storages, local_context);
500+
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath(persistent_components.table_location, snapshot->getValue<String>(f_manifest_list), object_storage, secondary_storages, local_context);
501501

502502
relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
503503
getManifestList(
@@ -914,10 +914,10 @@ std::optional<size_t> IcebergMetadata::totalRows(ContextPtr local_context) const
914914
persistent_components,
915915
local_context,
916916
log,
917-
manifest_list_entry.manifest_file_path,
918917
manifest_list_entry.manifest_file_absolute_path,
919918
manifest_list_entry.added_sequence_number,
920-
manifest_list_entry.added_snapshot_id);
919+
manifest_list_entry.added_snapshot_id,
920+
secondary_storages);
921921
auto data_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA);
922922
auto position_deletes_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE);
923923
if (!data_count.has_value() || !position_deletes_count.has_value())
@@ -955,10 +955,10 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
955955
persistent_components,
956956
local_context,
957957
log,
958-
manifest_list_entry.manifest_file_path,
959958
manifest_list_entry.manifest_file_absolute_path,
960959
manifest_list_entry.added_sequence_number,
961-
manifest_list_entry.added_snapshot_id);
960+
manifest_list_entry.added_snapshot_id,
961+
secondary_storages);
962962
auto count = manifest_file_ptr->getBytesCountInAllDataFilesExcludingDeleted();
963963
if (!count.has_value())
964964
return {};

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ Iceberg::ManifestFilePtr getManifestFile(
7373
const PersistentTableComponents & persistent_table_components,
7474
ContextPtr local_context,
7575
LoggerPtr log,
76-
const String & key_in_storage,
7776
const String & absolute_path,
7877
Int64 inherited_sequence_number,
79-
Int64 inherited_snapshot_id)
78+
Int64 inherited_snapshot_id,
79+
std::map<String, ObjectStoragePtr> & secondary_storages)
8080
{
8181
auto log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value;
8282

@@ -85,19 +85,23 @@ Iceberg::ManifestFilePtr getManifestFile(
8585

8686
auto create_fn = [&, use_iceberg_metadata_cache]()
8787
{
88-
PathWithMetadata manifest_object_info(key_in_storage, std::nullopt, absolute_path, object_storage);
88+
// Resolve the absolute path to get the correct storage and key_in_storage
89+
auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath(
90+
persistent_table_components.table_location, absolute_path, object_storage, secondary_storages, local_context);
91+
92+
PathWithMetadata manifest_object_info(resolved_key_in_storage, std::nullopt, absolute_path, storage_to_use);
8993

9094
auto read_settings = local_context->getReadSettings();
9195
/// Do not utilize filesystem cache if more precise cache enabled
9296
if (use_iceberg_metadata_cache)
9397
read_settings.enable_filesystem_cache = false;
9498

95-
auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings);
96-
Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), key_in_storage, getFormatSettings(local_context));
99+
auto buffer = createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings);
100+
Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), resolved_key_in_storage, getFormatSettings(local_context));
97101

98102
return std::make_shared<Iceberg::ManifestFileContent>(
99103
manifest_file_deserializer,
100-
key_in_storage,
104+
resolved_key_in_storage,
101105
persistent_table_components.format_version,
102106
configuration->getPathForRead().path,
103107
*persistent_table_components.schema_processor,

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ Iceberg::ManifestFilePtr getManifestFile(
2929
const PersistentTableComponents & persistent_table_components,
3030
ContextPtr local_context,
3131
LoggerPtr log,
32-
const String & key_in_storage,
3332
const String & absolute_path,
3433
Int64 inherited_sequence_number,
35-
Int64 inherited_snapshot_id);
34+
Int64 inherited_snapshot_id,
35+
std::map<String, ObjectStoragePtr> & secondary_storages);
3636

3737

3838
ManifestFileCacheKeys getManifestList(

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1282,7 +1282,10 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
12821282
{
12831283
auto [storage_to_use, key] = resolveObjectStorageForPath("", object->getAbsolutePath().value(), object_storage, secondary_storages, getContext());
12841284
if (!key.empty())
1285+
{
12851286
object->object_storage_to_use = storage_to_use;
1287+
object->relative_path = key;
1288+
}
12861289
}
12871290
buffer.push_back(object);
12881291
}
@@ -1308,7 +1311,11 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator
13081311

13091312
object_info = raw->getObjectInfo();
13101313

1311-
if (raw->absolute_path.has_value())
1314+
if (object_info->getObjectStorage().has_value())
1315+
{
1316+
1317+
}
1318+
else if (raw->absolute_path.has_value())
13121319
{
13131320
auto [storage_to_use, key]
13141321
= resolveObjectStorageForPath("", raw->absolute_path.value(), object_storage, secondary_storages, getContext());

src/Storages/ObjectStorage/Utils.cpp

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
1313
#include <Poco/Util/MapConfiguration.h>
1414
#include <IO/S3/URI.h>
15+
#if USE_AWS_S3
16+
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
17+
#endif
1518

1619

1720
namespace DB
@@ -54,6 +57,7 @@ static std::string factoryTypeForScheme(const std::string & normalized_scheme)
5457

5558
}
5659

60+
// TODO: handle https://s3.amazonaws.com/bucketname/... properly
5761
SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri)
5862
{
5963
if (uri.empty())
@@ -304,34 +308,65 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
304308
const std::string base_norm = normalizeSchema(base.scheme);
305309
const std::string target_norm = normalizeSchema(target.scheme);
306310

307-
// For S3 URIs, use S3::URI to properly URLs: https://s3.amazonaws.com/.....
311+
// For S3 URIs, use S3::URI to properly handle all kinds of URIs, e.g. https://s3.amazonaws.com/bucket/... == s3://bucket/...
308312
#if USE_AWS_S3
309313
if (target_norm == "s3" || target_norm == "https" || target_norm == "http")
310314
{
311315
try
312316
{
313317
S3::URI s3_uri(path);
314318

315-
if (base_norm == "s3" || base_norm == "https" || base_norm == "http")
319+
// Check if base storage is S3 and matches the path's bucket/endpoint
320+
bool use_base_storage = false;
321+
if (base_storage->getType() == ObjectStorageType::S3)
322+
{
323+
#if USE_AWS_S3
324+
if (auto s3_storage = std::dynamic_pointer_cast<S3ObjectStorage>(base_storage))
325+
{
326+
const std::string base_bucket = s3_storage->getObjectsNamespace();
327+
const std::string base_endpoint = s3_storage->getDescription();
328+
329+
// If bucket matches, use base storage. For s3:// URIs (which don't have explicit endpoint),
330+
// we should use base storage if bucket matches, regardless of endpoint.
331+
// For explicit http(s):// URIs, we require both bucket and endpoint to match.
332+
bool bucket_matches = (s3_uri.bucket == base_bucket);
333+
bool endpoint_matches = (s3_uri.endpoint == base_endpoint);
334+
bool is_generic_s3_uri = (target_norm == "s3");
335+
336+
if (bucket_matches && (endpoint_matches || is_generic_s3_uri))
337+
{
338+
use_base_storage = true;
339+
}
340+
}
341+
#endif
342+
}
343+
344+
// Also check if table_location is provided and matches
345+
if (!use_base_storage && (base_norm == "s3" || base_norm == "https" || base_norm == "http"))
316346
{
317347
S3::URI base_s3_uri(table_location);
318348

319-
if (s3_uri.bucket == base_s3_uri.bucket && s3_uri.endpoint == base_s3_uri.endpoint)
349+
// For s3:// URIs, match by bucket only. For explicit http(s):// URIs, require both bucket and endpoint.
350+
bool bucket_matches = (s3_uri.bucket == base_s3_uri.bucket);
351+
bool endpoint_matches = (s3_uri.endpoint == base_s3_uri.endpoint);
352+
bool is_generic_s3_uri = (target_norm == "s3");
353+
354+
if (bucket_matches && (endpoint_matches || is_generic_s3_uri))
320355
{
321-
return {base_storage, s3_uri.key};
356+
use_base_storage = true;
322357
}
323358
}
359+
360+
if (use_base_storage)
361+
return {base_storage, s3_uri.key};
324362

325363
const std::string cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint);
326364

327365
if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end())
328-
{
329366
return {it->second, s3_uri.key};
330-
}
331367

332368
/// TODO: maybe do not invent new configuration. Use old one and clean up later
333369
Poco::AutoPtr<Poco::Util::MapConfiguration> cfg(new Poco::Util::MapConfiguration);
334-
335370
const std::string config_prefix = "object_storages." + cache_key;
336371

337372
cfg->setString(config_prefix + ".object_storage_type", "s3");
@@ -342,14 +377,41 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
342377
: s3_uri.endpoint;
343378
cfg->setString(config_prefix + ".endpoint", endpoint);
344379

380+
// Copy credentials from base storage if it's also S3
381+
if (base_storage->getType() == ObjectStorageType::S3)
382+
{
383+
#if USE_AWS_S3
384+
if (auto s3_storage = std::dynamic_pointer_cast<S3ObjectStorage>(base_storage))
385+
{
386+
if (auto s3_client = s3_storage->tryGetS3StorageClient())
387+
{
388+
const auto credentials = s3_client->getCredentials();
389+
const std::string access_key_id = credentials.GetAWSAccessKeyId();
390+
const std::string secret_access_key = credentials.GetAWSSecretKey();
391+
const std::string session_token = credentials.GetSessionToken();
392+
const std::string region = s3_client->getRegion();
393+
394+
if (!access_key_id.empty())
395+
cfg->setString(config_prefix + ".access_key_id", access_key_id);
396+
if (!secret_access_key.empty())
397+
cfg->setString(config_prefix + ".secret_access_key", secret_access_key);
398+
if (!session_token.empty())
399+
cfg->setString(config_prefix + ".session_token", session_token);
400+
if (!region.empty())
401+
cfg->setString(config_prefix + ".region", region);
402+
}
403+
}
404+
#endif
405+
}
406+
345407
auto & factory = DB::ObjectStorageFactory::instance();
346408

347409
DB::ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
348410

349411
secondary_storages.emplace(cache_key, storage);
350412
return {storage, s3_uri.key};
351413
}
352-
catch (...)
414+
catch (const Exception &) // NOLINT
353415
{
354416
// If S3::URI parsing fails, fall back to the old logic
355417
}

0 commit comments

Comments
 (0)