Skip to content

Commit 93296ec

Browse files
committed
store per-file object_storage_ptr in object_info
tmp upd upd
1 parent 8112d64 commit 93296ec

File tree

6 files changed

+46
-44
lines changed

6 files changed

+46
-44
lines changed

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,16 @@ struct PathWithMetadata
125125
std::optional<ObjectMetadata> metadata;
126126
CommandInTaskResponse command;
127127
String absolute_path;
128+
std::optional<ObjectStoragePtr> object_storage_to_use = std::nullopt;
128129

129130
PathWithMetadata() = default;
130131

131-
explicit PathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt, String absolute_path_ = "")
132+
explicit PathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt, String absolute_path_ = "", std::optional<ObjectStoragePtr> object_storage_to_use_ = std::nullopt)
132133
: metadata(std::move(metadata_))
133134
, command(task_string)
135+
, absolute_path(absolute_path_)
136+
, object_storage_to_use(object_storage_to_use_)
134137
{
135-
absolute_path = absolute_path_;
136138
if (!command.is_parsed())
137139
{
138140
relative_path = task_string;
@@ -152,6 +154,8 @@ struct PathWithMetadata
152154

153155
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
154156
/// Read-only accessor for the `command` member, which is constructed from the task string.
const CommandInTaskResponse & getCommand() const
{
    return command;
}
157+
158+
/// Returns a copy of `object_storage_to_use`: the storage assigned to this particular object,
/// or an empty optional when no per-object storage was provided.
std::optional<ObjectStoragePtr> getObjectStorage() const
{
    return object_storage_to_use;
}
155159
};
156160

157161
struct ObjectKeyWithMetadata

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
356356
/// Delegates to the implementation object returned by getImpl().
IDataLakeMetadata * getExternalMetadata() override
{
    return getImpl().getExternalMetadata();
}
357357

358358
std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(ContextPtr context, const String & path) const override
359-
{ return getImpl().getInitialSchemaByPath(context, path); }
359+
{ return getImpl().getInitialSchemaByPath(context, path); }
360360

361361
std::shared_ptr<const ActionsDAG> getSchemaTransformer(ContextPtr context, const String & path) const override
362362
{ return getImpl().getSchemaTransformer(context, path); }

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
842842
}
843843

844844
SharedLockGuard lock(mutex);
845-
auto version_if_outdated = getSchemaVersionByFileIfOutdated(Iceberg::makeAbsolutePath(table_location, data_path));
845+
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
846846
return version_if_outdated.has_value() ? schema_processor.getClickhouseTableSchemaById(version_if_outdated.value()) : nullptr;
847847
}
848848

@@ -856,7 +856,7 @@ std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextP
856856
}
857857

858858
SharedLockGuard lock(mutex);
859-
auto version_if_outdated = getSchemaVersionByFileIfOutdated(Iceberg::makeAbsolutePath(table_location, data_path));
859+
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
860860
return version_if_outdated.has_value()
861861
? schema_processor.getSchemaTransformationDagByIds(version_if_outdated.value(), relevant_snapshot_schema_id)
862862
: nullptr;
@@ -1327,7 +1327,7 @@ class IcebergKeysIterator : public IObjectIterator
13271327
if (callback)
13281328
callback(FileProgress(0, object_metadata.size_bytes));
13291329

1330-
return std::make_shared<ObjectInfo>(file_uri_parsed.path, std::move(object_metadata), file_uri);
1330+
return std::make_shared<ObjectInfo>(file_uri_parsed.path, std::move(object_metadata), file_uri, storage_to_use);
13311331
}
13321332

13331333
private:

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,15 @@ bool isRelativePath(const std::string & path)
9292
if (!extractStorageType(path).empty())
9393
return false;
9494

95-
// // Non-relative if it starts with '/' (absolute POSIX path)
96-
// if (!path.empty() && path.front() == '/')
97-
// return false;
98-
9995
return true;
10096
}
10197

10298

