@@ -842,7 +842,7 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
842842 }
843843
844844 SharedLockGuard lock (mutex);
845- auto version_if_outdated = getSchemaVersionByFileIfOutdated (data_path);
845+ auto version_if_outdated = getSchemaVersionByFileIfOutdated (Iceberg::makeAbsolutePath (table_location, data_path) );
846846 return version_if_outdated.has_value () ? schema_processor.getClickhouseTableSchemaById (version_if_outdated.value ()) : nullptr ;
847847}
848848
@@ -856,7 +856,7 @@ std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextP
856856 }
857857
858858 SharedLockGuard lock (mutex);
859- auto version_if_outdated = getSchemaVersionByFileIfOutdated (data_path);
859+ auto version_if_outdated = getSchemaVersionByFileIfOutdated (Iceberg::makeAbsolutePath (table_location, data_path) );
860860 return version_if_outdated.has_value ()
861861 ? schema_processor.getSchemaTransformationDagByIds (version_if_outdated.value (), relevant_snapshot_schema_id)
862862 : nullptr ;
@@ -921,38 +921,38 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
921921 if (configuration_ptr == nullptr )
922922 throw Exception (ErrorCodes::LOGICAL_ERROR, " Configuration is expired" );
923923
924+ const String full_filename = Iceberg::makeAbsolutePath (table_location, filename);
925+
924926 auto create_fn = [&]()
925927 {
926928 auto base_table_uri_parsed = Iceberg::parseUri (table_location);
927- auto file_uri_parsed = Iceberg::parseUri (filename );
929+ auto file_uri_parsed = Iceberg::parseUri (full_filename );
928930
929931 ObjectStoragePtr storage_to_use = object_storage;
930- String key_or_path = filename;
932+ String key = filename;
931933
932934 if (!file_uri_parsed.scheme .empty () && !base_table_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme )
933935 {
934- if (file_uri_parsed.authority == base_table_uri_parsed.authority )
935- {
936- // Same namespace as table_location -> use primary storage and strip leading '/'
937- key_or_path = file_uri_parsed.path ;
938- if (!key_or_path.empty () && key_or_path.front () == ' /' )
939- key_or_path.erase (0 , 1 );
940- }
941- else if (!file_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority .empty ())
936+ key = file_uri_parsed.path ;
937+ if (!key.empty () && key.front () == ' /' )
938+ key.erase (0 , 1 );
939+
940+ // Same storage type, but different namespace (location). Clone storage.
941+ if (file_uri_parsed.authority != base_table_uri_parsed.authority )
942942 {
943- // Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
944- std::unique_ptr<IObjectStorage> cloned = object_storage-> cloneObjectStorage (
945- file_uri_parsed. authority , local_context-> getConfigRef (), configuration_ptr-> getTypeName () + " . " , local_context);
946- storage_to_use = ObjectStoragePtr (cloned. release ());
947-
948- key_or_path = file_uri_parsed.path ;
949- if (!key_or_path. empty () && key_or_path. front () == ' / ' )
950- key_or_path. erase ( 0 , 1 );
943+ if (secondary_storages. contains (file_uri_parsed. authority ))
944+ storage_to_use = secondary_storages[file_uri_parsed. authority ];
945+ else
946+ {
947+ std::unique_ptr<IObjectStorage> cloned = object_storage-> cloneObjectStorage (
948+ file_uri_parsed.authority , local_context-> getConfigRef (), configuration_ptr-> getTypeName () + " . " , local_context) ;
949+ storage_to_use = ObjectStoragePtr (cloned. release ());
950+ }
951951 }
952952 }
953953 // TODO: what if storage type is different?
954954
955- StorageObjectStorage::ObjectInfo object_info (key_or_path , std::nullopt , filename );
955+ StorageObjectStorage::ObjectInfo object_info (key , std::nullopt , full_filename );
956956
957957 auto read_settings = local_context->getReadSettings ();
958958 // / Do not utilize filesystem cache if more precise cache enabled
@@ -966,11 +966,11 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
966966
967967 for (size_t i = 0 ; i < manifest_list_deserializer.rows (); ++i)
968968 {
969- const std::string manifest_file_name = manifest_list_deserializer.getValueFromRowByName (i, f_manifest_path, TypeIndex::String).safeGet <std::string>();
969+ const std::string manifest_file_path = Iceberg::makeAbsolutePath (table_location, manifest_list_deserializer.getValueFromRowByName (i, f_manifest_path, TypeIndex::String).safeGet <std::string>() );
970970 Int64 added_sequence_number = 0 ;
971971 if (format_version > 1 )
972972 added_sequence_number = manifest_list_deserializer.getValueFromRowByName (i, f_sequence_number, TypeIndex::Int64).safeGet <Int64>();
973- manifest_file_cache_keys.emplace_back (manifest_file_name , added_sequence_number);
973+ manifest_file_cache_keys.emplace_back (manifest_file_path , added_sequence_number);
974974 }
975975 // / We only return the list of {file name, seq number} for cache.
976976 // / Because ManifestList holds a list of ManifestFilePtr which consume much memory space.
@@ -980,7 +980,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
980980
981981 ManifestFileCacheKeys manifest_file_cache_keys;
982982 if (manifest_cache)
983- manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys (IcebergMetadataFilesCache::getKey (configuration_ptr, filename ), create_fn);
983+ manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys (IcebergMetadataFilesCache::getKey (configuration_ptr, full_filename ), create_fn);
984984 else
985985 manifest_file_cache_keys = create_fn ();
986986 return manifest_file_cache_keys;
@@ -1080,46 +1080,45 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
10801080{
10811081 auto configuration_ptr = configuration.lock ();
10821082
1083+ const String full_filename = Iceberg::makeAbsolutePath (table_location, filename);
1084+
10831085 auto create_fn = [&]()
10841086 {
10851087 auto base_table_uri_parsed = Iceberg::parseUri (table_location);
1086- auto file_uri_parsed = Iceberg::parseUri (filename );
1088+ auto file_uri_parsed = Iceberg::parseUri (full_filename );
10871089
10881090 ObjectStoragePtr storage_to_use = object_storage;
1089- String key_or_path = filename;
1091+ String key = filename;
10901092
10911093 if (!file_uri_parsed.scheme .empty () && !base_table_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme )
10921094 {
1093- if (file_uri_parsed.authority == base_table_uri_parsed.authority )
1094- {
1095- // Same namespace as table_location -> use primary storage and strip leading '/'
1096- key_or_path = file_uri_parsed.path ;
1097- if (!key_or_path.empty () && key_or_path.front () == ' /' )
1098- key_or_path.erase (0 , 1 );
1099- }
1100- else if (!file_uri_parsed.scheme .empty () && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority .empty ())
1095+ key = file_uri_parsed.path ;
1096+ if (!key.empty () && key.front () == ' /' )
1097+ key.erase (0 , 1 );
1098+
1099+ if (file_uri_parsed.authority != base_table_uri_parsed.authority )
11011100 {
1102- // Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
1103- std::unique_ptr<IObjectStorage> cloned = object_storage-> cloneObjectStorage (
1104- file_uri_parsed. authority , local_context-> getConfigRef (), configuration_ptr-> getTypeName () + " . " , local_context);
1105- storage_to_use = ObjectStoragePtr (cloned. release ());
1106-
1107- key_or_path = file_uri_parsed.path ;
1108- if (!key_or_path. empty () && key_or_path. front () == ' / ' )
1109- key_or_path. erase ( 0 , 1 );
1101+ if (secondary_storages. contains (file_uri_parsed. authority ))
1102+ storage_to_use = secondary_storages[file_uri_parsed. authority ];
1103+ else
1104+ {
1105+ std::unique_ptr<IObjectStorage> cloned = object_storage-> cloneObjectStorage (
1106+ file_uri_parsed.authority , local_context-> getConfigRef (), configuration_ptr-> getTypeName () + " . " , local_context) ;
1107+ storage_to_use = ObjectStoragePtr (cloned. release ());
1108+ }
11101109 }
11111110 }
11121111
1113- ObjectInfo manifest_object_info (key_or_path , std::nullopt , filename );
1112+ ObjectInfo manifest_object_info (key , std::nullopt , full_filename );
11141113
11151114 auto read_settings = local_context->getReadSettings ();
11161115 // / Do not utilize filesystem cache if more precise cache enabled
11171116 if (manifest_cache)
11181117 read_settings.enable_filesystem_cache = false ;
11191118
11201119 auto buffer = StorageObjectStorageSource::createReadBuffer (manifest_object_info, storage_to_use, local_context, log, read_settings);
1121- AvroForIcebergDeserializer manifest_file_deserializer (std::move (buffer), filename , getFormatSettings (local_context));
1122- auto [schema_id, schema_object] = parseTableSchemaFromManifestFile (manifest_file_deserializer, filename );
1120+ AvroForIcebergDeserializer manifest_file_deserializer (std::move (buffer), full_filename , getFormatSettings (local_context));
1121+ auto [schema_id, schema_object] = parseTableSchemaFromManifestFile (manifest_file_deserializer, full_filename );
11231122 schema_processor.addIcebergTableSchema (schema_object);
11241123 return std::make_shared<ManifestFileContent>(
11251124 manifest_file_deserializer,
@@ -1128,12 +1127,13 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
11281127 schema_object,
11291128 schema_processor,
11301129 inherited_sequence_number,
1130+ table_location,
11311131 local_context);
11321132 };
11331133
11341134 if (manifest_cache)
11351135 {
1136- auto manifest_file = manifest_cache->getOrSetManifestFile (IcebergMetadataFilesCache::getKey (configuration_ptr, filename ), create_fn);
1136+ auto manifest_file = manifest_cache->getOrSetManifestFile (IcebergMetadataFilesCache::getKey (configuration_ptr, full_filename ), create_fn);
11371137 schema_processor.addIcebergTableSchema (manifest_file->getSchemaObject ());
11381138 return manifest_file;
11391139 }
0 commit comments