Skip to content

Commit a1fb36d

Browse files
committed
[enhancement](filecache) enable warm up only idx file
this is a prototype commit to achieve the goal. the user interface must be refined in the future, use session vars or warm up properties. Signed-off-by: zhengyu <zhangzhengyu@selectdb.com>
1 parent 2cfcc0f commit a1fb36d

File tree

5 files changed

+128
-111
lines changed

5 files changed

+128
-111
lines changed

be/src/cloud/cloud_internal_service.cpp

Lines changed: 76 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -455,80 +455,88 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
455455
}
456456

457457
for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
458-
auto segment_size = rs_meta.segment_file_size(segment_id);
459-
auto download_done = [=, version = rs_meta.version()](Status st) {
460-
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
461-
auto sleep_time = dp->param<int>("sleep", 3);
462-
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
463-
rowset_id.to_string(), version.to_string(), sleep_time);
464-
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
465-
});
466-
DBUG_EXECUTE_IF(
467-
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
468-
st = Status::InternalError("injected error");
469-
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
470-
tablet_id, rowset_id.to_string(), st.to_string());
471-
});
472-
if (st.ok()) {
473-
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
474-
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
475-
int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
476-
std::chrono::system_clock::now().time_since_epoch())
477-
.count();
478-
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
479-
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
480-
g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
481-
if (request_ts > 0 &&
482-
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
483-
g_file_cache_warm_up_rowset_slow_count << 1;
484-
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
485-
<< " us, tablet_id: " << tablet_id
486-
<< ", rowset_id: " << rowset_id.to_string()
487-
<< ", segment_id: " << segment_id;
458+
if (!config::enable_only_warm_up_idx) {
459+
auto segment_size = rs_meta.segment_file_size(segment_id);
460+
auto download_done = [=, version = rs_meta.version()](Status st) {
461+
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
462+
auto sleep_time = dp->param<int>("sleep", 3);
463+
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
464+
rowset_id.to_string(), version.to_string(), sleep_time);
465+
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
466+
});
467+
DBUG_EXECUTE_IF(
468+
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_"
469+
"error",
470+
{
471+
st = Status::InternalError("injected error");
472+
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
473+
tablet_id, rowset_id.to_string(), st.to_string());
474+
});
475+
if (st.ok()) {
476+
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
477+
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
478+
int64_t now_ts =
479+
std::chrono::duration_cast<std::chrono::microseconds>(
480+
std::chrono::system_clock::now().time_since_epoch())
481+
.count();
482+
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
483+
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
484+
g_file_cache_warm_up_rowset_handle_to_finish_latency
485+
<< (now_ts - handle_ts);
486+
if (request_ts > 0 &&
487+
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
488+
g_file_cache_warm_up_rowset_slow_count << 1;
489+
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
490+
<< " us, tablet_id: " << tablet_id
491+
<< ", rowset_id: " << rowset_id.to_string()
492+
<< ", segment_id: " << segment_id;
493+
}
494+
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
495+
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
496+
LOG(INFO) << "warm up rowset (handle to finish) took "
497+
<< now_ts - handle_ts << " us, tablet_id: " << tablet_id
498+
<< ", rowset_id: " << rowset_id.to_string()
499+
<< ", segment_id: " << segment_id;
500+
}
501+
} else {
502+
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
503+
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
504+
LOG(WARNING)
505+
<< "download segment failed, tablet_id: " << tablet_id
506+
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
488507
}
489-
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
490-
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
491-
LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
492-
<< " us, tablet_id: " << tablet_id
493-
<< ", rowset_id: " << rowset_id.to_string()
494-
<< ", segment_id: " << segment_id;
508+
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
509+
rowset_id, st, 1, 0)
510+
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
511+
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
512+
<< rowset_id.to_string() << ") completed";
495513
}
496-
} else {
497-
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
498-
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
499-
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
500-
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
501-
}
502-
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
503-
rowset_id, st, 1, 0)
504-
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
505-
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
506-
<< rowset_id.to_string() << ") completed";
507-
}
514+
if (wait) {
515+
wait->signal();
516+
}
517+
};
518+
519+
io::DownloadFileMeta download_meta {
520+
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
521+
.file_size = segment_size,
522+
.offset = 0,
523+
.download_size = segment_size,
524+
.file_system = storage_resource.value()->fs,
525+
.ctx = {.is_index_data = false,
526+
.expiration_time = expiration_time,
527+
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
528+
.is_warmup = true},
529+
.download_done = std::move(download_done),
530+
};
531+
532+
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
533+
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
508534
if (wait) {
509-
wait->signal();
535+
wait->add_count();
510536
}
511-
};
512537

