Skip to content

Commit dda44b8

Browse files
authored
patch: split Resize/StrictResize to avoid lock contention (#349)
1 parent 2a962a8 commit dda44b8

File tree

9 files changed

+117
-10
lines changed

9 files changed

+117
-10
lines changed

src/Core/Settings.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6547,6 +6547,27 @@ When the query prioritization mechanism is employed (see setting `priority`), lo
65476547
)", BETA) \
65486548
DECLARE(Float, min_os_cpu_wait_time_ratio_to_throw, 0.0, "Min ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 0 at this point.", 0) \
65496549
DECLARE(Float, max_os_cpu_wait_time_ratio_to_throw, 0.0, "Max ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 1 at this point.", 0) \
6550+
DECLARE(UInt64, min_outstreams_per_resize_after_split, 24, R"(
6551+
Specifies the minimum number of output streams of a `Resize` or `StrictResize` processor after the split is performed during pipeline generation. If the resulting number of streams is less than this value, the split operation will not occur.
6552+
### What is a Resize Node
6553+
A `Resize` node is a processor in the query pipeline that adjusts the number of data streams flowing through the pipeline. It can either increase or decrease the number of streams to balance the workload across multiple threads or processors. For example, if a query requires more parallelism, the `Resize` node can split a single stream into multiple streams. Conversely, it can merge multiple streams into fewer streams to consolidate data processing.
6554+
The `Resize` node ensures that data is evenly distributed across streams, maintaining the structure of the data blocks. This helps optimize resource utilization and improve query performance.
6555+
### Why the Resize Node Needs to Be Split
6556+
During pipeline execution, ExecutingGraph::Node::status_mutex of the centrally-hubbed `Resize` node is heavily contended especially in high-core-count environments, and this contention leads to:
6557+
1. Increased latency for ExecutingGraph::updateNode, directly impacting query performance.
6558+
2. Excessive CPU cycles are wasted in spin-lock contention (native_queued_spin_lock_slowpath), degrading efficiency.
6559+
3. Reduced CPU utilization, limiting parallelism and throughput.
6560+
### How the Resize Node Gets Split
6561+
1. The number of output streams is checked to ensure the split could be performed: the output streams of each split processor meet or exceed the `min_outstreams_per_resize_after_split` threshold.
6562+
2. The `Resize` node is divided into smaller `Resize` nodes with equal count of ports, each handling a subset of input and output streams.
6563+
3. Each group is processed independently, reducing the lock contention.
6564+
### Splitting Resize Node with Arbitrary Inputs/Outputs
6565+
In some cases, where the inputs/outputs are indivisible by the number of split `Resize` nodes, some inputs are connected to `NullSource`s and some outputs are connected to `NullSink`s. This allows the split to occur without affecting the overall data flow.
6566+
### Purpose of the Setting
6567+
The `min_outstreams_per_resize_after_split` setting ensures that the splitting of `Resize` nodes is meaningful and avoids creating too few streams, which could lead to inefficient parallel processing. By enforcing a minimum number of output streams, this setting helps maintain a balance between parallelism and overhead, optimizing query execution in scenarios involving stream splitting and merging.
6568+
### Disabling the Setting
6569+
To disable the split of `Resize` nodes, set this setting to 0. This will prevent the splitting of `Resize` nodes during pipeline generation, allowing them to retain their original structure without division into smaller nodes.
6570+
)", 0) \
65506571
\
65516572
/* ####################################################### */ \
65526573
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
108108
{"input_format_parquet_allow_geoparquet_parser", false, true, "A new setting to use geo columns in parquet file"},
109109
{"enable_url_encoding", true, false, "Changed existing setting's default value"},
110110
{"s3_slow_all_threads_after_network_error", false, true, "New setting"},
111+
{"min_outstreams_per_resize_after_split", 0, 24, "New setting."},
111112
});
112113
addSettingsChanges(settings_changes_history, "25.4",
113114
{

src/Processors/QueryPlan/AggregatingStep.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
495495
/// Add resize transform to uniformly distribute data between aggregating streams.
496496
/// But not if we execute aggregation over partitioned data in which case data streams shouldn't be mixed.
497497
if (!storage_has_evenly_distributed_read && !skip_merging)
498-
pipeline.resize(pipeline.getNumStreams(), true);
498+
pipeline.resize(pipeline.getNumStreams(), true, settings.min_outstreams_per_resize_after_split);
499499

500500
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
501501

@@ -514,7 +514,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
514514
skip_merging);
515515
});
516516

