Skip to content

Commit 708d59b

Browse files
authored
Merge branch 'antalya-25.8' into frontport/antalya-25.8/iceberg_features
2 parents cc32d53 + 70fba7c commit 708d59b

21 files changed

+785
-18
lines changed

programs/server/Server.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
#include <Storages/System/attachSystemTables.h>
8484
#include <Storages/System/attachInformationSchemaTables.h>
8585
#include <Storages/Cache/registerRemoteFileMetadatas.h>
86+
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
8687
#include <AggregateFunctions/registerAggregateFunctions.h>
8788
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
8889
#include <Functions/registerFunctions.h>
@@ -342,6 +343,9 @@ namespace ServerSetting
342343
extern const ServerSettingsBool abort_on_logical_error;
343344
extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
344345
extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
346+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
347+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
348+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
345349
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
346350
}
347351

@@ -353,6 +357,9 @@ namespace ErrorCodes
353357
namespace FileCacheSetting
354358
{
355359
extern const FileCacheSettingsBool load_metadata_asynchronously;
360+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
361+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
362+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
356363
}
357364

358365
}
@@ -2527,6 +2534,10 @@ try
25272534
if (dns_cache_updater)
25282535
dns_cache_updater->start();
25292536

2537+
ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
2538+
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
2539+
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);
2540+
25302541
auto replicas_reconnector = ReplicasReconnector::init(global_context);
25312542

25322543
#if USE_PARQUET

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ enum class AccessType : uint8_t
320320
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
321321
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
322322
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
323+
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
323324
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
324325
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
325326
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,10 @@ The server successfully detected this situation and will download merged part fr
11391139
M(AsyncLoggingErrorFileLogDroppedMessages, "How many messages have been dropped from error file log due to the async log queue being full", ValueType::Number) \
11401140
M(AsyncLoggingSyslogDroppedMessages, "How many messages have been dropped from the syslog due to the async log queue being full", ValueType::Number) \
11411141
M(AsyncLoggingTextLogDroppedMessages, "How many messages have been dropped from text_log due to the async log queue being full", ValueType::Number) \
1142+
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
1143+
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
1144+
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
1145+
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation hit the cache using prefix matching.", ValueType::Number) \
11421146
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
11431147
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
11441148

src/Common/TTLCachePolicy.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
271271
return res;
272272
}
273273

274-
private:
274+
protected:
275275
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
276276
Cache cache;
277-
277+
private:
278278
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
279279
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
280280
/// binary search on the sorted container and erase all left of the found key.

src/Core/ServerSettings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,9 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11391139
DECLARE(UInt64, threadpool_local_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from local filesystem.)", 0) \
11401140
DECLARE(NonZeroUInt64, threadpool_remote_fs_reader_pool_size, 250, R"(Number of threads in the Thread pool used for reading from remote filesystem when `remote_filesystem_read_method = 'threadpool'`.)", 0) \
11411141
DECLARE(UInt64, threadpool_remote_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from remote filesystem.)", 0) \
1142+
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
1143+
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
1144+
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \
11421145
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
11431146
// clang-format on
11441147

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7010,6 +7010,9 @@ Write full paths (including s3://) into iceberg metadata files.
70107010
)", EXPERIMENTAL) \
70117011
DECLARE(String, iceberg_metadata_compression_method, "", R"(
70127012
Method to compress `.metadata.json` file.
7013+
)", EXPERIMENTAL) \
7014+
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
7015+
Cache the list of objects returned by list objects calls in object storage
70137016
)", EXPERIMENTAL) \
70147017
DECLARE(Bool, make_distributed_plan, false, R"(
70157018
Make distributed query plan.

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
120120
{"allow_experimental_lightweight_update", false, true, "Lightweight updates were moved to Beta."},
121121
{"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"},
122122
{"iceberg_metadata_log_level", "none", "none", "New setting."},
123+
{"use_object_storage_list_objects_cache", false, false, "New setting."},
123124
});
124125
addSettingsChanges(settings_changes_history, "25.7",
125126
{

src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage
3535
const String & description_,
3636
const String & common_key_prefix_);
3737

38+
/// Azure blob storage reports support for the list-objects cache: always returns true.
/// NOTE(review): presumably consulted before ObjectStorageListObjectsCache is used — confirm against callers.
bool supportsListObjectsCache() override
{
    return true;
}
39+
3840
void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;
3941

4042
/// Sanitizer build may crash with max_keys=1; this looks like a false positive.

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,14 @@ class IObjectStorage
365365
}
366366
/// Base implementation: returns nullptr.
/// NOTE(review): presumably S3-backed storages override this to expose their client — confirm.
virtual std::shared_ptr<const S3::Client> tryGetS3StorageClient()
{
    return nullptr;
}
367367
#endif
368+
369+
370+
/// Whether this storage supports the list-objects cache. The base implementation
/// returns false; AzureObjectStorage and S3ObjectStorage override it to return true.
/// NOTE(review): presumably gates use of ObjectStorageListObjectsCache — confirm against callers.
virtual bool supportsListObjectsCache()
{
    return false;
}
371+
372+
private:
373+
mutable std::mutex throttlers_mutex;
374+
ThrottlerPtr remote_read_throttler;
375+
ThrottlerPtr remote_write_throttler;
368376
};
369377

370378
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;

src/Disks/ObjectStorages/S3/S3ObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class S3ObjectStorage : public IObjectStorage
6161

6262
/// Identifies this storage as ObjectStorageType::S3.
ObjectStorageType getType() const override
{
    return ObjectStorageType::S3;
}
6363

64+
/// S3 storage reports support for the list-objects cache: always returns true.
/// NOTE(review): presumably consulted before ObjectStorageListObjectsCache is used — confirm against callers.
bool supportsListObjectsCache() override
{
    return true;
}
65+
6466
bool exists(const StoredObject & object) const override;
6567

6668
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

0 commit comments

Comments
 (0)