513-
io::DownloadFileMeta download_meta {
514-
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
515-
.file_size = segment_size,
516-
.offset = 0,
517-
.download_size = segment_size,
518-
.file_system = storage_resource.value()->fs,
519-
.ctx = {.is_index_data = false,
520-
.expiration_time = expiration_time,
521-
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
522-
.is_warmup = true},
523-
.download_done = std::move(download_done),
524-
};
525-
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
526-
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
527-
if (wait) {
528-
wait->add_count();
538+
_engine.file_cache_block_downloader().submit_download_task(download_meta);
529539
}
530-
_engine.file_cache_block_downloader().submit_download_task(download_meta);
531-
532540
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
533541
auto storage_resource = rs_meta.remote_storage_resource();
534542
auto download_done = [=, version = rs_meta.version()](Status st) {

be/src/cloud/cloud_tablet.cpp

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -426,35 +426,37 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
426426
}
427427
// clang-format off
428428
auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
429-
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
430-
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
431-
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
432-
.file_system = storage_resource.value()->fs,
433-
.ctx =
434-
{
435-
.expiration_time = expiration_time,
436-
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
437-
.is_warmup = true
438-
},
439-
.download_done {[=](Status st) {
440-
DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
441-
if (rs->version().second > rs->version().first) {
442-
auto sleep_time = dp->param<int>("sleep", 3);
443-
LOG_INFO(
444-
"[verbose] block download for rowset={}, "
445-
"version={}, sleep={}",
446-
rs->rowset_id().to_string(),
447-
rs->version().to_string(), sleep_time);
448-
std::this_thread::sleep_for(
449-
std::chrono::seconds(sleep_time));
450-
}
451-
});
452-
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
453-
if (!st) {
454-
LOG_WARNING("add rowset warm up error ").error(st);
455-
}
456-
}},
457-
});
429+
if (!config::enable_only_warm_up_idx) {
430+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
431+
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
432+
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
433+
.file_system = storage_resource.value()->fs,
434+
.ctx =
435+
{
436+
.expiration_time = expiration_time,
437+
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
438+
.is_warmup = true
439+
},
440+
.download_done {[=](Status st) {
441+
DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
442+
if (rs->version().second > rs->version().first) {
443+
auto sleep_time = dp->param<int>("sleep", 3);
444+
LOG_INFO(
445+
"[verbose] block download for rowset={}, "
446+
"version={}, sleep={}",
447+
rs->rowset_id().to_string(),
448+
rs->version().to_string(), sleep_time);
449+
std::this_thread::sleep_for(
450+
std::chrono::seconds(sleep_time));
451+
}
452+
});
453+
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
454+
if (!st) {
455+
LOG_WARNING("add rowset warm up error ").error(st);
456+
}
457+
}},
458+
});
459+
}
458460

459461
auto download_idx_file = [&, self](const io::Path& idx_path, int64_t idx_size) {
460462
io::DownloadFileMeta meta {

be/src/cloud/cloud_warm_up_manager.cpp

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -246,20 +246,23 @@ void CloudWarmUpManager::handle_jobs() {
246246
}
247247
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
248248
// 1st. download segment files
249-
submit_download_tasks(
250-
storage_resource.value()->remote_segment_path(*rs, seg_id),
251-
rs->segment_file_size(cast_set<int>(seg_id)),
252-
storage_resource.value()->fs, expiration_time, wait, false,
253-
[tablet, rs, seg_id](Status st) {
254-
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
255-
<< seg_id << " completed";
256-
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::JOB,
257-
rs->rowset_id(), st, 1,
258-
0)
259-
.trigger_source == WarmUpTriggerSource::JOB) {
260-
VLOG_DEBUG << "warmup rowset " << rs->version() << " completed";
261-
}
262-
});
249+
if (!config::enable_only_warm_up_idx) {
250+
submit_download_tasks(
251+
storage_resource.value()->remote_segment_path(*rs, seg_id),
252+
rs->segment_file_size(cast_set<int>(seg_id)),
253+
storage_resource.value()->fs, expiration_time, wait, false,
254+
[tablet, rs, seg_id](Status st) {
255+
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
256+
<< seg_id << " completed";
257+
if (tablet->complete_rowset_segment_warmup(
258+
WarmUpTriggerSource::JOB, rs->rowset_id(), st,
259+
1, 0)
260+
.trigger_source == WarmUpTriggerSource::JOB) {
261+
VLOG_DEBUG << "warmup rowset " << rs->version()
262+
<< " completed";
263+
}
264+
});
265+
}
263266

264267
// 2nd. download inverted index files
265268
int64_t file_size = -1;

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
11671167
DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
11681168
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
11691169

1170+
DEFINE_mBool(enable_only_warm_up_idx, "false");
1171+
11701172
DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
11711173
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
11721174

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,8 @@ DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
12091209
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
12101210
DECLARE_mBool(enable_evaluate_shadow_queue_diff);
12111211

1212+
DECLARE_mBool(enable_only_warm_up_idx);
1213+
12121214
// inverted index searcher cache
12131215
// cache entry stay time after lookup
12141216
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);

0 commit comments

Comments
 (0)