Skip to content

Commit ece2029

Browse files
Merge branch 'master' into fix-lrudump-reset_range
2 parents 0b1c428 + e74ea4b commit ece2029

File tree

266 files changed

+7547
-1642
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

266 files changed

+7547
-1642
lines changed

.github/workflows/build-thirdparty.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ jobs:
142142
name: Build Third Party Libraries (macOS)
143143
needs: changes
144144
if: ${{ needs.changes.outputs.thirdparty_changes == 'true' }}
145-
runs-on: macos-13
145+
runs-on: macos-15
146146
steps:
147147
- name: Checkout ${{ github.ref }}
148148
uses: actions/checkout@v4

be/src/cloud/cloud_backend_service.cpp

Lines changed: 79 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ namespace doris {
3939

4040
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
4141
"file_cache_warm_up_cache_async_submitted_segment_num");
42+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
43+
"file_cache_warm_up_cache_async_submitted_task_num");
44+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
45+
"file_cache_warm_up_cache_async_submitted_tablet_num");
4246

4347
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
4448
: BaseBackendService(exec_env), _engine(engine) {}
@@ -170,83 +174,89 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
170174

171175
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
172176
const TWarmUpCacheAsyncRequest& request) {
173-
std::ostringstream oss;
174-
oss << "[";
175-
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
176-
if (i > 0) oss << ",";
177-
oss << request.tablet_ids[i];
178-
}
179-
oss << "]";
180-
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
181-
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();
177+
// just submit the task to the thread pool, no need to wait for the result
178+
auto do_warm_up = [this, request]() {
179+
std::ostringstream oss;
180+
oss << "[";
181+
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
182+
if (i > 0) oss << ",";
183+
oss << request.tablet_ids[i];
184+
}
185+
oss << "]";
186+
g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
187+
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
188+
<< request.brpc_port << ", tablets num=" << request.tablet_ids.size()
189+
<< ", tablet_ids=" << oss.str();
182190

183-
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
184-
// Record each tablet in manager
185-
for (int64_t tablet_id : request.tablet_ids) {
186-
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
187-
}
191+
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
192+
// Record each tablet in manager
193+
for (int64_t tablet_id : request.tablet_ids) {
194+
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
195+
}
188196

189-
std::string host = request.host;
190-
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
191-
if (dns_cache == nullptr) {
192-
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
193-
} else if (!is_valid_ip(request.host)) {
194-
Status status = dns_cache->get(request.host, &host);
195-
if (!status.ok()) {
196-
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
197-
<< status.to_string();
198-
// Remove failed tablets from tracking
199-
manager.remove_balanced_tablets(request.tablet_ids);
197+
std::string host = request.host;
198+
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
199+
if (dns_cache == nullptr) {
200+
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
201+
} else if (!is_valid_ip(request.host)) {
202+
Status status = dns_cache->get(request.host, &host);
203+
if (!status.ok()) {
204+
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
205+
<< status.to_string();
206+
return;
207+
}
208+
}
209+
std::string brpc_addr = get_host_port(host, request.brpc_port);
210+
std::shared_ptr<PBackendService_Stub> brpc_stub =
211+
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
212+
if (!brpc_stub) {
213+
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
200214
return;
201215
}
202-
}
203-
std::string brpc_addr = get_host_port(host, request.brpc_port);
204-
Status st = Status::OK();
205-
TStatus t_status;
206-
std::shared_ptr<PBackendService_Stub> brpc_stub =
207-
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
208-
if (!brpc_stub) {
209-
st = Status::RpcError("Address {} is wrong", brpc_addr);
210-
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
211-
// Remove failed tablets from tracking
212-
manager.remove_balanced_tablets(request.tablet_ids);
213-
return;
214-
}
215-
brpc::Controller cntl;
216-
PGetFileCacheMetaRequest brpc_request;
217-
std::stringstream ss;
218-
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) {
219-
brpc_request.add_tablet_ids(tablet_id);
220-
ss << tablet_id << ",";
221-
});
222-
VLOG_DEBUG << "tablets set: " << ss.str() << " stack: " << get_stack_trace();
223-
PGetFileCacheMetaResponse brpc_response;
216+
PGetFileCacheMetaRequest brpc_request;
217+
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
218+
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
224219

