Skip to content

Commit 69c3025

Browse files
committed
merge remote
2 parents f790be1 + abb3c0b commit 69c3025

File tree

73 files changed

+2058
-140
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+2058
-140
lines changed

programs/server/Server.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
#include <Storages/System/attachSystemTables.h>
8484
#include <Storages/System/attachInformationSchemaTables.h>
8585
#include <Storages/Cache/registerRemoteFileMetadatas.h>
86+
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
8687
#include <AggregateFunctions/registerAggregateFunctions.h>
8788
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
8889
#include <Functions/registerFunctions.h>
@@ -156,6 +157,10 @@
156157
# include <azure/core/diagnostics/logger.hpp>
157158
#endif
158159

160+
#if USE_PARQUET
161+
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
162+
#endif
163+
159164

160165
#include <incbin.h>
161166
/// A minimal file used when the server is run without installation
@@ -326,6 +331,11 @@ namespace ServerSetting
326331
extern const ServerSettingsUInt64 os_cpu_busy_time_threshold;
327332
extern const ServerSettingsFloat min_os_cpu_wait_time_ratio_to_drop_connection;
328333
extern const ServerSettingsFloat max_os_cpu_wait_time_ratio_to_drop_connection;
334+
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
335+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
336+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
337+
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
338+
329339
}
330340

331341
namespace ErrorCodes
@@ -413,6 +423,7 @@ namespace ErrorCodes
413423
extern const int NETWORK_ERROR;
414424
extern const int CORRUPTED_DATA;
415425
extern const int BAD_ARGUMENTS;
426+
extern const int STARTUP_SCRIPTS_ERROR;
416427
}
417428

418429

@@ -2421,8 +2432,16 @@ try
24212432
if (dns_cache_updater)
24222433
dns_cache_updater->start();
24232434

2435+
ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
2436+
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
2437+
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);
2438+
24242439
auto replicas_reconnector = ReplicasReconnector::init(global_context);
24252440

2441+
#if USE_PARQUET
2442+
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
2443+
#endif
2444+
24262445
/// Set current database name before loading tables and databases because
24272446
/// system logs may copy global context.
24282447
std::string default_database = server_settings[ServerSetting::default_database];

src/Access/Common/AccessType.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ enum class AccessType : uint8_t
184184
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
185185
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
186186
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
187+
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
188+
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
187189
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
188190
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
189191
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \

src/Common/ProfileEvents.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,12 @@ The server successfully detected this situation and will download merged part fr
10411041
M(IndexGenericExclusionSearchAlgorithm, "Number of times the generic exclusion search algorithm is used over the index marks", ValueType::Number) \
10421042
M(ParallelReplicasQueryCount, "Number of (sub)queries executed using parallel replicas during a query execution", ValueType::Number) \
10431043
M(DistributedConnectionReconnectCount, "Number of reconnects to other servers done during distributed query execution. It can happen when a stale connection has been acquired from connection pool", ValueType::Number) \
1044-
1044+
M(ParquetMetaDataCacheHits, "Number of times a Parquet file metadata lookup hit the Parquet metadata cache.", ValueType::Number) \
1045+
M(ParquetMetaDataCacheMisses, "Number of times a Parquet file metadata lookup missed the Parquet metadata cache.", ValueType::Number) \
1046+
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
1047+
M(ObjectStorageListObjectsCacheMisses, "Number of times an object storage list objects operation missed the cache.", ValueType::Number) \
1048+
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
1049+
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation hit the cache using prefix matching.", ValueType::Number) \
10451050

10461051
#ifdef APPLY_FOR_EXTERNAL_EVENTS
10471052
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

src/Common/TTLCachePolicy.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
271271
return res;
272272
}
273273

274-
private:
274+
protected:
275275
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
276276
Cache cache;
277-
277+
private:
278278
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
279279
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
280280
/// binary search on the sorted container and erase all left of the found key.

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,8 +1351,7 @@ Limits the size of the blocks formed during data parsing in input formats in byt
13511351
DECLARE(Bool, input_format_parquet_allow_geoparquet_parser, true, R"(
13521352
Use geo column parser to convert Array(UInt8) into Point/Linestring/Polygon/MultiLineString/MultiPolygon types
13531353
)", 0) \
1354-
1355-
1354+
DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
13561355
// End of FORMAT_FACTORY_SETTINGS
13571356

