Skip to content

Commit d7ae54a

Browse files
authored
Merge branch 'antalya-25.8' into fp_antalya_25_8_export_mt_part
2 parents 5980064 + 62d227b commit d7ae54a

21 files changed

+785
-18
lines changed

programs/server/Server.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
#include <Storages/System/attachSystemTables.h>
8484
#include <Storages/System/attachInformationSchemaTables.h>
8585
#include <Storages/Cache/registerRemoteFileMetadatas.h>
86+
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
8687
#include <AggregateFunctions/registerAggregateFunctions.h>
8788
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
8889
#include <Functions/registerFunctions.h>
@@ -342,6 +343,9 @@ namespace ServerSetting
342343
extern const ServerSettingsBool abort_on_logical_error;
343344
extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
344345
extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
346+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
347+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
348+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
345349
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
346350
}
347351

@@ -353,6 +357,9 @@ namespace ErrorCodes
353357
namespace FileCacheSetting
354358
{
355359
extern const FileCacheSettingsBool load_metadata_asynchronously;
360+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
361+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
362+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
356363
}
357364

358365
}
@@ -2527,6 +2534,10 @@ try
25272534
if (dns_cache_updater)
25282535
dns_cache_updater->start();
25292536

2537+
ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
2538+
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
2539+
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);
2540+
25302541
auto replicas_reconnector = ReplicasReconnector::init(global_context);
25312542

25322543
#if USE_PARQUET

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ enum class AccessType : uint8_t
321321
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
322322
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
323323
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
324+
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
324325
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
325326
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
326327
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,10 @@ The server successfully detected this situation and will download merged part fr
11451145
M(AsyncLoggingErrorFileLogDroppedMessages, "How many messages have been dropped from error file log due to the async log queue being full", ValueType::Number) \
11461146
M(AsyncLoggingSyslogDroppedMessages, "How many messages have been dropped from the syslog due to the async log queue being full", ValueType::Number) \
11471147
M(AsyncLoggingTextLogDroppedMessages, "How many messages have been dropped from text_log due to the async log queue being full", ValueType::Number) \
1148+
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
1149+
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
1150+
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
1151+
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation hit the cache using prefix matching.", ValueType::Number) \
11481152
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
11491153
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
11501154

src/Common/TTLCachePolicy.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
271271
return res;
272272
}
273273

274-
private:
274+
protected:
275275
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
276276
Cache cache;
277-
277+
private:
278278
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
279279
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
280280
/// binary search on the sorted container and erase all left of the found key.

src/Core/ServerSettings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,6 +1140,9 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11401140
DECLARE(UInt64, threadpool_local_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from local filesystem.)", 0) \
11411141
DECLARE(NonZeroUInt64, threadpool_remote_fs_reader_pool_size, 250, R"(Number of threads in the Thread pool used for reading from remote filesystem when `remote_filesystem_read_method = 'threadpool'`.)", 0) \
11421142
DECLARE(UInt64, threadpool_remote_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from remote filesystem.)", 0) \
1143+
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
1144+
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
1145+
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \
11431146
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
11441147
// clang-format on
11451148

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7013,6 +7013,9 @@ Write full paths (including s3://) into iceberg metadata files.
70137013
)", EXPERIMENTAL) \
70147014
DECLARE(String, iceberg_metadata_compression_method, "", R"(
70157015
Method to compress `.metadata.json` file.
7016+
)", EXPERIMENTAL) \
7017+
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
7018+
Cache the list of objects returned by list objects calls in object storage
70167019
)", EXPERIMENTAL) \
70177020
DECLARE(Bool, make_distributed_plan, false, R"(
70187021
Make distributed query plan.

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
119119
{"allow_experimental_lightweight_update", false, true, "Lightweight updates were moved to Beta."},
120120
{"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"},
121121
{"iceberg_metadata_log_level", "none", "none", "New setting."},
122+
{"use_object_storage_list_objects_cache", false, false, "New setting."},
122123
});
123124
addSettingsChanges(settings_changes_history, "25.7",
124125
{

src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage
3535
const String & description_,
3636
const String & common_key_prefix_);
3737

38+
/// Azure Blob Storage reports that it supports the list-objects cache.
bool supportsListObjectsCache() override
{
    return true;
}
39+
3840
void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;
3941

4042
/// Sanitizer build may crash with max_keys=1; this looks like a false positive.

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,14 @@ class IObjectStorage
367367
}
368368
virtual std::shared_ptr<const S3::Client> tryGetS3StorageClient() { return nullptr; }
369369
#endif
370+
371+
372+
/// Tells whether this object storage supports the list-objects cache.
/// The base implementation answers `false`; a storage implementation opts in by
/// overriding this method to return `true`.
/// NOTE(review): presumably consulted before ObjectStorageListObjectsCache is used for a
/// listing — confirm against the callers.
virtual bool supportsListObjectsCache()
{
    return false;
}
373+
374+
private:
375+
mutable std::mutex throttlers_mutex;
376+
ThrottlerPtr remote_read_throttler;
377+
ThrottlerPtr remote_write_throttler;
370378
};
371379

372380
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;

src/Disks/ObjectStorages/S3/S3ObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class S3ObjectStorage : public IObjectStorage
6161

6262
ObjectStorageType getType() const override { return ObjectStorageType::S3; }
6363

64+
/// S3 reports that it supports the list-objects cache.
bool supportsListObjectsCache() override
{
    return true;
}
65+
6466
bool exists(const StoredObject & object) const override;
6567

6668
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

0 commit comments

Comments
 (0)