Skip to content

Commit e803bc1

Browse files
al13n321zvonand
authored andcommitted
Merge pull request ClickHouse#82949 from ClickHouse/shp2
Unrevert "Refactor how IInputFormat deals with reading many files at once"
1 parent 2000540 commit e803bc1

File tree

84 files changed

+1041
-397
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+1041
-397
lines changed

.clang-tidy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Checks: [
2828
'-bugprone-unchecked-optional-access',
2929
'-bugprone-crtp-constructor-accessibility',
3030
'-bugprone-not-null-terminated-result',
31+
'-bugprone-forward-declaration-namespace',
3132

3233
'-cert-dcl16-c',
3334
'-cert-err58-cpp',

ci/jobs/scripts/check_style/aspell-ignore/en/aspell-dict.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,8 +800,6 @@ PagerDuty
800800
ParallelFormattingOutputFormatThreads
801801
ParallelFormattingOutputFormatThreadsActive
802802
ParallelParsingInputFormat
803-
ParallelParsingInputFormatThreads
804-
ParallelParsingInputFormatThreadsActive
805803
ParallelReplicasMode
806804
ParquetCompression
807805
ParquetMetadata

docs/en/operations/system-tables/metrics.md

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -524,14 +524,6 @@ Number of threads in the ParallelFormattingOutputFormatThreads thread pool.
524524

525525
Number of threads in the ParallelFormattingOutputFormatThreads thread pool running a task.
526526

527-
### ParallelParsingInputFormatThreads {#parallelparsinginputformatthreads}
528-
529-
Number of threads in the ParallelParsingInputFormat thread pool.
530-
531-
### ParallelParsingInputFormatThreadsActive {#parallelparsinginputformatthreadsactive}
532-
533-
Number of threads in the ParallelParsingInputFormat thread pool running a task.
534-
535527
### PartMutation {#partmutation}
536528

537529
Number of mutations (ALTER DELETE/UPDATE)

programs/keeper-bench/Runner.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,8 +576,7 @@ struct ZooKeeperRequestFromLogReader
576576
context,
577577
context->getSettingsRef()[DB::Setting::max_block_size],
578578
format_settings,
579-
1,
580-
std::nullopt,
579+
DB::FormatParserGroup::singleThreaded(context->getSettingsRef()),
581580
/*is_remote_fs*/ false,
582581
DB::CompressionMethod::None,
583582
false);

programs/local/LocalServer.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ namespace ServerSetting
129129
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_size;
130130
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size;
131131
extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size;
132+
extern const ServerSettingsUInt64 max_format_parsing_thread_pool_size;
133+
extern const ServerSettingsUInt64 max_format_parsing_thread_pool_free_size;
134+
extern const ServerSettingsUInt64 format_parsing_thread_pool_queue_size;
132135
}
133136

134137
namespace ErrorCodes
@@ -266,6 +269,11 @@ void LocalServer::initialize(Poco::Util::Application & self)
266269
server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_size],
267270
server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_free_size],
268271
server_settings[ServerSetting::prefixes_deserialization_thread_pool_thread_pool_queue_size]);
272+
273+
getFormatParsingThreadPool().initialize(
274+
server_settings[ServerSetting::max_format_parsing_thread_pool_size],
275+
server_settings[ServerSetting::max_format_parsing_thread_pool_free_size],
276+
server_settings[ServerSetting::format_parsing_thread_pool_queue_size]);
269277
}
270278

271279

programs/server/Server.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ namespace ServerSetting
316316
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_size;
317317
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size;
318318
extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size;
319+
extern const ServerSettingsUInt64 max_format_parsing_thread_pool_size;
320+
extern const ServerSettingsUInt64 max_format_parsing_thread_pool_free_size;
321+
extern const ServerSettingsUInt64 format_parsing_thread_pool_queue_size;
319322
extern const ServerSettingsUInt64 page_cache_history_window_ms;
320323
extern const ServerSettingsString page_cache_policy;
321324
extern const ServerSettingsDouble page_cache_size_ratio;
@@ -1365,6 +1368,11 @@ try
13651368
server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_free_size],
13661369
server_settings[ServerSetting::prefixes_deserialization_thread_pool_thread_pool_queue_size]);
13671370

1371+
getFormatParsingThreadPool().initialize(
1372+
server_settings[ServerSetting::max_format_parsing_thread_pool_size],
1373+
server_settings[ServerSetting::max_format_parsing_thread_pool_free_size],
1374+
server_settings[ServerSetting::format_parsing_thread_pool_queue_size]);
1375+
13681376
std::string path_str = getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH));
13691377
fs::path path = path_str;
13701378

@@ -2088,6 +2096,11 @@ try
20882096
new_server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_free_size],
20892097
new_server_settings[ServerSetting::prefixes_deserialization_thread_pool_thread_pool_queue_size]);
20902098