517-
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads);
517+
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, false, settings.min_outstreams_per_resize_after_split);
518518

519519
aggregating = collector.detachProcessors(0);
520520
}

src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Setting
1010
extern const SettingsBool query_plan_merge_filters;
1111
extern const SettingsMaxThreads max_threads;
1212
extern const SettingsUInt64 aggregation_memory_efficient_merge_threads;
13+
extern const SettingsUInt64 min_outstreams_per_resize_after_split;
1314
}
1415

1516
BuildQueryPipelineSettings::BuildQueryPipelineSettings(ContextPtr from)
@@ -22,6 +23,7 @@ BuildQueryPipelineSettings::BuildQueryPipelineSettings(ContextPtr from)
2223

2324
max_threads = from->getSettingsRef()[Setting::max_threads];
2425
aggregation_memory_efficient_merge_threads = from->getSettingsRef()[Setting::aggregation_memory_efficient_merge_threads];
26+
min_outstreams_per_resize_after_split = from->getSettingsRef()[Setting::min_outstreams_per_resize_after_split];
2527

2628
/// Setting query_plan_merge_filters is enabled by default.
2729
/// But it can brake short-circuit without splitting filter step into smaller steps.

src/Processors/QueryPlan/BuildQueryPipelineSettings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct BuildQueryPipelineSettings
2828

2929
size_t max_threads;
3030
size_t aggregation_memory_efficient_merge_threads;
31+
size_t min_outstreams_per_resize_after_split;
3132

3233
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
3334
};

src/QueryPipeline/Pipe.cpp

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -683,21 +683,102 @@ void Pipe::addChains(std::vector<Chain> chains)
683683
max_parallel_streams = std::max(max_parallel_streams, max_parallel_streams_for_chains);
684684
}
685685

