diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 85a44fbf6f7d63..ca696409e5ff18 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -455,80 +455,88 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
         }
         for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
-            auto segment_size = rs_meta.segment_file_size(segment_id);
-            auto download_done = [=, version = rs_meta.version()](Status st) {
-                DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
-                    auto sleep_time = dp->param("sleep", 3);
-                    LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
-                             rowset_id.to_string(), version.to_string(), sleep_time);
-                    std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
-                });
-                DBUG_EXECUTE_IF(
-                        "CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
-                            st = Status::InternalError("injected error");
-                            LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
-                                     tablet_id, rowset_id.to_string(), st.to_string());
-                        });
-                if (st.ok()) {
-                    g_file_cache_event_driven_warm_up_finished_segment_num << 1;
-                    g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
-                    int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                                             std::chrono::system_clock::now().time_since_epoch())
-                                             .count();
-                    g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
-                    g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
-                    g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
-                    if (request_ts > 0 &&
-                        now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
-                        g_file_cache_warm_up_rowset_slow_count << 1;
-                        LOG(INFO) << "warm up rowset took " << now_ts - request_ts
-                                  << " us, tablet_id: " << tablet_id
-                                  << ", rowset_id: " << rowset_id.to_string()
-                                  << ", segment_id: " << segment_id;
+            if (!config::file_cache_enable_only_warm_up_idx) {
+                auto segment_size = rs_meta.segment_file_size(segment_id);
+                auto download_done = [=, version = rs_meta.version()](Status st) {
+                    DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
+                        auto sleep_time = dp->param("sleep", 3);
+                        LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
+                                 rowset_id.to_string(), version.to_string(), sleep_time);
+                        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+                    });
+                    DBUG_EXECUTE_IF(
+                            "CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_"
+                            "error", {
+                                st = Status::InternalError("injected error");
+                                LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
+                                         tablet_id, rowset_id.to_string(), st.to_string());
+                            });
+                    if (st.ok()) {
+                        g_file_cache_event_driven_warm_up_finished_segment_num << 1;
+                        g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
+                        int64_t now_ts =
+                                std::chrono::duration_cast<std::chrono::microseconds>(
+                                        std::chrono::system_clock::now().time_since_epoch())
+                                        .count();
+                        g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
+                        g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
+                        g_file_cache_warm_up_rowset_handle_to_finish_latency
+                                << (now_ts - handle_ts);
+                        if (request_ts > 0 &&
+                            now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
+                            g_file_cache_warm_up_rowset_slow_count << 1;
+                            LOG(INFO) << "warm up rowset took " << now_ts - request_ts
+                                      << " us, tablet_id: " << tablet_id
+                                      << ", rowset_id: " << rowset_id.to_string()
+                                      << ", segment_id: " << segment_id;
+                        }
+                        if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
+                            g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
+                            LOG(INFO) << "warm up rowset (handle to finish) took "
+                                      << now_ts - handle_ts << " us, tablet_id: " << tablet_id
+                                      << ", rowset_id: " << rowset_id.to_string()
+                                      << ", segment_id: " << segment_id;
+                        }
+                    } else {
+                        g_file_cache_event_driven_warm_up_failed_segment_num << 1;
+                        g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
+                        LOG(WARNING)
+                                << "download segment failed, tablet_id: " << tablet_id
+                                << " rowset_id: " << rowset_id.to_string() << ", error: " << st;
                     }
-                    if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
-                        g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
-                        LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
-                                  << " us, tablet_id: " << tablet_id
-                                  << ", rowset_id: " << rowset_id.to_string()
-                                  << ", segment_id: " << segment_id;
+                    if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
+                                                               rowset_id, st, 1, 0)
+                                .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
+                        VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
+                                   << rowset_id.to_string() << ") completed";
                     }
-                } else {
-                    g_file_cache_event_driven_warm_up_failed_segment_num << 1;
-                    g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
-                    LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
-                                 << " rowset_id: " << rowset_id.to_string() << ", error: " << st;
-                }
-                if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
-                                                           rowset_id, st, 1, 0)
-                            .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
-                    VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
-                               << rowset_id.to_string() << ") completed";
-                }
+                    if (wait) {
+                        wait->signal();
+                    }
+                };
+
+                io::DownloadFileMeta download_meta {
+                        .path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
+                        .file_size = segment_size,
+                        .offset = 0,
+                        .download_size = segment_size,
+                        .file_system = storage_resource.value()->fs,
+                        .ctx = {.is_index_data = false,
+                                .expiration_time = expiration_time,
+                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                                .is_warmup = true},
+                        .download_done = std::move(download_done),
+                };
+
+                g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
+                g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
                 if (wait) {
-                    wait->signal();
+                    wait->add_count();
                 }
-            };
-            io::DownloadFileMeta download_meta {
-                    .path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
-                    .file_size = segment_size,
-                    .offset = 0,
-                    .download_size = segment_size,
-                    .file_system = storage_resource.value()->fs,
-                    .ctx = {.is_index_data = false,
-                            .expiration_time = expiration_time,
-                            .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
-                            .is_warmup = true},
-                    .download_done = std::move(download_done),
-            };
-            g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
-            g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
-            if (wait) {
-                wait->add_count();
+                _engine.file_cache_block_downloader().submit_download_task(download_meta);
             }
-            _engine.file_cache_block_downloader().submit_download_task(download_meta);
-
             auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
                 auto storage_resource = rs_meta.remote_storage_resource();
                 auto download_done = [=, version = rs_meta.version()](Status st) {
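Reviewer note on the hunk above: `wait->add_count()` moves inside the new guard together with the task submission, so the count is only incremented when a matching `wait->signal()` will eventually run in `download_done`. Incrementing the count while skipping the download would leave the waiter blocked forever. A minimal self-contained sketch of the pairing that must be preserved (`CountLatch` is an illustrative stand-in, not the Doris class):

#include <condition_variable>
#include <mutex>

// Stand-in for the countdown primitive used by the warm-up code: every
// add_count() must be balanced by exactly one signal(), or wait() hangs.
class CountLatch {
public:
    void add_count() {
        std::lock_guard<std::mutex> lock(_mtx);
        ++_count;
    }
    void signal() {
        std::lock_guard<std::mutex> lock(_mtx);
        if (--_count <= 0) _cv.notify_all();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(_mtx);
        _cv.wait(lock, [this] { return _count <= 0; });
    }

private:
    std::mutex _mtx;
    std::condition_variable _cv;
    int _count = 0;
};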
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 6e99d41985b407..cba53b8c5333e3 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -426,35 +426,37 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                     }
                     // clang-format off
                     auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
-                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
-                        .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
-                        .file_size = rs->rowset_meta()->segment_file_size(seg_id),
-                        .file_system = storage_resource.value()->fs,
-                        .ctx =
-                            {
-                                .expiration_time = expiration_time,
-                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
-                                .is_warmup = true
-                            },
-                        .download_done {[=](Status st) {
-                            DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
-                                if (rs->version().second > rs->version().first) {
-                                    auto sleep_time = dp->param("sleep", 3);
-                                    LOG_INFO(
-                                            "[verbose] block download for rowset={}, "
-                                            "version={}, sleep={}",
-                                            rs->rowset_id().to_string(),
-                                            rs->version().to_string(), sleep_time);
-                                    std::this_thread::sleep_for(
-                                            std::chrono::seconds(sleep_time));
-                                }
-                            });
-                            self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
-                            if (!st) {
-                                LOG_WARNING("add rowset warm up error ").error(st);
-                            }
-                        }},
-                    });
+                    if (!config::file_cache_enable_only_warm_up_idx) {
+                        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
+                            .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
+                            .file_system = storage_resource.value()->fs,
+                            .ctx =
+                                {
+                                    .expiration_time = expiration_time,
+                                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                                    .is_warmup = true
+                                },
+                            .download_done {[=](Status st) {
+                                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
+                                    if (rs->version().second > rs->version().first) {
+                                        auto sleep_time = dp->param("sleep", 3);
+                                        LOG_INFO(
+                                                "[verbose] block download for rowset={}, "
+                                                "version={}, sleep={}",
+                                                rs->rowset_id().to_string(),
+                                                rs->version().to_string(), sleep_time);
+                                        std::this_thread::sleep_for(
+                                                std::chrono::seconds(sleep_time));
+                                    }
+                                });
+                                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
+                                if (!st) {
+                                    LOG_WARNING("add rowset warm up error ").error(st);
+                                }
+                            }},
+                        });
+                    }

                     auto download_idx_file = [&, self](const io::Path& idx_path, int64_t idx_size) {
                         io::DownloadFileMeta meta {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 272b690facf78f..dfdef1b3091637 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -130,6 +130,9 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                                                int64_t expiration_time,
                                                std::shared_ptr<bthread::CountdownEvent> wait,
                                                bool is_index, std::function<void(Status)> done_cb) {
+    VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size
+               << ", expiration_time: " << expiration_time
+               << ", is_index: " << (is_index ? "true" : "false");
     if (file_size < 0) {
         auto st = file_system->file_size(path, &file_size);
         if (!st.ok()) [[unlikely]] {
@@ -246,20 +249,23 @@ void CloudWarmUpManager::handle_jobs() {
                 }
                 for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
                     // 1st. download segment files
-                    submit_download_tasks(
-                            storage_resource.value()->remote_segment_path(*rs, seg_id),
-                            rs->segment_file_size(cast_set<int>(seg_id)),
-                            storage_resource.value()->fs, expiration_time, wait, false,
-                            [tablet, rs, seg_id](Status st) {
-                                VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
-                                           << seg_id << " completed";
-                                if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::JOB,
-                                                                           rs->rowset_id(), st, 1,
-                                                                           0)
-                                            .trigger_source == WarmUpTriggerSource::JOB) {
-                                    VLOG_DEBUG << "warmup rowset " << rs->version() << " completed";
-                                }
-                            });
+                    if (!config::file_cache_enable_only_warm_up_idx) {
+                        submit_download_tasks(
+                                storage_resource.value()->remote_segment_path(*rs, seg_id),
+                                rs->segment_file_size(cast_set<int>(seg_id)),
+                                storage_resource.value()->fs, expiration_time, wait, false,
+                                [tablet, rs, seg_id](Status st) {
+                                    VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
+                                               << seg_id << " completed";
+                                    if (tablet->complete_rowset_segment_warmup(
+                                                WarmUpTriggerSource::JOB, rs->rowset_id(), st,
+                                                1, 0)
+                                                .trigger_source == WarmUpTriggerSource::JOB) {
+                                        VLOG_DEBUG << "warmup rowset " << rs->version()
+                                                   << " completed";
+                                    }
+                                });
+                    }

                     // 2nd. download inverted index files
                     int64_t file_size = -1;
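All three warm-up entry points (the event-driven `warm_up_rowset` RPC, the `add_rowsets` sync path, and the warm-up job in `CloudWarmUpManager`) now share the same shape: segment-data downloads are gated behind the new flag, while inverted-index downloads are untouched. A compile-checked sketch of that shape, with the flag modeled as an atomic (the two `submit_*` helpers are hypothetical stand-ins for the inlined submission logic, not real Doris functions):

#include <atomic>

namespace config {
// Modeled as an atomic here; in Doris, DEFINE_mBool makes the flag
// runtime-mutable, so each warm-up submission re-reads the current value.
inline std::atomic<bool> file_cache_enable_only_warm_up_idx {false};
} // namespace config

void submit_segment_downloads();        // hypothetical stand-in
void submit_inverted_index_downloads(); // hypothetical stand-in

void warm_up_rowset_files() {
    // Segment data is skipped in index-only mode ...
    if (!config::file_cache_enable_only_warm_up_idx) {
        submit_segment_downloads();
    }
    // ... while index files are always warmed up.
    submit_inverted_index_downloads();
}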
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5f78c7d9294a30..6d850e1afa399c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1167,6 +1167,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
 DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
 DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");

+DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false");
+
 DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
 DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c9d9fe94ffbdca..56ba11e068e67a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1209,6 +1209,8 @@ DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
 DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
 DECLARE_mBool(enable_evaluate_shadow_queue_diff);

+DECLARE_mBool(file_cache_enable_only_warm_up_idx);
+
 // inverted index searcher cache
 // cache entry stay time after lookup
 DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
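Because the flag is declared with DEFINE_mBool/DECLARE_mBool it is runtime-mutable, which is what the regression test below relies on: it flips the flag on a live BE through the suite's `update_be_config` helper, which (to my understanding) wraps the BE's config-update HTTP endpoint, roughly `curl -X POST "http://<be_host>:<be_http_port>/api/update_config?file_cache_enable_only_warm_up_idx=true"` (host and port are placeholders).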
diff --git a/regression-test/suites/cloud_p0/cache/warm_up/test_warmup_table_docker.groovy b/regression-test/suites/cloud_p0/cache/warm_up/test_warmup_table_docker.groovy
new file mode 100644
index 00000000000000..1ac63a63c26f50
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/warm_up/test_warmup_table_docker.groovy
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_warmup_table_docker', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_enable_only_warm_up_idx=true',
+    ]
+    options.cloudMode = true
+    options.beNum = 1
+    options.feNum = 1
+
+    def testTable = "test_warmup_table"
+
+    def clearFileCache = { ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clear file cache is async, wait for it to finish
+        sleep(5000)
+    }
+
+    def getBrpcMetrics = { ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics"
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            return 0
+        }
+    }
+
+    def updateBeConf = { cluster, key, value ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            def (code, out, err) = update_be_config(ip, port, key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    docker(options) {
+        def clusterName = "warmup_cluster"
+
+        // Add one cluster
+        cluster.addBackend(1, clusterName)
+
+        // Ensure we are in the cluster
+        sql """use @${clusterName}"""
+
+        try {
+            sql "set global enable_audit_plugin = false"
+        } catch (Exception e) {
+            logger.info("set global enable_audit_plugin = false failed: " + e.getMessage())
+        }
+
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL,
+                `v3` int(11) NULL,
+                `text` text NULL,
+                INDEX idx_text (`text`) USING INVERTED PROPERTIES("parser" = "english")
+            ) UNIQUE KEY(`k1`, `k2`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        // Load data
+        int dataCount = 4000000
+        streamLoad {
+            table "${testTable}"
+            set 'column_separator', ','
+            inputIterator(new Iterator<String>() {
+                int current = 0
+                @Override
+                boolean hasNext() {
+                    return current < dataCount
+                }
+                @Override
+                String next() {
+                    current++
+                    if (current % 2 == 0) {
+                        return "${current},${current},${current},hello doris ${current}"
+                    } else {
+                        return "${current},${current},${current},hello world ${current}"
+                    }
+                }
+            })
+
+            check { res, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(res)
+                assertEquals("success", json.Status.toLowerCase())
+            }
+        }
+        sql "sync"
+
+        def backends = sql """SHOW BACKENDS"""
+        def ip = backends[0][1]
+        def brpcPort = backends[0][5]
+
+        sleep(3000)
+        def cache_size_after_load = getBrpcMetrics(ip, brpcPort, "cache_cache_size")
+
+        // Clear file cache to ensure warm up actually does something
+        clearFileCacheOnAllBackends()
+
+        sleep(3000)
+        def cache_size_after_clear = getBrpcMetrics(ip, brpcPort, "cache_cache_size")
+        assertEquals(cache_size_after_clear, 0)
+
+        // Set file_cache_enable_only_warm_up_idx = true
+        updateBeConf(clusterName, "file_cache_enable_only_warm_up_idx", "true")
+
+        // Trigger warm up
+        def jobId = sql "WARM UP CLUSTER ${clusterName} WITH TABLE ${testTable}"
+        assertNotNull(jobId)
+        def id = jobId[0][0]
+
+        // Wait for warm up job to finish
+        def waitJobFinished = { job_id ->
+            for (int i = 0; i < 60; i++) {
+                def result = sql "SHOW WARM UP JOB WHERE ID = ${job_id}"
+                if (result.size() > 0) {
+                    def status = result[0][3]
+                    logger.info("Warm up job ${job_id} status: ${status}")
+                    if (status == "FINISHED") {
+                        return true
+                    } else if (status == "CANCELLED") {
+                        throw new RuntimeException("Warm up job ${job_id} cancelled")
+                    }
+                }
+                sleep(1000)
+            }
+            return false
+        }
+
+        assertTrue(waitJobFinished(id), "Warm up job ${id} did not finish in time")
+        sleep(3000)
+        def cache_size_after_warm = getBrpcMetrics(ip, brpcPort, "cache_cache_size")
+
+        logger.info("Cache size after load: ${cache_size_after_load}, after clear: ${cache_size_after_clear}, after warm up: ${cache_size_after_warm}")
+        assertTrue(cache_size_after_warm < cache_size_after_load)
+
+        def s3ReadBefore = getBrpcMetrics(ip, brpcPort, "cached_remote_reader_s3_read")
+
+        // Verify data can be read
+        def result = sql "SELECT COUNT() FROM ${testTable} WHERE text MATCH_ANY 'doris'"
+        sleep(3000)
+        // Get metrics after query
+        def s3ReadAfter = getBrpcMetrics(ip, brpcPort, "cached_remote_reader_s3_read")
+
+        // Check no cache miss (s3 read count should not increase)
+        assertEquals(s3ReadBefore, s3ReadAfter)
+    }
+}
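To summarize the behavior the test asserts: with the flag on, warm-up fills the cache with index files only (hence a smaller cache footprint than a full load), yet an index-only query such as the `MATCH_ANY` count still completes without additional S3 reads. A unit-style sketch of that decision table, under the stated assumptions (names are illustrative, not Doris APIs):

#include <cassert>

struct WarmUpPlan {
    bool segment_data;
    bool inverted_index;
};

// Mirrors the guards added in the three .cpp hunks: index files are always
// warmed, segment data only when the flag is off.
WarmUpPlan plan_for(bool only_warm_up_idx) {
    return WarmUpPlan {.segment_data = !only_warm_up_idx, .inverted_index = true};
}

int main() {
    assert(plan_for(false).segment_data && plan_for(false).inverted_index);
    assert(!plan_for(true).segment_data && plan_for(true).inverted_index);
    return 0;
}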