Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 76 additions & 68 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,80 +455,88 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
auto segment_size = rs_meta.segment_file_size(segment_id);
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
rowset_id.to_string(), version.to_string(), sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
st = Status::InternalError("injected error");
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
tablet_id, rowset_id.to_string(), st.to_string());
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
if (request_ts > 0 &&
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
if (!config::file_cache_enable_only_warm_up_idx) {
auto segment_size = rs_meta.segment_file_size(segment_id);
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
rowset_id.to_string(), version.to_string(), sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_"
"error",
{
st = Status::InternalError("injected error");
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
tablet_id, rowset_id.to_string(), st.to_string());
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
int64_t now_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
g_file_cache_warm_up_rowset_handle_to_finish_latency
<< (now_ts - handle_ts);
if (request_ts > 0 &&
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took "
<< now_ts - handle_ts << " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
LOG(WARNING)
<< "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
rowset_id, st, 1, 0)
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
rowset_id, st, 1, 0)
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
if (wait) {
wait->signal();
}
};

io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};

g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
if (wait) {
wait->signal();
wait->add_count();
}
};

io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
if (wait) {
wait->add_count();
_engine.file_cache_block_downloader().submit_download_task(download_meta);
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);

auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=, version = rs_meta.version()](Status st) {
Expand Down
60 changes: 31 additions & 29 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,35 +426,37 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
}
// clang-format off
auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true
},
.download_done {[=](Status st) {
DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
if (rs->version().second > rs->version().first) {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
"[verbose] block download for rowset={}, "
"version={}, sleep={}",
rs->rowset_id().to_string(),
rs->version().to_string(), sleep_time);
std::this_thread::sleep_for(
std::chrono::seconds(sleep_time));
}
});
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
});
if (!config::file_cache_enable_only_warm_up_idx) {
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true
},
.download_done {[=](Status st) {
DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
if (rs->version().second > rs->version().first) {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
"[verbose] block download for rowset={}, "
"version={}, sleep={}",
rs->rowset_id().to_string(),
rs->version().to_string(), sleep_time);
std::this_thread::sleep_for(
std::chrono::seconds(sleep_time));
}
});
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
});
}

auto download_idx_file = [&, self](const io::Path& idx_path, int64_t idx_size) {
io::DownloadFileMeta meta {
Expand Down
34 changes: 20 additions & 14 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait,
bool is_index, std::function<void(Status)> done_cb) {
VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size
<< ", expiration_time: " << expiration_time
<< ", is_index: " << (is_index ? "true" : "false");
if (file_size < 0) {
auto st = file_system->file_size(path, &file_size);
if (!st.ok()) [[unlikely]] {
Expand Down Expand Up @@ -246,20 +249,23 @@ void CloudWarmUpManager::handle_jobs() {
}
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
// 1st. download segment files
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(cast_set<int>(seg_id)),
storage_resource.value()->fs, expiration_time, wait, false,
[tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::JOB,
rs->rowset_id(), st, 1,
0)
.trigger_source == WarmUpTriggerSource::JOB) {
VLOG_DEBUG << "warmup rowset " << rs->version() << " completed";
}
});
if (!config::file_cache_enable_only_warm_up_idx) {
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(cast_set<int>(seg_id)),
storage_resource.value()->fs, expiration_time, wait, false,
[tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(
WarmUpTriggerSource::JOB, rs->rowset_id(), st,
1, 0)
.trigger_source == WarmUpTriggerSource::JOB) {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " completed";
}
});
}

// 2nd. download inverted index files
int64_t file_size = -1;
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");

// When true, the warm-up code paths that check this flag skip submitting
// segment data file downloads to the file cache block downloader.
// NOTE(review): the name implies that only index files are warmed up in this
// mode; confirm the index download paths are not covered by the same guard.
// Defaults to "false", i.e. segment files are downloaded during warm-up.
DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false");

DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,8 @@ DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
DECLARE_mBool(enable_evaluate_shadow_queue_diff);

// When true, the warm-up code paths that check this flag skip submitting
// segment data file downloads to the file cache block downloader.
// NOTE(review): the name implies that only index files are warmed up in this
// mode; confirm the index download paths are not covered by the same guard.
DECLARE_mBool(file_cache_enable_only_warm_up_idx);

// inverted index searcher cache
// cache entry stay time after lookup
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
Expand Down
Loading