Skip to content

Commit 79729b0

Browse files
authored
[refactor](scanner) remove scanner scheduler class since it is already split into SimplifiedScheduler (apache#58310)
### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent 8471d2c commit 79729b0

File tree

13 files changed

+53
-141
lines changed

13 files changed

+53
-141
lines changed

be/src/exec/rowid_fetcher.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -831,8 +831,8 @@ Status RowIdStorageReader::read_batch_external_row(
831831
workload_group_ids.emplace_back(workload_group_id);
832832
auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
833833
doris::pipeline::TaskScheduler* exec_sched = nullptr;
834-
vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
835-
vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
834+
vectorized::ScannerScheduler* scan_sched = nullptr;
835+
vectorized::ScannerScheduler* remote_scan_sched = nullptr;
836836
wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
837837
DCHECK(remote_scan_sched);
838838

be/src/runtime/exec_env.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ class MemoryPool;
5151
namespace doris {
5252
namespace vectorized {
5353
class VDataStreamMgr;
54-
class ScannerScheduler;
5554
class SpillStreamManager;
5655
class DeltaWriterV2Pool;
5756
class DictionaryFactory;
@@ -283,7 +282,6 @@ class ExecEnv {
283282
StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); }
284283
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
285284
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
286-
vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
287285
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
288286
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
289287
WalManager* wal_mgr() { return _wal_manager.get(); }
@@ -495,7 +493,6 @@ class ExecEnv {
495493
StreamLoadRecorderManager* _stream_load_recorder_manager = nullptr;
496494
SmallFileMgr* _small_file_mgr = nullptr;
497495
HeartbeatFlags* _heartbeat_flags = nullptr;
498-
vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
499496

500497
// To save meta info of external file, such as parquet footer.
501498
FileMetaCache* _file_meta_cache = nullptr;

be/src/runtime/exec_env_init.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
301301
_init_runtime_filter_timer_queue();
302302

303303
_workload_group_manager = new WorkloadGroupMgr();
304-
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
305304

306305
_fragment_mgr = new FragmentMgr(this);
307306
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -355,7 +354,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
355354
}
356355
_broker_mgr->init();
357356
static_cast<void>(_small_file_mgr->init());
358-
status = _scanner_scheduler->init(this);
359357
if (!status.ok()) {
360358
LOG(ERROR) << "Scanner scheduler init failed. " << status;
361359
return status;
@@ -768,7 +766,6 @@ void ExecEnv::destroy() {
768766
SAFE_STOP(_wal_manager);
769767
_wal_manager.reset();
770768
SAFE_STOP(_load_channel_mgr);
771-
SAFE_STOP(_scanner_scheduler);
772769
SAFE_STOP(_broker_mgr);
773770
SAFE_STOP(_load_path_mgr);
774771
SAFE_STOP(_result_mgr);
@@ -831,8 +828,6 @@ void ExecEnv::destroy() {
831828
SAFE_DELETE(_tablet_schema_cache);
832829
SAFE_DELETE(_tablet_column_object_pool);
833830

834-
// _scanner_scheduler must be desotried before _storage_page_cache
835-
SAFE_DELETE(_scanner_scheduler);
836831
// _storage_page_cache must be destoried before _cache_manager
837832
SAFE_DELETE(_storage_page_cache);
838833

be/src/runtime/query_context.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
201201

202202
TUniqueId query_id() const { return _query_id; }
203203

204-
vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
204+
vectorized::ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
205205

206-
vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
206+
vectorized::ScannerScheduler* get_remote_scan_scheduler() {
207207
return _remote_scan_task_scheduler;
208208
}
209209

@@ -323,8 +323,8 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
323323
AtomicStatus _exec_status;
324324

325325
doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
326-
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
327-
vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
326+
vectorized::ScannerScheduler* _scan_task_scheduler = nullptr;
327+
vectorized::ScannerScheduler* _remote_scan_task_scheduler = nullptr;
328328
// This dependency indicates if the 2nd phase RPC received from FE.
329329
std::unique_ptr<pipeline::Dependency> _execution_dependency;
330330
// This dependency indicates if memory is sufficient to execute.

be/src/runtime/workload_group/workload_group.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
550550
}
551551

552552
if (_scan_task_sched == nullptr) {
553-
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler;
553+
std::unique_ptr<vectorized::ScannerScheduler> scan_scheduler;
554554
if (config::enable_task_executor_in_internal_table) {
555555
scan_scheduler = std::make_unique<vectorized::TaskExecutorSimplifiedScanScheduler>(
556556
"ls_" + wg_name, cg_cpu_ctl_ptr, wg_name);
@@ -572,7 +572,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
572572
if (_remote_scan_task_sched == nullptr) {
573573
int remote_scan_thread_queue_size =
574574
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
575-
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler;
575+
std::unique_ptr<vectorized::ScannerScheduler> remote_scan_scheduler;
576576
if (config::enable_task_executor_in_external_table) {
577577
remote_scan_scheduler =
578578
std::make_unique<vectorized::TaskExecutorSimplifiedScanScheduler>(
@@ -646,8 +646,8 @@ Status WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) {
646646
}
647647

648648
void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
649-
vectorized::SimplifiedScanScheduler** scan_sched,
650-
vectorized::SimplifiedScanScheduler** remote_scan_sched) {
649+
vectorized::ScannerScheduler** scan_sched,
650+
vectorized::ScannerScheduler** remote_scan_sched) {
651651
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
652652
*exec_sched = _task_sched.get();
653653
*scan_sched = _scan_task_sched.get();

be/src/runtime/workload_group/workload_group.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class IOThrottle;
4545
class ResourceContext;
4646

4747
namespace vectorized {
48-
class SimplifiedScanScheduler;
48+
class ScannerScheduler;
4949
}
5050

5151
namespace pipeline {
@@ -167,8 +167,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
167167
Status upsert_task_scheduler(WorkloadGroupInfo* tg_info);
168168

169169
virtual void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
170-
vectorized::SimplifiedScanScheduler** scan_sched,
171-
vectorized::SimplifiedScanScheduler** remote_scan_sched);
170+
vectorized::ScannerScheduler** scan_sched,
171+
vectorized::ScannerScheduler** remote_scan_sched);
172172

173173
void try_stop_schedulers();
174174

@@ -250,8 +250,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
250250
// so it should be shared ptr;
251251
std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
252252
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
253-
std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
254-
std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
253+
std::unique_ptr<vectorized::ScannerScheduler> _scan_task_sched {nullptr};
254+
std::unique_ptr<vectorized::ScannerScheduler> _remote_scan_task_sched {nullptr};
255255
std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
256256

257257
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;

be/src/service/internal_service.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2127,8 +2127,8 @@ void PInternalService::multiget_data_v2(google::protobuf::RpcController* control
21272127
}
21282128

21292129
doris::pipeline::TaskScheduler* exec_sched = nullptr;
2130-
vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
2131-
vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
2130+
vectorized::ScannerScheduler* scan_sched = nullptr;
2131+
vectorized::ScannerScheduler* remote_scan_sched = nullptr;
21322132
wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
21332133
DCHECK(remote_scan_sched);
21342134

be/src/vec/exec/scan/scanner_context.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ ScannerContext::ScannerContext(
6666
_output_row_descriptor(output_row_descriptor),
6767
_batch_size(state->batch_size()),
6868
limit(limit_),
69-
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
7069
_all_scanners(scanners.begin(), scanners.end()),
7170
_parallism_of_scan_operator(parallism_of_scan_operator),
7271
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
@@ -266,7 +265,7 @@ Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
266265
// if submit succeed, it will be also added back by ScannerContext::push_back_scan_task
267266
// see ScannerScheduler::_scanner_scan.
268267
_num_scheduled_scanners++;
269-
return _scanner_scheduler_global->submit(shared_from_this(), scan_task);
268+
return _scanner_scheduler->submit(shared_from_this(), scan_task);
270269
}
271270

272271
void ScannerContext::clear_free_blocks() {
@@ -523,7 +522,7 @@ int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
523522

524523
// This function must be called with:
525524
// 1. _transfer_lock held.
526-
// 2. SimplifiedScanScheduler::_lock held.
525+
// 2. ScannerScheduler::_lock held.
527526
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
528527
std::unique_lock<std::mutex>& transfer_lock,
529528
std::unique_lock<std::shared_mutex>& scheduler_lock) {

be/src/vec/exec/scan/scanner_context.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ namespace vectorized {
5656
class Scanner;
5757
class ScannerDelegate;
5858
class ScannerScheduler;
59-
class SimplifiedScanScheduler;
59+
class ScannerScheduler;
6060
class TaskExecutor;
6161
class TaskHandle;
6262

@@ -116,7 +116,7 @@ class ScanTask {
116116
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
117117
public HasTaskExecutionCtx {
118118
ENABLE_FACTORY_CREATOR(ScannerContext);
119-
friend class SimplifiedScanScheduler;
119+
friend class ScannerScheduler;
120120

121121
public:
122122
ScannerContext(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
@@ -165,8 +165,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
165165

166166
RuntimeState* state() { return _state; }
167167

168-
SimplifiedScanScheduler* get_scan_scheduler() { return _scanner_scheduler; }
169-
170168
void stop_scanners(RuntimeState* state);
171169

172170
int batch_size() const { return _batch_size; }
@@ -230,8 +228,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
230228
int64_t limit;
231229

232230
int64_t _max_bytes_in_queue = 0;
233-
doris::vectorized::ScannerScheduler* _scanner_scheduler_global = nullptr;
234-
SimplifiedScanScheduler* _scanner_scheduler = nullptr;
231+
ScannerScheduler* _scanner_scheduler = nullptr;
235232
// Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly
236233
std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
237234
// Scanner that is submitted to the scheduler.

be/src/vec/exec/scan/scanner_scheduler.cpp

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,6 @@
5252

5353
namespace doris::vectorized {
5454

55-
ScannerScheduler::ScannerScheduler() = default;
56-
57-
ScannerScheduler::~ScannerScheduler() = default;
58-
59-
void ScannerScheduler::stop() {
60-
if (!_is_init) {
61-
return;
62-
}
63-
64-
_is_closed = true;
65-
66-
LOG(INFO) << "ScannerScheduler stopped";
67-
}
68-
69-
Status ScannerScheduler::init(ExecEnv* env) {
70-
_is_init = true;
71-
return Status::OK();
72-
}
73-
7455
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
7556
std::shared_ptr<ScanTask> scan_task) {
7657
if (ctx->done()) {
@@ -90,7 +71,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
9071
scanner_delegate->_scanner->start_wait_worker_timer();
9172
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
9273
auto sumbit_task = [&]() {
93-
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
9474
auto work_func = [scanner_ref = scan_task, ctx]() {
9575
auto status = [&] {
9676
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
@@ -105,7 +85,7 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
10585
return scanner_ref->is_eos();
10686
};
10787
SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
108-
return scan_sched->submit_scan_task(simple_scan_task);
88+
return this->submit_scan_task(simple_scan_task);
10989
};
11090

11191
Status submit_status = sumbit_task();

0 commit comments

Comments
 (0)