10399
UriParts parseUri(const std::string & uri)
104100
{
101+
if (uri.empty())
102+
return {};
103+
105104
UriParts parts;
106105

107106
// scheme://authority/path
@@ -123,34 +122,20 @@ UriParts parseUri(const std::string & uri)
123122
return parts;
124123
}
125124

126-
// scheme:/path or file:///path
127-
if (!uri.empty() && uri.front() == '/')
125+
// If the URI has no scheme and starts with '/', treat it as an absolute local-file URI (file:///path).
126+
if (uri.front() == '/')
128127
{
128+
parts.scheme = "file";
129129
parts.path = std::string(uri);
130130
return parts;
131131
}
132132

133-
// Best-effort for "scheme:..." without //
134-
auto colon = uri.find(':');
135-
auto slash = uri.find('/');
136-
if (colon != std::string_view::npos && (slash == std::string_view::npos || colon < slash))
137-
{
138-
parts.scheme = Poco::toLower(uri.substr(0, colon));
139-
auto after = uri.substr(colon + 1);
140-
// file:/path or file:///path -> treat as absolute path
141-
if (!after.empty() && after.front() == '/')
142-
parts.path = std::string(after);
143-
else
144-
parts.path = std::string(after);
145-
return parts;
146-
}
147-
148-
// Relative or plain path
133+
// Relative path, return as is
149134
parts.path = std::string(uri);
150135
return parts;
151136
}
152137

153-
std::string makeAbsolutePath(const std::string & table_location, const std::string & path)
138+
std::string makeAbsolutePath(const std::string & table_location, const std::string & path, [[maybe_unused]] bool is_s3_key)
154139
{
155140
if (!isRelativePath(path))
156141
return path;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct UriParts
3434

3535
UriParts parseUri(const std::string & uri);
3636

37-
std::string makeAbsolutePath(const std::string & table_location, const std::string & path);
37+
std::string makeAbsolutePath(const std::string & table_location, const std::string & path, bool is_s3_key = false);
3838

3939
}
4040

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
472472
ObjectInfoPtr object_info;
473473
auto query_settings = configuration->getQuerySettings(context_);
474474

475+
ObjectStoragePtr storage_to_use = object_storage;
476+
475477
bool not_a_path = false;
476478

477479
do
@@ -499,7 +501,10 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
499501
if (object_info->getPath().empty())
500502
return {};
501503

502-
object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file);
504+
if (auto storage_for_file = object_info->getObjectStorage(); storage_for_file.has_value())
505+
storage_to_use = storage_for_file.value();
506+
507+
object_info->loadMetadata(storage_to_use, query_settings.ignore_non_existent_file);
503508
}
504509
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
505510

@@ -552,7 +557,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
552557
else
553558
{
554559
compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod());
555-
read_buf = createReadBuffer(*object_info, object_storage, context_, log);
560+
read_buf = createReadBuffer(*object_info, storage_to_use, context_, log);
556561
}
557562

558563
Block initial_header = read_from_format_info.format_header;
@@ -644,14 +649,16 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
644649
const auto & settings = context_->getSettingsRef();
645650
const auto & effective_read_settings = read_settings.has_value() ? read_settings.value() : context_->getReadSettings();
646651

652+
ObjectStoragePtr storage_to_use = object_info.getObjectStorage().value_or(object_storage);
653+
647654
bool use_distributed_cache = false;
648655
#if ENABLE_DISTRIBUTED_CACHE
649656
ObjectStorageConnectionInfoPtr connection_info;
650657
if (settings[Setting::table_engine_read_through_distributed_cache]
651658
&& DistributedCache::Registry::instance().isReady(
652659
effective_read_settings.distributed_cache_settings.read_only_from_current_az))
653660
{
654-
connection_info = object_storage->getConnectionInfo();
661+
connection_info = storage_to_use->getConnectionInfo();
655662
if (connection_info)
656663
use_distributed_cache = true;
657664
}
@@ -664,15 +671,15 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
664671
filesystem_cache_name = settings[Setting::filesystem_cache_name].value;
665672
use_filesystem_cache = effective_read_settings.enable_filesystem_cache
666673
&& !filesystem_cache_name.empty()
667-
&& (object_storage->getType() == ObjectStorageType::Azure
668-
|| object_storage->getType() == ObjectStorageType::S3);
674+
&& (storage_to_use->getType() == ObjectStorageType::Azure
675+
|| storage_to_use->getType() == ObjectStorageType::S3);
669676
}
670677

