@@ -866,7 +866,12 @@ std::optional<Int32> IcebergMetadata::getSchemaVersionByFileIfOutdated(String da
866866{
867867 auto schema_id_it = schema_id_by_data_file.find (data_path);
868868 if (schema_id_it == schema_id_by_data_file.end ())
869- throw Exception (ErrorCodes::BAD_ARGUMENTS, " Cannot find manifest file for data file: {}" , data_path);
869+ {
870+ std::string error_msg = " " ;
871+ for (const auto & sch : schema_id_by_data_file)
872+ error_msg += " Schema id: " + std::to_string (sch.second ) + " for file: " + sch.first + " \n " ;
873+ throw Exception (ErrorCodes::BAD_ARGUMENTS, " Cannot find manifest file for data file: {}.\n Contents:\n {}" , data_path, error_msg);
874+ }
870875
871876 auto schema_id = schema_id_it->second ;
872877 if (schema_id == relevant_snapshot_schema_id)
@@ -908,7 +913,7 @@ void IcebergMetadata::initializeSchemasFromManifestList(ContextPtr local_context
908913 for (const auto & manifest_file_entry : manifest_file_ptr->getFiles ())
909914 {
910915 if (std::holds_alternative<DataFileEntry>(manifest_file_entry.file ))
911- schema_id_by_data_file.emplace (std::get<DataFileEntry>(manifest_file_entry.file ).file_name , manifest_file_ptr->getSchemaId ());
916+ schema_id_by_data_file.emplace (Iceberg::makeAbsolutePath (table_location, std::get<DataFileEntry>(manifest_file_entry.file ).file_name ) , manifest_file_ptr->getSchemaId ());
912917 }
913918 }
914919
@@ -921,56 +926,32 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
921926 if (configuration_ptr == nullptr )
922927 throw Exception (ErrorCodes::LOGICAL_ERROR, " Configuration is expired" );
923928
929+ const String full_filename = Iceberg::makeAbsolutePath (table_location, filename);
930+
924931 auto create_fn = [&]()
925932 {
926- auto base_table_uri_parsed = Iceberg::parseUri (table_location);
927- auto file_uri_parsed = Iceberg::parseUri (filename);
928-
929- ObjectStoragePtr storage_to_use = object_storage;
930- String key_or_path = filename;
931-
932- if (!file_uri_parsed.scheme .empty () && !base_table_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme )
933- {
934- if (file_uri_parsed.authority == base_table_uri_parsed.authority )
935- {
936- // Same namespace as table_location -> use primary storage and strip leading '/'
937- key_or_path = file_uri_parsed.path ;
938- if (!key_or_path.empty () && key_or_path.front () == ' /' )
939- key_or_path.erase (0 , 1 );
940- }
941- else if (!file_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority .empty ())
942- {
943- // Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
944- std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage (
945- file_uri_parsed.authority , local_context->getConfigRef (), configuration_ptr->getTypeName () + " ." , local_context);
946- storage_to_use = ObjectStoragePtr (cloned.release ());
947-
948- key_or_path = file_uri_parsed.path ;
949- if (!key_or_path.empty () && key_or_path.front () == ' /' )
950- key_or_path.erase (0 , 1 );
951- }
952- }
953- // TODO: what if storage type is different?
933+ auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath (
934+ table_location, filename, object_storage, secondary_storages, local_context);
954935
955- StorageObjectStorage::ObjectInfo object_info (key_or_path , std::nullopt , filename );
936+ StorageObjectStorage::ObjectInfo object_info (key , std::nullopt , full_filename );
956937
957938 auto read_settings = local_context->getReadSettings ();
958939 // / Do not utilize filesystem cache if more precise cache enabled
959940 if (manifest_cache)
960941 read_settings.enable_filesystem_cache = false ;
961942
962943 auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer (object_info, storage_to_use, local_context, log, read_settings);
963- AvroForIcebergDeserializer manifest_list_deserializer (std::move (manifest_list_buf), file_uri_parsed. path , getFormatSettings (local_context));
944+ AvroForIcebergDeserializer manifest_list_deserializer (std::move (manifest_list_buf), key , getFormatSettings (local_context));
964945
965946 ManifestFileCacheKeys manifest_file_cache_keys;
966947
967948 for (size_t i = 0 ; i < manifest_list_deserializer.rows (); ++i)
968949 {
969- const std::string manifest_file_name = manifest_list_deserializer.getValueFromRowByName (i, f_manifest_path, TypeIndex::String).safeGet <std::string>();
950+ const std::string manifest_file_path = Iceberg::makeAbsolutePath (table_location, manifest_list_deserializer.getValueFromRowByName (i, f_manifest_path, TypeIndex::String).safeGet <std::string>() );
970951 Int64 added_sequence_number = 0 ;
971952 if (format_version > 1 )
972953 added_sequence_number = manifest_list_deserializer.getValueFromRowByName (i, f_sequence_number, TypeIndex::Int64).safeGet <Int64>();
973- manifest_file_cache_keys.emplace_back (manifest_file_name , added_sequence_number);
954+ manifest_file_cache_keys.emplace_back (manifest_file_path , added_sequence_number);
974955 }
975956 // / We only return the list of {file name, seq number} for cache.
976957 // / Because ManifestList holds a list of ManifestFilePtr which consume much memory space.
@@ -980,7 +961,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
980961
981962 ManifestFileCacheKeys manifest_file_cache_keys;
982963 if (manifest_cache)
983- manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys (IcebergMetadataFilesCache::getKey (configuration_ptr, filename ), create_fn);
964+ manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys (IcebergMetadataFilesCache::getKey (configuration_ptr, full_filename ), create_fn);
984965 else
985966 manifest_file_cache_keys = create_fn ();
986967 return manifest_file_cache_keys;
@@ -1080,46 +1061,25 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
10801061{
10811062 auto configuration_ptr = configuration.lock ();
10821063
1064+ const String full_filename = Iceberg::makeAbsolutePath (table_location, filename);
1065+
10831066 auto create_fn = [&]()
10841067 {
1085- auto base_table_uri_parsed = Iceberg::parseUri (table_location);
1086- auto file_uri_parsed = Iceberg::parseUri (filename);
1068+ // Select proper storage and key for the manifest file
1069+ auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath (
1070+ table_location, full_filename, object_storage, secondary_storages, local_context);
10871071
1088- ObjectStoragePtr storage_to_use = object_storage;
1089- String key_or_path = filename;
10901072
1091- if (!file_uri_parsed.scheme .empty () && !base_table_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme )
1092- {
1093- if (file_uri_parsed.authority == base_table_uri_parsed.authority )
1094- {
1095- // Same namespace as table_location -> use primary storage and strip leading '/'
1096- key_or_path = file_uri_parsed.path ;
1097- if (!key_or_path.empty () && key_or_path.front () == ' /' )
1098- key_or_path.erase (0 , 1 );
1099- }
1100- else if (!file_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority .empty ())
1101- {
1102- // Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
1103- std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage (
1104- file_uri_parsed.authority , local_context->getConfigRef (), configuration_ptr->getTypeName () + " ." , local_context);
1105- storage_to_use = ObjectStoragePtr (cloned.release ());
1106-
1107- key_or_path = file_uri_parsed.path ;
1108- if (!key_or_path.empty () && key_or_path.front () == ' /' )
1109- key_or_path.erase (0 , 1 );
1110- }
1111- }
1112-
1113- ObjectInfo manifest_object_info (key_or_path, std::nullopt , filename);
1073+ ObjectInfo manifest_object_info (key, std::nullopt , full_filename);
11141074
11151075 auto read_settings = local_context->getReadSettings ();
11161076 // / Do not utilize filesystem cache if more precise cache enabled
11171077 if (manifest_cache)
11181078 read_settings.enable_filesystem_cache = false ;
11191079
11201080 auto buffer = StorageObjectStorageSource::createReadBuffer (manifest_object_info, storage_to_use, local_context, log, read_settings);
1121- AvroForIcebergDeserializer manifest_file_deserializer (std::move (buffer), filename , getFormatSettings (local_context));
1122- auto [schema_id, schema_object] = parseTableSchemaFromManifestFile (manifest_file_deserializer, filename );
1081+ AvroForIcebergDeserializer manifest_file_deserializer (std::move (buffer), full_filename , getFormatSettings (local_context));
1082+ auto [schema_id, schema_object] = parseTableSchemaFromManifestFile (manifest_file_deserializer, full_filename );
11231083 schema_processor.addIcebergTableSchema (schema_object);
11241084 return std::make_shared<ManifestFileContent>(
11251085 manifest_file_deserializer,
@@ -1128,12 +1088,13 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
11281088 schema_object,
11291089 schema_processor,
11301090 inherited_sequence_number,
1091+ table_location,
11311092 local_context);
11321093 };
11331094
11341095 if (manifest_cache)
11351096 {
1136- auto manifest_file = manifest_cache->getOrSetManifestFile (IcebergMetadataFilesCache::getKey (configuration_ptr, filename ), create_fn);
1097+ auto manifest_file = manifest_cache->getOrSetManifestFile (IcebergMetadataFilesCache::getKey (configuration_ptr, full_filename ), create_fn);
11371098 schema_processor.addIcebergTableSchema (manifest_file->getSchemaObject ());
11381099 return manifest_file;
11391100 }
@@ -1275,12 +1236,14 @@ class IcebergKeysIterator : public IObjectIterator
12751236public:
12761237 IcebergKeysIterator (
12771238 Strings && data_files_,
1239+ const std::string & table_location_,
12781240 ObjectStoragePtr object_storage_,
12791241 std::map<String, ObjectStoragePtr> & secondary_storages_,
12801242 IDataLakeMetadata::FileProgressCallback callback_,
12811243 ContextPtr local_context_,
12821244 StorageObjectStorage::ConfigurationPtr configuration_ptr_)
12831245 : data_files(data_files_)
1246+ , table_location(table_location_)
12841247 , object_storage(object_storage_)
12851248 , secondary_storages(secondary_storages_)
12861249 , callback(callback_)
@@ -1300,33 +1263,23 @@ class IcebergKeysIterator : public IObjectIterator
13001263 if (current_index >= data_files.size ())
13011264 return nullptr ;
13021265
1303- auto file_uri = data_files[current_index];
1304- auto file_uri_parsed = Iceberg::parseUri (file_uri);
1305-
1306- ObjectStoragePtr storage_to_use;
1266+ const auto & raw_path = data_files[current_index];
13071267
1308- if (file_uri_parsed.authority == object_storage->getObjectsNamespace ())
1309- storage_to_use = object_storage;
1310- else if (secondary_storages.contains (file_uri_parsed.authority ))
1311- storage_to_use = secondary_storages.at (file_uri_parsed.authority );
1312- else
1313- {
1314- std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage (
1315- file_uri_parsed.authority , local_context->getConfigRef (), configuration_ptr->getTypeName () + " ." , local_context);
1316- storage_to_use = ObjectStoragePtr (cloned.release ());
1317- secondary_storages[file_uri_parsed.authority ] = storage_to_use;
1318- }
1268+ // Route to correct storage
1269+ auto [storage_to_use, key] = Iceberg::resolveObjectStorageForPath (
1270+ table_location, raw_path, object_storage, secondary_storages, local_context);
13191271
1320- auto object_metadata = storage_to_use->getObjectMetadata (file_uri_parsed. path );
1272+ auto object_metadata = storage_to_use->getObjectMetadata (key );
13211273
13221274 if (callback)
13231275 callback (FileProgress (0 , object_metadata.size_bytes ));
13241276
1325- return std::make_shared<ObjectInfo>(file_uri_parsed. path , std::move (object_metadata), file_uri );
1277+ return std::make_shared<ObjectInfo>(key , std::move (object_metadata), raw_path, storage_to_use );
13261278 }
13271279
13281280private:
13291281 Strings data_files;
1282+ const String table_location;
13301283 ObjectStoragePtr object_storage;
13311284 std::map<String, ObjectStoragePtr> & secondary_storages;
13321285 std::atomic<size_t > index = 0 ;
@@ -1342,7 +1295,7 @@ ObjectIterator IcebergMetadata::createIcebergKeysIterator(
13421295 IDataLakeMetadata::FileProgressCallback callback_,
13431296 ContextPtr local_context)
13441297{
1345- return std::make_shared<IcebergKeysIterator>(std::move (data_files_), object_storage, secondary_storages, callback_, local_context, configuration.lock ());
1298+ return std::make_shared<IcebergKeysIterator>(std::move (data_files_), table_location, object_storage, secondary_storages, callback_, local_context, configuration.lock ());
13461299}
13471300
13481301ObjectIterator IcebergMetadata::iterate (
0 commit comments