diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d48d4a7b57fdfd..84f4483a56d495 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -251,7 +251,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { } else if (!status.ok()) { // Currently, a known error status is emitted when sender // close recei - throw std::runtime_error(status.msg()); + throw Exception(status.code(), status.msg()); } return false; } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index a9c0c53e487499..a14f9afb1df21e 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -97,7 +97,7 @@ Status VSortedRunMerger::prepare(const vector& input_runs) { return Status::OK(); } -Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { +Status VSortedRunMerger::_get_next_internal(Block* output_block, bool* eos) { ScopedTimer timer(_get_next_timer); // Only have one receive data queue of data, no need to do merge and // copy the data of block. @@ -213,6 +213,10 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { return Status::OK(); } +Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { + RETURN_IF_CATCH_EXCEPTION(return _get_next_internal(output_block, eos)); +} + bool VSortedRunMerger::next_heap(MergeSortCursor& current) { if (!current->isLast()) { current->next(); diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 2f9ebe04a68b38..5c43110f71ab2a 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -99,6 +99,8 @@ class VSortedRunMerger { /// In pipeline engine, return false if need to read one more block from sender. bool next_heap(MergeSortCursor& current); bool has_next_block(MergeSortCursor& current); + + Status _get_next_internal(Block* output_block, bool* eos); }; } // namespace vectorized