671678
/// We need object metadata for two cases:
672679
/// 1. object size suggests whether we need to use prefetch
673680
/// 2. object etag suggests a cache key in case we use filesystem cache
674681
if (!object_info.metadata)
675-
object_info.metadata = object_storage->getObjectMetadata(object_info.getPath());
682+
object_info.metadata = storage_to_use->getObjectMetadata(object_info.getPath());
676683

677684
const auto & object_size = object_info.metadata->size_bytes;
678685

@@ -710,9 +717,9 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
710717
{
711718
const std::string path = object_info.getPath();
712719
StoredObject object(path, "", object_size);
713-
auto read_buffer_creator = [object, nested_buffer_read_settings, object_storage]()
720+
auto read_buffer_creator = [object, nested_buffer_read_settings, storage_to_use]()
714721
{
715-
return object_storage->readObject(object, nested_buffer_read_settings);
722+
return storage_to_use->readObject(object, nested_buffer_read_settings);
716723
};
717724

718725
impl = std::make_unique<ReadBufferFromDistributedCache>(
@@ -745,9 +752,9 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
745752
const auto cache_key = FileCacheKey::fromKey(hash.get128());
746753
auto cache = FileCacheFactory::instance().get(filesystem_cache_name);
747754

748-
auto read_buffer_creator = [path = object_info.getPath(), object_size, modified_read_settings, object_storage]()
755+
auto read_buffer_creator = [path = object_info.getPath(), object_size, modified_read_settings, storage_to_use]()
749756
{
750-
return object_storage->readObject(StoredObject(path, "", object_size), modified_read_settings);
757+
return storage_to_use->readObject(StoredObject(path, "", object_size), modified_read_settings);
751758
};
752759

753760
modified_read_settings.filesystem_cache_boundary_alignment = settings[Setting::filesystem_cache_boundary_alignment];
@@ -777,7 +784,7 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
777784
}
778785

779786
if (!impl)
780-
impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), modified_read_settings);
787+
impl = storage_to_use->readObject(StoredObject(object_info.getPath(), "", object_size), modified_read_settings);
781788

782789
if (!use_async_buffer)
783790
return impl;
@@ -1157,7 +1164,7 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o
11571164
/* path_to_archive */object_info->getPath(),
11581165
/* archive_read_function */[=, this]()
11591166
{
1160-
return StorageObjectStorageSource::createReadBuffer(*object_info, object_storage, getContext(), log);
1167+
return StorageObjectStorageSource::createReadBuffer(*object_info, object_info->getObjectStorage().value_or(object_storage), getContext(), log);
11611168
},
11621169
/* archive_size */size);
11631170
}
@@ -1177,7 +1184,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor
11771184
return {};
11781185

11791186
if (!archive_object->metadata)
1180-
archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath());
1187+
{
1188+
ObjectStoragePtr storage_to_use = archive_object->getObjectStorage().value_or(object_storage);
1189+
archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath());
1190+
}
11811191

11821192
archive_reader = createArchiveReader(archive_object);
11831193
file_enumerator = archive_reader->firstFile();
@@ -1202,7 +1212,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor
12021212
return {};
12031213

12041214
if (!archive_object->metadata)
1205-
archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath());
1215+
{
1216+
ObjectStoragePtr storage_to_use = archive_object->getObjectStorage().value_or(object_storage);
1217+
archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath());
1218+
}
12061219

12071220
archive_reader = createArchiveReader(archive_object);
12081221
if (!archive_reader->fileExists(path_in_archive))

0 commit comments

Comments
 (0)