Skip to content

Commit 394023f

Browse files
committed
[Improvement](cloud) Hold tablets in another RPC thread (apache#52879)
In cloud mode, remote tablets are loaded into local storage before execution. Currently this loading blocks the execution thread, so if it is too slow the pipeline execution thread may hang. This PR makes remote tablet loading asynchronous.
1 parent 5fbf4b1 commit 394023f

File tree

9 files changed

+121
-79
lines changed

9 files changed

+121
-79
lines changed

be/src/common/config.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,9 +1066,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
10661066
// enable java udf and jdbc scannode
10671067
DEFINE_Bool(enable_java_support, "true");
10681068

1069-
// enable prefetch tablets before opening
1070-
DEFINE_mBool(enable_prefetch_tablet, "true");
1071-
10721069
// Set config randomly to check more issues in github workflow
10731070
DEFINE_Bool(enable_fuzzy_mode, "false");
10741071

be/src/common/config.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,9 +1108,6 @@ DECLARE_mInt32(segcompaction_num_threads);
11081108
// enable java udf and jdbc scannode
11091109
DECLARE_Bool(enable_java_support);
11101110

1111-
// enable prefetch tablets before opening
1112-
DECLARE_mBool(enable_prefetch_tablet);
1113-
11141111
// Set config randomly to check more issues in github workflow
11151112
DECLARE_Bool(enable_fuzzy_mode);
11161113

be/src/pipeline/exec/multi_cast_data_stream_source.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<Mul
4949
Status close(RuntimeState* state) override;
5050
friend class MultiCastDataStreamerSourceOperatorX;
5151

52-
std::vector<Dependency*> filter_dependencies() override {
52+
std::vector<Dependency*> execution_dependencies() override {
5353
if (_filter_dependencies.empty()) {
5454
return {};
5555
}

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@
4646

4747
namespace doris::pipeline {
4848

49+
Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
50+
RETURN_IF_ERROR(Base::init(state, info));
51+
RETURN_IF_ERROR(_sync_cloud_tablets(state));
52+
return Status::OK();
53+
}
54+
4955
Status OlapScanLocalState::_init_profile() {
5056
RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile());
5157
// Rows read from storage.
@@ -359,7 +365,6 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
359365
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
360366
state()->query_options().resource_limit.__isset.cpu_limit;
361367

362-
RETURN_IF_ERROR(hold_tablets());
363368
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
364369
p._push_down_agg_type == TPushAggOp::NONE &&
365370
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
@@ -453,30 +458,34 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
453458
return Status::OK();
454459
}
455460

456-
Status OlapScanLocalState::hold_tablets() {
457-
if (!_tablets.empty()) {
458-
return Status::OK();
459-
}
460-
461-
MonotonicStopWatch timer;
462-
timer.start();
463-
_tablets.resize(_scan_ranges.size());
464-
_read_sources.resize(_scan_ranges.size());
465-
466-
if (config::is_cloud_mode()) {
467-
std::vector<SyncRowsetStats> sync_statistics(_scan_ranges.size());
468-
std::vector<std::function<Status()>> tasks {};
469-
tasks.reserve(_scan_ranges.size());
470-
int64_t duration_ns {0};
471-
{
472-
SCOPED_RAW_TIMER(&duration_ns);
461+
Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
462+
if (config::is_cloud_mode() && !_sync_tablet) {
463+
_pending_tablets_num = _scan_ranges.size();
464+
if (_pending_tablets_num > 0) {
465+
_sync_cloud_tablets_watcher.start();
466+
_cloud_tablet_dependency = Dependency::create_shared(
467+
_parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");
468+
_tablets.resize(_scan_ranges.size());
469+
_tasks.reserve(_scan_ranges.size());
470+
_sync_statistics.resize(_scan_ranges.size());
473471
for (size_t i = 0; i < _scan_ranges.size(); i++) {
474-
auto* sync_stats = &sync_statistics[i];
472+
auto* sync_stats = &_sync_statistics[i];
475473
int64_t version = 0;
476474
std::from_chars(_scan_ranges[i]->version.data(),
477475
_scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
478476
version);
479-
tasks.emplace_back([this, sync_stats, version, i]() {
477+
auto task_ctx = state->get_task_execution_context();
478+
_tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
479+
auto task_lock = task_ctx.lock();
480+
if (task_lock == nullptr) {
481+
return Status::OK();
482+
}
483+
Defer defer([&] {
484+
if (_pending_tablets_num.fetch_sub(1) == 1) {
485+
_cloud_tablet_dependency->set_ready();
486+
_sync_cloud_tablets_watcher.stop();
487+
}
488+
});
480489
auto tablet =
481490
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
482491
_tablets[i] = {std::move(tablet), version};
@@ -488,17 +497,37 @@ Status OlapScanLocalState::hold_tablets() {
488497
return Status::OK();
489498
});
490499
}
491-
RETURN_IF_ERROR(
492-
cloud::bthread_fork_join(tasks, config::init_scanner_sync_rowsets_parallelism));
500+
RETURN_IF_ERROR(cloud::bthread_fork_join(
501+
_tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future));
502+
}
503+
_sync_tablet = true;
504+
}
505+
return Status::OK();
506+
}
507+
508+
Status OlapScanLocalState::prepare(RuntimeState* state) {
509+
if (_prepared) {
510+
return Status::OK();
511+
}
512+
MonotonicStopWatch timer;
513+
timer.start();
514+
_read_sources.resize(_scan_ranges.size());
515+
516+
if (config::is_cloud_mode()) {
517+
if (!_cloud_tablet_dependency ||
518+
_cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) {
519+
// Remote tablet still in-flight.
520+
return Status::OK();
493521
}
494-
COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
522+
DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok());
523+
COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time());
495524
auto total_rowsets = std::accumulate(
496525
_tablets.cbegin(), _tablets.cend(), 0LL,
497526
[](long long acc, const auto& tabletWithVersion) {
498527
return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size();
499528
});
500529
COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets);
501-
for (const auto& sync_stats : sync_statistics) {
530+
for (const auto& sync_stats : _sync_statistics) {
502531
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit);
503532
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss);
504533
COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer,
@@ -518,6 +547,7 @@ Status OlapScanLocalState::hold_tablets() {
518547
sync_stats.get_remote_delete_bitmap_rpc_ns);
519548
}
520549
} else {
550+
_tablets.resize(_scan_ranges.size());
521551
for (size_t i = 0; i < _scan_ranges.size(); i++) {
522552
int64_t version = 0;
523553
std::from_chars(_scan_ranges[i]->version.data(),
@@ -553,6 +583,7 @@ Status OlapScanLocalState::hold_tablets() {
553583
cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
554584
_scan_ranges.size());
555585
}
586+
_prepared = true;
556587
return Status::OK();
557588
}
558589

@@ -765,9 +796,4 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
765796
}
766797
}
767798

768-
Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
769-
auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state);
770-
return local_state.hold_tablets();
771-
}
772-
773799
} // namespace doris::pipeline

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <string>
2323

