Skip to content

Commit 3a13104

Browse files
committed
[fix](shuffle) EOF iff all channels done (apache#56731)
Consider this case below: ``` +--- Channel0 (Running) | ExchangeSink ---+--- Channel1 (EOF) | +--- Channel2 (Running) ``` Channel1 is EOF now and return `END_OF_FILE` here. However, Channel0 and Channel2 still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive blocks including EOS signal. So we must ensure to return EOF iff all channels are EOF.
1 parent c732673 commit 3a13104

File tree

6 files changed

+25
-8
lines changed

6 files changed

+25
-8
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
606606
local_state._serializer.reset_block();
607607
for (auto& channel : local_state.channels) {
608608
Status st = channel->close(state);
609-
if (!st.ok() && final_st.ok()) {
609+
/**
610+
* Consider this case below:
611+
*
612+
* +--- Channel0 (Running)
613+
* |
614+
* ExchangeSink ---+--- Channel1 (EOF)
615+
* |
616+
* +--- Channel2 (Running)
617+
*
618+
* Channel1 is EOF now and return `END_OF_FILE` here. However, Channel0 and Channel2
619+
* still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive
620+
* blocks including EOS signal. So we must ensure to return EOF iff all channels are EOF.
621+
*/
622+
if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) {
610623
final_st = st;
611624
}
612625
}

be/src/pipeline/pipeline.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
108108
return Status::OK();
109109
}
110110

111-
void Pipeline::make_all_runnable() {
111+
void Pipeline::make_all_runnable(PipelineId wake_by) {
112112
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
113113
auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
114114
"Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -121,7 +121,7 @@ void Pipeline::make_all_runnable() {
121121
if (_sink->count_down_destination()) {
122122
for (auto* task : _tasks) {
123123
if (task) {
124-
task->set_wake_up_early();
124+
task->set_wake_up_early(wake_by);
125125
}
126126
}
127127
for (auto* task : _tasks) {

be/src/pipeline/pipeline.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
102102
_tasks[i] = task;
103103
}
104104

105-
void make_all_runnable();
105+
void make_all_runnable(PipelineId wake_by);
106106

107107
void set_num_tasks(int num_tasks) {
108108
_num_tasks = num_tasks;

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1750,7 +1750,7 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
17501750
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
17511751
if (_dag.contains(pipeline_id)) {
17521752
for (auto dep : _dag[pipeline_id]) {
1753-
_pip_id_to_pipeline[dep]->make_all_runnable();
1753+
_pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
17541754
}
17551755
}
17561756
}

be/src/pipeline/pipeline_task.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,9 +593,9 @@ std::string PipelineTask::debug_string() {
593593
fmt::format_to(
594594
debug_string_buffer,
595595
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized = {}, dry run = "
596-
"{}, _wake_up_early = {}, is running = {}]",
596+
"{}, _wake_up_early = {}, _wake_up_by = {}, is running = {}]",
597597
(void*)this, _index, _opened, _eos, _finalized, _dry_run, _wake_up_early.load(),
598-
is_running());
598+
_wake_by, is_running());
599599
std::unique_lock<std::mutex> lc(_dependency_lock);
600600
auto* cur_blocked_dep = _blocked_dep;
601601
auto fragment = _fragment_context.lock();

be/src/pipeline/pipeline_task.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
135135
int task_id() const { return _index; };
136136
bool is_finalized() const { return _finalized; }
137137

138-
void set_wake_up_early() { _wake_up_early = true; }
138+
void set_wake_up_early(PipelineId wake_by = -1) {
139+
_wake_up_early = true;
140+
_wake_by = wake_by;
141+
}
139142

140143
void clear_blocking_state() {
141144
auto fragment = _fragment_context.lock();
@@ -319,6 +322,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
319322
const std::string _pipeline_name;
320323
// PipelineTask maybe hold by TaskQueue
321324
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
325+
int _wake_by = -1;
322326
};
323327

324328
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;

0 commit comments

Comments
 (0)