13581357
#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \

src/Core/ServerSettings.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,10 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
10641064
See [Controlling behavior on server CPU overload](/operations/settings/server-overload) for more details.
10651065
)", 0) \
10661066
DECLARE(Float, distributed_cache_keep_up_free_connections_ratio, 0.1f, "Soft limit for number of active connection distributed cache will try to keep free. After the number of free connections goes below distributed_cache_keep_up_free_connections_ratio * max_connections, connections with oldest activity will be closed until the number goes above the limit.", 0) \
1067-
1067+
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache in bytes", 0) \
1068+
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
1069+
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
1070+
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0)
10681071

10691072
// clang-format on
10701073

src/Core/Settings.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6865,10 +6865,16 @@ Default number of buckets for distributed shuffle-hash-join.
68656865
)", EXPERIMENTAL) \
68666866
DECLARE(UInt64, distributed_plan_default_reader_bucket_count, 8, R"(
68676867
Default number of tasks for parallel reading in distributed query. Tasks are spread across between replicas.
6868+
)", EXPERIMENTAL) \
6869+
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
6870+
Cache the list of objects returned by list objects calls in object storage
68686871
)", EXPERIMENTAL) \
68696872
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
68706873
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
68716874
)", 0) \
6875+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6876+
In object storage distribution queries, do not distribute tasks on non-prefetched nodes until the prefetched node is active.
6877+
)", EXPERIMENTAL) \
68726878
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
68736879
Force specified kind of Exchange operators between distributed query stages.
68746880
@@ -6877,13 +6883,17 @@ Possible values:
68776883
- '' - do not force any kind of Exchange operators, let the optimizer choose,
68786884
- 'Persisted' - use temporary files in object storage,
68796885
- 'Streaming' - stream exchange data over network.
6886+
)", EXPERIMENTAL) \
6887+
DECLARE(Bool, object_storage_remote_initiator, false, R"(
6888+
Execute request to object storage as remote on one of object_storage_cluster nodes.
68806889
)", EXPERIMENTAL) \
68816890
\
68826891
/** Experimental timeSeries* aggregate functions. */ \
68836892
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
68846893
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
68856894
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
68866895
\
6896+
68876897
/* ####################################################### */ \
68886898
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
68896899
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7373
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
7474
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
7575
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
76+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
7677
{"object_storage_cluster", "", "", "New setting"},
7778
{"object_storage_max_nodes", 0, 0, "New setting"},
79+
{"object_storage_remote_initiator", false, false, "New setting."},
7880
});
7981
addSettingsChanges(settings_changes_history, "25.6",
8082
{
@@ -173,6 +175,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
173175
{"parallel_hash_join_threshold", 0, 0, "New setting"},
174176
{"function_date_trunc_return_type_behavior", 1, 0, "Change the result type for dateTrunc function for DateTime64/Date32 arguments to DateTime64/Date32 regardless of time unit to get correct result for negative values"},
175177
/// Release closed. Please use 25.5
178+
// Altinity Antalya modifications atop of 25.2
179+
{"object_storage_cluster", "", "", "New setting"},
180+
{"object_storage_max_nodes", 0, 0, "New setting"},
181+
{"use_object_storage_list_objects_cache", true, false, "New setting."},
176182
});
177183
addSettingsChanges(settings_changes_history, "25.3",
178184
{
@@ -191,6 +197,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
191197
{"parallel_hash_join_threshold", 0, 0, "New setting"},
192198
/// Release closed. Please use 25.4
193199
});
200+
addSettingsChanges(settings_changes_history, "24.12.2.20000",
201+
{
202+
// Altinity Antalya modifications atop of 24.12
203+
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
204+
});
194205
addSettingsChanges(settings_changes_history, "25.2",
195206
{
196207
/// Release closed. Please use 25.3

src/Databases/DataLake/DataLakeConstants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace DataLake
88
{
99

1010
static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
11+
static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
1112
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
1213

1314
/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
646646
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
647647
}
648648

649+
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
650+
{
651+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
652+
}
653+
649654
for (auto & engine_arg : engine_args)
650655
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
651656

@@ -724,6 +729,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
724729
std::move(engine_for_tables));
725730
};
726731
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
732+
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
727733
}
728734

729735
}

0 commit comments

Comments
 (0)