2099+
getFormatParsingThreadPool().reloadConfiguration(
2100+
new_server_settings[ServerSetting::max_format_parsing_thread_pool_size],
2101+
new_server_settings[ServerSetting::max_format_parsing_thread_pool_free_size],
2102+
new_server_settings[ServerSetting::format_parsing_thread_pool_queue_size]);
2103+
20912104
global_context->setMergeWorkload(new_server_settings[ServerSetting::merge_workload]);
20922105
global_context->setMutationWorkload(new_server_settings[ServerSetting::mutation_workload]);
20932106
global_context->setThrowOnUnknownWorkload(new_server_settings[ServerSetting::throw_on_unknown_workload]);

src/Client/ClientBase.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,11 @@
7070
#include <IO/WriteBufferFromFileDescriptor.h>
7171
#include <IO/CompressionMethod.h>
7272
#include <IO/ForkWriteBuffer.h>
73+
#include <IO/SharedThreadPools.h>
7374

7475
#include <Access/AccessControl.h>
7576
#include <Storages/ColumnsDescription.h>
77+
#include <Storages/SelectQueryInfo.h>
7678
#include <TableFunctions/ITableFunction.h>
7779

7880
#include <filesystem>

src/Common/CurrentMetrics.cpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,6 @@
153153
M(ParallelFormattingOutputFormatThreads, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool.") \
154154
M(ParallelFormattingOutputFormatThreadsActive, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool running a task.") \
155155
M(ParallelFormattingOutputFormatThreadsScheduled, "Number of queued or active jobs in the ParallelFormattingOutputFormatThreads thread pool.") \
156-
M(ParallelParsingInputFormatThreads, "Number of threads in the ParallelParsingInputFormat thread pool.") \
157-
M(ParallelParsingInputFormatThreadsActive, "Number of threads in the ParallelParsingInputFormat thread pool running a task.") \
158-
M(ParallelParsingInputFormatThreadsScheduled, "Number of queued or active jobs in the ParallelParsingInputFormat thread pool.") \
159156
M(MergeTreeBackgroundExecutorThreads, "Number of threads in the MergeTreeBackgroundExecutor thread pool.") \
160157
M(MergeTreeBackgroundExecutorThreadsActive, "Number of threads in the MergeTreeBackgroundExecutor thread pool running a task.") \
161158
M(MergeTreeBackgroundExecutorThreadsScheduled, "Number of queued or active jobs in the MergeTreeBackgroundExecutor thread pool.") \
@@ -239,21 +236,15 @@
239236
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
240237
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
241238
M(QueryPipelineExecutorThreadsScheduled, "Number of queued or active jobs in the PipelineExecutor thread pool.") \
242-
M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \
243-
M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \
244-
M(ParquetDecoderThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat thread pool.") \
245-
M(ParquetDecoderIOThreads, "Number of threads in the ParquetBlockInputFormat io thread pool.") \
246-
M(ParquetDecoderIOThreadsActive, "Number of threads in the ParquetBlockInputFormat io thread pool running a task.") \
247-
M(ParquetDecoderIOThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat io thread pool.") \
248239
M(ParquetEncoderThreads, "Number of threads in ParquetBlockOutputFormat thread pool.") \
249240
M(ParquetEncoderThreadsActive, "Number of threads in ParquetBlockOutputFormat thread pool running a task.") \
250241
M(ParquetEncoderThreadsScheduled, "Number of queued or active jobs in ParquetBlockOutputFormat thread pool.") \
251242
M(MergeTreeSubcolumnsReaderThreads, "Number of threads in the thread pool used for subcolumns reading in MergeTree.") \
252243
M(MergeTreeSubcolumnsReaderThreadsActive, "Number of threads in the thread pool used for subcolumns reading in MergeTree running a task.") \
253244
M(MergeTreeSubcolumnsReaderThreadsScheduled, "Number of queued or active jobs in the thread pool used for subcolumns reading in MergeTree.") \
254-
M(DWARFReaderThreads, "Number of threads in the DWARFBlockInputFormat thread pool.") \
255-
M(DWARFReaderThreadsActive, "Number of threads in the DWARFBlockInputFormat thread pool running a task.") \
256-
M(DWARFReaderThreadsScheduled, "Number of queued or active jobs in the DWARFBlockInputFormat thread pool.") \
245+
M(FormatParsingThreads, "Number of threads in the thread pool used for parsing input.") \
246+
M(FormatParsingThreadsActive, "Number of threads in the thread pool used for parsing input running a task.") \
247+
M(FormatParsingThreadsScheduled, "Number of queued or active jobs in the thread pool used for parsing input.") \
257248
M(OutdatedPartsLoadingThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
258249
M(OutdatedPartsLoadingThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
259250
M(OutdatedPartsLoadingThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Outdated data parts.") \

0 commit comments

Comments
 (0)