Skip to content

Commit 67a38d1

Browse files
authored
Merge pull request #1039 from Altinity/fp_antaya_25_8_parquet_metadata_caching
Antalya 25.8 - Forward port of #938 - Parquet metadata caching
2 parents e431b71 + 16bffd1 commit 67a38d1

23 files changed

+326
-12
lines changed

programs/server/Server.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@
157157
# include <azure/core/diagnostics/logger.hpp>
158158
#endif
159159

160+
#if USE_PARQUET
161+
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
162+
#endif
163+
160164

161165
#include <incbin.h>
162166
/// A minimal file used when the server is run without installation
@@ -338,6 +342,7 @@ namespace ServerSetting
338342
extern const ServerSettingsBool abort_on_logical_error;
339343
extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
340344
extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
345+
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
341346
}
342347

343348
namespace ErrorCodes
@@ -2524,6 +2529,10 @@ try
25242529

25252530
auto replicas_reconnector = ReplicasReconnector::init(global_context);
25262531

2532+
#if USE_PARQUET
2533+
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
2534+
#endif
2535+
25272536
/// Set current database name before loading tables and databases because
25282537
/// system logs may copy global context.
25292538
std::string default_database = server_settings[ServerSetting::default_database];

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ enum class AccessType : uint8_t
320320
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
321321
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
322322
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
323+
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
323324
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
324325
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
325326
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \

src/Common/ProfileEvents.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1139,7 +1139,8 @@ The server successfully detected this situation and will download merged part fr
11391139
M(AsyncLoggingErrorFileLogDroppedMessages, "How many messages have been dropped from error file log due to the async log queue being full", ValueType::Number) \
11401140
M(AsyncLoggingSyslogDroppedMessages, "How many messages have been dropped from the syslog due to the async log queue being full", ValueType::Number) \
11411141
M(AsyncLoggingTextLogDroppedMessages, "How many messages have been dropped from text_log due to the async log queue being full", ValueType::Number) \
1142-
1142+
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
1143+
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
11431144

11441145
#ifdef APPLY_FOR_EXTERNAL_EVENTS
11451146
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,8 +1458,7 @@ Use geo column parser to convert Array(UInt8) into Point/Linestring/Polygon/Mult
14581458
DECLARE(Bool, output_format_parquet_geometadata, true, R"(
14591459
Allow to write information about geo columns in parquet metadata and encode columns in WKB format.
14601460
)", 0) \
1461-
1462-
1461+
DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
14631462
// End of FORMAT_FACTORY_SETTINGS
14641463

14651464
#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,8 +1139,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11391139
DECLARE(UInt64, threadpool_local_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from local filesystem.)", 0) \
11401140
DECLARE(NonZeroUInt64, threadpool_remote_fs_reader_pool_size, 250, R"(Number of threads in the Thread pool used for reading from remote filesystem when `remote_filesystem_read_method = 'threadpool'`.)", 0) \
11411141
DECLARE(UInt64, threadpool_remote_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from remote filesystem.)", 0) \
1142-
1143-
1142+
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
11441143
// clang-format on
11451144

11461145
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Core/SettingsChangesHistory.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
279279
{"parallel_hash_join_threshold", 0, 0, "New setting"},
280280
/// Release closed. Please use 25.4
281281
});
282+
addSettingsChanges(settings_changes_history, "24.12.2.20000",
283+
{
284+
// Altinity Antalya modifications atop of 24.12
285+
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
286+
});
282287
addSettingsChanges(settings_changes_history, "25.2",
283288
{
284289
/// Release closed. Please use 25.3

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@
8181
#include <Formats/ProtobufSchemas.h>
8282
#endif
8383

84+
#if USE_PARQUET
85+
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
86+
#endif
87+
8488
#if USE_AWS_S3
8589
#include <IO/S3/Client.h>
8690
#endif
@@ -436,6 +440,16 @@ BlockIO InterpreterSystemQuery::execute()
436440
getContext()->clearQueryResultCache(query.query_result_cache_tag);
437441
break;
438442
}
443+
case Type::DROP_PARQUET_METADATA_CACHE:
444+
{
445+
#if USE_PARQUET
446+
getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
447+
ParquetFileMetaDataCache::instance()->clear();
448+
break;
449+
#else
450+
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
451+
#endif
452+
}
439453
case Type::DROP_COMPILED_EXPRESSION_CACHE:
440454
#if USE_EMBEDDED_COMPILER
441455
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
@@ -1589,6 +1603,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
15891603
case Type::DROP_PAGE_CACHE:
15901604
case Type::DROP_SCHEMA_CACHE:
15911605
case Type::DROP_FORMAT_SCHEMA_CACHE:
1606+
case Type::DROP_PARQUET_METADATA_CACHE:
15921607
case Type::DROP_S3_CLIENT_CACHE:
15931608
{
15941609
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);

src/Parsers/ASTSystemQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
484484
case Type::DROP_COMPILED_EXPRESSION_CACHE:
485485
case Type::DROP_S3_CLIENT_CACHE:
486486
case Type::DROP_ICEBERG_METADATA_CACHE:
487+
case Type::DROP_PARQUET_METADATA_CACHE:
487488
case Type::RESET_COVERAGE:
488489
case Type::RESTART_REPLICAS:
489490
case Type::JEMALLOC_PURGE:

src/Parsers/ASTSystemQuery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
4141
DROP_SCHEMA_CACHE,
4242
DROP_FORMAT_SCHEMA_CACHE,
4343
DROP_S3_CLIENT_CACHE,
44+
DROP_PARQUET_METADATA_CACHE,
4445
STOP_LISTEN,
4546
START_LISTEN,
4647
RESTART_REPLICAS,

src/Processors/Formats/IInputFormat.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <Processors/Formats/InputFormatErrorsLogger.h>
66
#include <Core/BlockMissingValues.h>
77
#include <Processors/ISource.h>
8+
#include <Core/Settings.h>
89

910

1011
namespace DB
@@ -79,6 +80,9 @@ class IInputFormat : public ISource
7980

8081
void needOnlyCount() { need_only_count = true; }
8182

83+
/// Set additional info/key/id related to underlying storage of the ReadBuffer
84+
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}
85+
8286
protected:
8387
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
8488

0 commit comments

Comments
 (0)