Skip to content

Commit e52956b

Browse files
authored
[fix](shuffle) EOF iff all channels done (apache#56731)
Consider the case below:

```
                +--- Channel0 (Running)
                |
ExchangeSink ---+--- Channel1 (EOF)
                |
                +--- Channel2 (Running)
```

Channel1 has reached EOF and returns `END_OF_FILE` here. However, Channel0 and Channel2 still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive blocks, including the EOS signal. So we must ensure that EOF is returned iff all channels are EOF.
1 parent b9c48f4 commit e52956b

File tree

6 files changed

+27
-9
lines changed

6 files changed

+27
-9
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
514514
for (auto& channel : local_state.channels) {
515515
COUNTER_UPDATE(local_state.memory_used_counter(), -channel->mem_usage());
516516
Status st = channel->close(state);
517-
if (!st.ok() && final_st.ok()) {
517+
/**
518+
* Consider this case below:
519+
*
520+
* +--- Channel0 (Running)
521+
* |
522+
* ExchangeSink ---+--- Channel1 (EOF)
523+
* |
524+
* +--- Channel2 (Running)
525+
*
526+
* Channel1 has reached EOF and returns `END_OF_FILE` here. However, Channel0 and Channel2
527+
* still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive
528+
* blocks, including the EOS signal. So we must return EOF iff all channels are EOF.
529+
*/
530+
if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) {
518531
final_st = st;
519532
}
520533
}

be/src/pipeline/pipeline.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
111111
return Status::OK();
112112
}
113113

114-
void Pipeline::make_all_runnable() {
114+
void Pipeline::make_all_runnable(PipelineId wake_by) {
115115
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
116116
auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
117117
"Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -124,7 +124,7 @@ void Pipeline::make_all_runnable() {
124124
if (_sink->count_down_destination()) {
125125
for (auto* task : _tasks) {
126126
if (task) {
127-
task->set_wake_up_early();
127+
task->set_wake_up_early(wake_by);
128128
}
129129
}
130130
for (auto* task : _tasks) {

be/src/pipeline/pipeline.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
104104
_tasks[i] = task;
105105
}
106106

107-
void make_all_runnable();
107+
void make_all_runnable(PipelineId wake_by);
108108

109109
void set_num_tasks(int num_tasks) {
110110
_num_tasks = num_tasks;

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1782,7 +1782,7 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
17821782
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
17831783
if (_dag.contains(pipeline_id)) {
17841784
for (auto dep : _dag[pipeline_id]) {
1785-
_pip_id_to_pipeline[dep]->make_all_runnable();
1785+
_pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
17861786
}
17871787
}
17881788
}

be/src/pipeline/pipeline_task.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -723,10 +723,11 @@ std::string PipelineTask::debug_string() {
723723

724724
fmt::format_to(debug_string_buffer,
725725
"PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry run = "
726-
"{}, _wake_up_early = {}, time elapsed since last state changing = {}s, spilling"
727-
" = {}, is running = {}]",
726+
"{}, _wake_up_early = {}, _wake_up_by = {}, time elapsed since last state "
727+
"changing = {}s, spilling = {}, is running = {}]",
728728
_index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(),
729-
_state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling, is_running());
729+
_wake_by, _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling,
730+
is_running());
730731
std::unique_lock<std::mutex> lc(_dependency_lock);
731732
auto* cur_blocked_dep = _blocked_dep;
732733
auto fragment = _fragment_context.lock();

be/src/pipeline/pipeline_task.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
127127
int task_id() const { return _index; };
128128
bool is_finalized() const { return _exec_state == State::FINALIZED; }
129129

130-
void set_wake_up_early() { _wake_up_early = true; }
130+
void set_wake_up_early(PipelineId wake_by = -1) {
131+
_wake_up_early = true;
132+
_wake_by = wake_by;
133+
}
131134

132135
// Execution phase should be terminated. This is called if this task is canceled or waken up early.
133136
void terminate();
@@ -309,6 +312,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
309312
MonotonicStopWatch _state_change_watcher;
310313
std::atomic<bool> _spilling = false;
311314
const std::string _pipeline_name;
315+
int _wake_by = -1;
312316
};
313317

314318
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;

0 commit comments

Comments
 (0)