Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace DB

/// Settings read by this translation unit. They are only declared here
/// (`extern`); the definitions live elsewhere.
namespace Setting
{
extern const SettingsMaxThreads max_threads;
/// Looked up through `context->getSettingsRef()` to decide whether the pipe is resized.
extern const SettingsBool parallelize_output_from_storages;
}


Expand All @@ -50,6 +50,7 @@ ReadFromObjectStorageStep::ReadFromObjectStorageStep(
, need_only_count(need_only_count_)
, max_block_size(max_block_size_)
, num_streams(num_streams_)
, max_num_streams(num_streams_)
, distributed_processing(distributed_processing_)
{
}
Expand Down Expand Up @@ -128,6 +129,13 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(std::make_shared<const Block>(info.source_header)));

size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef()[Setting::parallelize_output_from_storages];
if (parallelize_output
&& FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context)
&& output_ports > 0 && output_ports < max_num_streams)
pipe.resize(max_num_streams);

for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

Expand Down
1 change: 1 addition & 0 deletions src/Processors/QueryPlan/ReadFromObjectStorageStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
const bool need_only_count;
const size_t max_block_size;
size_t num_streams;
const size_t max_num_streams;
const bool distributed_processing;

void createIterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Resize 1 → 4
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Tags: no-fasttest

# Verify that reading a single file from object storage (ENGINE = S3)
# parallelizes the pipeline output via Resize, so downstream processors
# like AggregatingTransform can run on multiple threads.
# Without this, queries on a single S3/data-lake Parquet file run
# entirely single-threaded (e.g. Q28 in ClickBench: 79× slower).

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Single place for the table name so create / explain / drop cannot drift apart.
table_name="test_s3_${CLICKHOUSE_DATABASE}"

# Start from a clean slate: a table left behind by an aborted run would
# otherwise make CREATE TABLE fail before the actual check is reached.
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS ${table_name};"

${CLICKHOUSE_CLIENT} --query "
    CREATE TABLE ${table_name} (x UInt64, y String)
    ENGINE = S3('http://localhost:19999/dummy.parquet', Parquet);
"

# The pipeline should contain 'Resize 1 → 4' between the source and
# the processing transforms, proving the single source output is
# distributed across max_threads workers.
# parallelize_output_from_storages is pinned explicitly because the resize
# is gated on it; the test must not depend on the ambient default/profile.
${CLICKHOUSE_CLIENT} --query "
    EXPLAIN PIPELINE
    SELECT count(), sum(x) FROM ${table_name}
    GROUP BY y
    SETTINGS max_threads = 4, parallelize_output_from_storages = 1;
" | grep -o 'Resize 1 → 4'

${CLICKHOUSE_CLIENT} --query "DROP TABLE ${table_name};"
Loading