686-
void Pipe::resize(size_t num_streams, bool strict)
686+
void Pipe::addSplitResizeTransform(size_t num_streams, size_t min_outstreams_per_resize_after_split, bool strict)
687+
{
688+
OutputPortRawPtrs resize_output_ports(num_streams);
689+
690+
size_t groups = std::min<size_t>(numOutputPorts(), num_streams / min_outstreams_per_resize_after_split);
691+
size_t instream_per_group = (numOutputPorts() + groups - 1) / groups;
692+
size_t groups_with_extra_instream = numOutputPorts() % groups;
693+
size_t outstreams_per_group = (num_streams + groups - 1) / groups;
694+
size_t groups_with_extra_outstream = num_streams % groups;
695+
696+
chassert(groups > 1);
697+
698+
for (size_t i = 0, next_input = 0, next_output = 0; i < groups; ++i)
699+
{
700+
ProcessorPtr resize;
701+
if (strict)
702+
resize = std::make_shared<StrictResizeProcessor>(getHeader(), instream_per_group, outstreams_per_group);
703+
else
704+
resize = std::make_shared<ResizeProcessor>(getHeader(), instream_per_group, outstreams_per_group);
705+
706+
for (auto it = resize->getInputs().begin(); it != resize->getInputs().end(); ++it)
707+
{
708+
if (std::next(it) != resize->getInputs().end() || groups_with_extra_instream == 0 || i < groups_with_extra_instream)
709+
{
710+
connect(*output_ports[next_input], *it);
711+
++next_input;
712+
}
713+
else
714+
{
715+
auto null_source = std::make_shared<NullSource>(getHeader());
716+
connect(null_source->getPort(), *it);
717+
processors->emplace_back(std::move(null_source));
718+
}
719+
}
720+
721+
for (auto it = resize->getOutputs().begin(); it != resize->getOutputs().end(); ++it)
722+
{
723+
if (std::next(it) != resize->getOutputs().end() || groups_with_extra_outstream == 0 || i < groups_with_extra_outstream)
724+
{
725+
resize_output_ports[next_output] = &*it;
726+
++next_output;
727+
}
728+
else
729+
{
730+
auto null_sink = std::make_shared<NullSink>(getHeader());
731+
connect(*it, null_sink->getPort());
732+
processors->emplace_back(std::move(null_sink));
733+
}
734+
}
735+
736+
if (collected_processors)
737+
collected_processors->emplace_back(resize);
738+
processors->emplace_back(std::move(resize));
739+
}
740+
741+
output_ports = std::move(resize_output_ports);
742+
743+
header = output_ports.front()->getHeader();
744+
for (size_t i = 1; i < output_ports.size(); ++i)
745+
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
746+
747+
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
748+
}
749+
750+
void Pipe::resize(size_t num_streams, bool strict, UInt64 min_outstreams_per_resize_after_split)
687751
{
688752
if (output_ports.empty())
689753
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot resize an empty Pipe");
690754

691-
if (strict && num_streams == numOutputPorts())
692-
return;
693-
694755
/// We need to not add the resize in case of 1-1 because in case
695756
/// it is not force resize and we have n outputs (look at the code above),
696757
/// and one of the outputs is dead, we can push all data to n-1 outputs,
697758
/// which doesn't make sense for 1-1 scenario
698759
if (numOutputPorts() == 1 && num_streams == 1)
699760
return;
700761

762+
/// Performance bottleneck identified: Severe lock contention for ExecutingGraph::Node::status_mutex.
763+
/// Issues observed:
764+
/// 1. Increased latency of ExecutingGraph::updateNode, lengthening overall query latency.
765+
/// 2. Unnecessary CPU cycle consumption in native_queued_spin_lock_slowpath (kernel).
766+
/// 3. Decreased CPU utilization.
767+
///
768+
/// Proposed solution: Split ResizeProcessor when multiple threads are allocated to execute a query.
769+
/// Benefits:
770+
/// 1. Mitigates lock contention.
771+
/// 2. Maintains ResizeProcessor's benefit of balancing data flow among multiple streams.
772+
///
773+
/// Disable this optimization when min_outstreams_per_resize_after_split is 0
774+
if (output_ports.size() > 1 && min_outstreams_per_resize_after_split != 0 && num_streams / min_outstreams_per_resize_after_split > 1)
775+
{
776+
addSplitResizeTransform(num_streams, min_outstreams_per_resize_after_split, strict);
777+
return;
778+
}
779+
if (strict && num_streams == numOutputPorts())
780+
return;
781+
701782
ProcessorPtr resize;
702783

703784
if (strict)

src/QueryPipeline/Pipe.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class Pipe
9090
void addChains(std::vector<Chain> chains);
9191

9292
/// Changes the number of output ports if needed. Adds (Strict)ResizeProcessor.
93-
void resize(size_t num_streams, bool strict = false);
93+
void resize(size_t num_streams, bool strict = false, UInt64 min_outstreams_per_resize_after_split = 0);
9494

9595
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
9696

@@ -130,6 +130,7 @@ class Pipe
130130
bool isCompleted() const { return !empty() && output_ports.empty(); }
131131
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
132132
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
133+
void addSplitResizeTransform(size_t num_streams, size_t min_outstreams_per_resize_after_split, bool strict);
133134

134135
friend class QueryPipelineBuilder;
135136
friend class QueryPipeline;

src/QueryPipeline/QueryPipelineBuilder.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ void QueryPipelineBuilder::addMergingAggregatedMemoryEfficientTransform(Aggregat
193193
DB::addMergingAggregatedMemoryEfficientTransform(pipe, std::move(params), num_merging_processors);
194194
}
195195

196-
void QueryPipelineBuilder::resize(size_t num_streams, bool strict)
196+
void QueryPipelineBuilder::resize(size_t num_streams, bool strict, UInt64 min_outstreams_per_resize_after_split)
197197
{
198198
checkInitializedAndNotCompleted();
199-
pipe.resize(num_streams, strict);
199+
pipe.resize(num_streams, strict, min_outstreams_per_resize_after_split);
200200
}
201201

202202
void QueryPipelineBuilder::narrow(size_t size)

src/QueryPipeline/QueryPipelineBuilder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class QueryPipelineBuilder
9797
void addMergingAggregatedMemoryEfficientTransform(AggregatingTransformParamsPtr params, size_t num_merging_processors);
9898

9999
/// Changes the number of output ports if needed. Adds ResizeTransform.
100-
void resize(size_t num_streams, bool strict = false);
100+
void resize(size_t num_streams, bool strict = false, UInt64 min_outstreams_per_resize_after_split = 0);
101101

102102
/// Concat some ports to have no more then size outputs.
103103
/// This method is needed for Merge table engine in case of reading from many tables.

0 commit comments

Comments
 (0)