225-
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
226-
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
227-
<< ", response=" << brpc_response.DebugString();
228-
if (!cntl.Failed()) {
229-
g_file_cache_warm_up_cache_async_submitted_segment_num
230-
<< brpc_response.file_cache_block_metas().size();
231-
auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
232-
if (!file_cache_block_metas.empty()) {
220+
auto run_rpc = [this, brpc_stub,
221+
brpc_addr](PGetFileCacheMetaRequest request_copy) -> Status {
222+
brpc::Controller cntl;
223+
cntl.set_timeout_ms(20 * 1000); // 20s
224+
PGetFileCacheMetaResponse brpc_response;
225+
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, &brpc_response,
226+
nullptr);
227+
if (cntl.Failed()) {
228+
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
229+
<< ", error=" << cntl.ErrorText()
230+
<< ", error code=" << cntl.ErrorCode();
231+
return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
232+
cntl.ErrorCode());
233+
}
234+
VLOG_DEBUG << "warm_up_cache_async: request=" << request_copy.DebugString()
235+
<< ", response=" << brpc_response.DebugString();
236+
g_file_cache_warm_up_cache_async_submitted_segment_num
237+
<< brpc_response.file_cache_block_metas().size();
233238
_engine.file_cache_block_downloader().submit_download_task(
234-
std::move(file_cache_block_metas));
235-
LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
236-
<< oss.str();
237-
} else {
238-
LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
239-
manager.remove_balanced_tablets(request.tablet_ids);
239+
std::move(*brpc_response.mutable_file_cache_block_metas()));
240+
return Status::OK();
241+
};
242+
243+
Status rpc_status = run_rpc(std::move(brpc_request));
244+
if (!rpc_status.ok()) {
245+
LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
246+
<< ", status=" << rpc_status;
240247
}
241-
} else {
242-
st = Status::RpcError("{} isn't connected", brpc_addr);
243-
// Remove failed tablets from tracking
244-
manager.remove_balanced_tablets(request.tablet_ids);
245-
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
246-
<< ", error=" << cntl.ErrorText();
248+
};
249+
g_file_cache_warm_up_cache_async_submitted_task_num << 1;
250+
Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
251+
if (!submit_st.ok()) {
252+
LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
253+
"warmup_cache_async_thread_pool, status="
254+
<< submit_st.to_string() << ", execute synchronously";
255+
do_warm_up();
247256
}
248-
st.to_thrift(&t_status);
249-
response.status = t_status;
257+
TStatus t_status;
258+
submit_st.to_thrift(&t_status);
259+
response.status = std::move(t_status);
250260
}
251261

252262
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,

be/src/cloud/cloud_internal_service.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
4747
"file_cache_get_by_peer_server_latency");
4848
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
4949
"file_cache_get_by_peer_read_cache_file_latency");
50+
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
51+
"cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
5052

5153
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
5254
: PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
9597
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
9698
return;
9799
}
98-
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
100+
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
101+
std::chrono::steady_clock::now().time_since_epoch())
102+
.count();
103+
std::ostringstream tablet_ids_stream;
104+
int count = 0;
105+
for (const auto& tablet_id : request->tablet_ids()) {
106+
tablet_ids_stream << tablet_id << ", ";
107+
count++;
108+
if (count >= 10) {
109+
break;
110+
}
111+
}
112+
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
113+
<< ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
99114
for (const auto& tablet_id : request->tablet_ids()) {
100115
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
101116
if (!res.has_value()) {
102117
LOG(ERROR) << "failed to get tablet: " << tablet_id
103118
<< " err msg: " << res.error().msg();
104-
return;
119+
continue;
105120
}
106121
CloudTabletSPtr tablet = std::move(res.value());
107122
auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
166181
}
167182
}
168183
}
169-
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
184+
auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
185+
std::chrono::steady_clock::now().time_since_epoch())
186+
.count();
187+
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
188+
LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
189+
<< end_ts - begin_ts << " us";
190+
VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
170191
<< ", response=" << response->DebugString();
171192
}
172193

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
238238
// check cluster id
239239
RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
240240

241-
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242-
.set_max_threads(config::sync_load_for_tablets_thread)
243-
.set_min_threads(config::sync_load_for_tablets_thread)
244-
.build(&_sync_load_for_tablets_thread_pool);
241+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242+
.set_max_threads(config::sync_load_for_tablets_thread)
243+
.set_min_threads(config::sync_load_for_tablets_thread)
244+
.build(&_sync_load_for_tablets_thread_pool),
245+
"fail to build SyncLoadForTabletsThreadPool");
246+
247+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
248+
.set_max_threads(config::warmup_cache_async_thread)
249+
.set_min_threads(config::warmup_cache_async_thread)
250+
.build(&_warmup_cache_async_thread_pool),
251+
"fail to build WarmupCacheAsyncThreadPool");
252+
253+
return Status::OK();
245254
}
246255

247256
void CloudStorageEngine::stop() {

be/src/cloud/cloud_storage_engine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
169169
return *_sync_load_for_tablets_thread_pool;
170170
}
171171

172+
ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
173+
172174
Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
173175

174176
Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
@@ -221,6 +223,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
221223
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
222224
std::unique_ptr<TabletHotspot> _tablet_hotspot;
223225
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
226+
std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
224227
std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
225228

226229
// FileSystem with latest shared storage info, new data will be written to this fs.

be/src/cloud/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
7070

7171
DEFINE_mInt32(sync_load_for_tablets_thread, "32");
7272