24+
#include "cloud/cloud_tablet.h"
2425
#include "common/status.h"
2526
#include "olap/tablet_reader.h"
2627
#include "operator.h"
@@ -39,22 +40,31 @@ class OlapScanOperatorX;
3940
class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
4041
public:
4142
using Parent = OlapScanOperatorX;
43+
using Base = ScanLocalState<OlapScanLocalState>;
4244
ENABLE_FACTORY_CREATOR(OlapScanLocalState);
43-
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
44-
: ScanLocalState(state, parent) {}
45-
45+
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
46+
Status init(RuntimeState* state, LocalStateInfo& info) override;
47+
Status prepare(RuntimeState* state) override;
4648
TOlapScanNode& olap_scan_node() const;
4749

4850
std::string name_suffix() const override {
4951
return fmt::format(" (id={}. nereids_id={}. table name = {})",
5052
std::to_string(_parent->node_id()),
5153
std::to_string(_parent->nereids_id()), olap_scan_node().table_name);
5254
}
53-
Status hold_tablets();
55+
std::vector<Dependency*> execution_dependencies() override {
56+
if (!_cloud_tablet_dependency) {
57+
return Base::execution_dependencies();
58+
}
59+
std::vector<Dependency*> res = Base::execution_dependencies();
60+
res.push_back(_cloud_tablet_dependency.get());
61+
return res;
62+
}
5463

5564
private:
5665
friend class vectorized::NewOlapScanner;
5766

67+
Status _sync_cloud_tablets(RuntimeState* state);
5868
void set_scan_ranges(RuntimeState* state,
5969
const std::vector<TScanRangeParams>& scan_ranges) override;
6070
Status _init_profile() override;
@@ -91,6 +101,14 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
91101
Status _build_key_ranges_and_filters();
92102

93103
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
104+
std::vector<SyncRowsetStats> _sync_statistics;
105+
std::vector<std::function<Status()>> _tasks;
106+
MonotonicStopWatch _sync_cloud_tablets_watcher;
107+
std::shared_ptr<Dependency> _cloud_tablet_dependency;
108+
std::atomic<size_t> _pending_tablets_num = 0;
109+
bool _prepared = false;
110+
std::future<Status> _cloud_tablet_future;
111+
std::atomic_bool _sync_tablet = false;
94112
std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
95113
OlapScanKeys _scan_keys;
96114
std::vector<TCondition> _olap_filters;
@@ -237,7 +255,6 @@ class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
237255
OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
238256
const DescriptorTbl& descs, int parallel_tasks,
239257
const TQueryCacheParam& cache_param);
240-
Status hold_tablets(RuntimeState* state) override;
241258

242259
private:
243260
friend class OlapScanLocalState;

be/src/pipeline/exec/operator.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ class PipelineXLocalStateBase {
152152
// Do initialization. This step should be executed only once and in bthread, so we can do some
153153
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
154154
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
155+
// Make sure all resources are ready before execution. For example, remote tablets should be
156+
// loaded to local storage.
157+
// This is called by execution pthread and different from `Operator::prepare` which is called
158+
// by bthread.
159+
virtual Status prepare(RuntimeState* state) = 0;
155160
// Do initialization. This step can be executed multiple times, so we should make sure it is
156161
// idempotent (e.g. wait for runtime filters).
157162
virtual Status open(RuntimeState* state) = 0;
@@ -180,7 +185,7 @@ class PipelineXLocalStateBase {
180185
// override in Scan
181186
virtual Dependency* finishdependency() { return nullptr; }
182187
// override in Scan MultiCastSink
183-
virtual std::vector<Dependency*> filter_dependencies() { return {}; }
188+
virtual std::vector<Dependency*> execution_dependencies() { return {}; }
184189

185190
std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
186191

@@ -227,6 +232,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
227232
~PipelineXLocalState() override = default;
228233

229234
Status init(RuntimeState* state, LocalStateInfo& info) override;
235+
Status prepare(RuntimeState* state) override { return Status::OK(); }
230236
Status open(RuntimeState* state) override;
231237

232238
virtual std::string name_suffix() const;
@@ -311,6 +317,7 @@ class PipelineXSinkLocalStateBase {
311317
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
312318
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;
313319

320+
virtual Status prepare(RuntimeState* state) = 0;
314321
// Do initialization. This step can be executed multiple times, so we should make sure it is
315322
// idempotent (e.g. wait for runtime filters).
316323
virtual Status open(RuntimeState* state) = 0;
@@ -388,6 +395,7 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {
388395

389396
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
390397

398+
Status prepare(RuntimeState* state) override { return Status::OK(); }
391399
Status open(RuntimeState* state) override { return Status::OK(); }
392400

393401
Status close(RuntimeState* state, Status exec_status) override;
@@ -648,8 +656,8 @@ class OperatorXBase : public OperatorBase {
648656
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
649657

650658
// Tablets should be held before the open phase.
651-
[[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); }
652659
Status open(RuntimeState* state) override;
660+
Status prepare(RuntimeState* state) override;
653661

654662
[[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
655663
bool* eos) = 0;

be/src/pipeline/exec/scan_operator.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
8383
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
8484
virtual void set_scan_ranges(RuntimeState* state,
8585
const std::vector<TScanRangeParams>& scan_ranges) = 0;
86-
8786
virtual TPushAggOp::type get_push_down_agg_type() = 0;
8887

8988
virtual int64_t get_push_down_count() = 0;
@@ -163,15 +162,13 @@ class ScanLocalState : public ScanLocalStateBase {
163162

164163
int64_t get_push_down_count() override;
165164

166-
std::vector<Dependency*> filter_dependencies() override {
165+
std::vector<Dependency*> execution_dependencies() override {
167166
if (_filter_dependencies.empty()) {
168167
return {};
169168
}
170-
std::vector<Dependency*> res;
171-
res.resize(_filter_dependencies.size());
172-
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
173-
res[i] = _filter_dependencies[i].get();
174-
}
169+
std::vector<Dependency*> res(_filter_dependencies.size());
170+
std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
171+
[](DependencySPtr dep) { return dep.get(); });
175172
return res;
176173
}
177174

0 commit comments

Comments
 (0)