73+
DEFINE_Int32(warmup_cache_async_thread, "16");
74+
7375
DEFINE_mBool(enable_new_tablet_do_compaction, "true");
7476

7577
// Empty rowset compaction strategy configurations

be/src/cloud/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
114114
// the theads which sync the datas which loaded in other clusters
115115
DECLARE_mInt32(sync_load_for_tablets_thread);
116116

117+
DECLARE_Int32(warmup_cache_async_thread);
118+
117119
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
118120

119121
DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);

be/src/exec/rowid_fetcher.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,22 @@
3838
#include <vector>
3939

4040
#include "bthread/countdown_event.h"
41-
#include "cloud/cloud_storage_engine.h"
42-
#include "cloud/cloud_tablet.h"
43-
#include "cloud/cloud_tablet_mgr.h"
44-
#include "cloud/config.h"
4541
#include "common/config.h"
4642
#include "common/consts.h"
4743
#include "common/exception.h"
4844
#include "common/signal_handler.h"
4945
#include "exec/tablet_info.h" // DorisNodesInfo
5046
#include "olap/olap_common.h"
5147
#include "olap/rowset/beta_rowset.h"
48+
#include "olap/rowset/segment_v2/column_reader.h"
5249
#include "olap/storage_engine.h"
5350
#include "olap/tablet_fwd.h"
54-
#include "olap/tablet_manager.h"
5551
#include "olap/tablet_schema.h"
5652
#include "olap/utils.h"
5753
#include "runtime/descriptors.h"
5854
#include "runtime/exec_env.h" // ExecEnv
5955
#include "runtime/fragment_mgr.h" // FragmentMgr
6056
#include "runtime/runtime_state.h" // RuntimeState
61-
#include "runtime/types.h"
6257
#include "runtime/workload_group/workload_group_manager.h"
6358
#include "semaphore"
6459
#include "util/brpc_client_cache.h" // BrpcClientCache
@@ -69,13 +64,11 @@
6964
#include "vec/common/assert_cast.h"
7065
#include "vec/common/string_ref.h"
7166
#include "vec/core/block.h" // Block
72-
#include "vec/data_types/data_type_factory.hpp"
7367
#include "vec/data_types/data_type_struct.h"
7468
#include "vec/data_types/serde/data_type_serde.h"
7569
#include "vec/exec/format/orc/vorc_reader.h"
7670
#include "vec/exec/format/parquet/vparquet_reader.h"
7771
#include "vec/exec/scan/file_scanner.h"
78-
#include "vec/functions/function_helpers.h"
7972
#include "vec/jsonb/serialize.h"
8073

8174
namespace doris {

be/src/http/action/file_cache_action.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ constexpr static std::string_view BASE_PATH = "base_path";
5858
constexpr static std::string_view RELEASED_ELEMENTS = "released_elements";
5959
constexpr static std::string_view DUMP = "dump";
6060
constexpr static std::string_view VALUE = "value";
61+
constexpr static std::string_view RELOAD = "reload";
6162

6263
Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
6364
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
@@ -161,6 +162,41 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
161162
*json_metrics = json.ToString();
162163
}
163164
}
165+
} else if (operation == RELOAD) {
166+
#ifdef BE_TEST
167+
std::string doris_home = getenv("DORIS_HOME");
168+
std::string conffile = std::string(doris_home) + "/conf/be.conf";
169+
if (!doris::config::init(conffile.c_str(), true, true, true)) {
170+
return Status::InternalError("Error reading config file");
171+
}
172+
173+
std::string custom_conffile = doris::config::custom_config_dir + "/be_custom.conf";
174+
if (!doris::config::init(custom_conffile.c_str(), true, false, false)) {
175+
return Status::InternalError("Error reading custom config file");
176+
}
177+
178+
if (!doris::config::enable_file_cache) {
179+
return Status::InternalError("config::enbale_file_cache should be true!");
180+
}
181+
182+
std::unordered_set<std::string> cache_path_set;
183+
std::vector<doris::CachePath> cache_paths;
184+
RETURN_IF_ERROR(doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths));
185+
186+
std::vector<CachePath> cache_paths_no_dup;
187+
cache_paths_no_dup.reserve(cache_paths.size());
188+
for (const auto& cache_path : cache_paths) {
189+
if (cache_path_set.contains(cache_path.path)) {
190+
LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
191+
continue;
192+
}
193+
cache_path_set.emplace(cache_path.path);
194+
cache_paths_no_dup.emplace_back(cache_path);
195+
}
196+
RETURN_IF_ERROR(doris::io::FileCacheFactory::instance()->reload_file_cache(cache_paths));
197+
#else
198+
return Status::InternalError("Do not use reload in production environment!!!!");
199+
#endif
164200
} else {
165201
st = Status::InternalError("invalid operation: {}", operation);
166202
}

0 commit comments

Comments
 (0)