Skip to content

Commit 97ea173

Browse files
authored
branch-4.0: [fix](cloud) Fix cloud warm up balance slow scheduling #58962 (#59337)
cherry pick from #58962
1 parent 40d11a8 commit 97ea173

File tree

10 files changed

+485
-153
lines changed

10 files changed

+485
-153
lines changed

be/src/cloud/cloud_backend_service.cpp

Lines changed: 79 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ namespace doris {
3838

3939
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
4040
"file_cache_warm_up_cache_async_submitted_segment_num");
41+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
42+
"file_cache_warm_up_cache_async_submitted_task_num");
43+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
44+
"file_cache_warm_up_cache_async_submitted_tablet_num");
4145

4246
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
4347
: BaseBackendService(exec_env), _engine(engine) {}
@@ -169,79 +173,89 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
169173

170174
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
171175
const TWarmUpCacheAsyncRequest& request) {
172-
std::ostringstream oss;
173-
oss << "[";
174-
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
175-
if (i > 0) oss << ",";
176-
oss << request.tablet_ids[i];
177-
}
178-
oss << "]";
179-
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
180-
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();
176+
// just submit the task to the thread pool, no need to wait for the result
177+
auto do_warm_up = [this, request]() {
178+
std::ostringstream oss;
179+
oss << "[";
180+
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
181+
if (i > 0) oss << ",";
182+
oss << request.tablet_ids[i];
183+
}
184+
oss << "]";
185+
g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
186+
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
187+
<< request.brpc_port << ", tablets num=" << request.tablet_ids.size()
188+
<< ", tablet_ids=" << oss.str();
181189

182-
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
183-
// Record each tablet in manager
184-
for (int64_t tablet_id : request.tablet_ids) {
185-
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
186-
}
190+
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
191+
// Record each tablet in manager
192+
for (int64_t tablet_id : request.tablet_ids) {
193+
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
194+
}
187195

188-
std::string host = request.host;
189-
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
190-
if (dns_cache == nullptr) {
191-
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
192-
} else if (!is_valid_ip(request.host)) {
193-
Status status = dns_cache->get(request.host, &host);
194-
if (!status.ok()) {
195-
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
196-
<< status.to_string();
197-
// Remove failed tablets from tracking
198-
manager.remove_balanced_tablets(request.tablet_ids);
196+
std::string host = request.host;
197+
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
198+
if (dns_cache == nullptr) {
199+
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
200+
} else if (!is_valid_ip(request.host)) {
201+
Status status = dns_cache->get(request.host, &host);
202+
if (!status.ok()) {
203+
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
204+
<< status.to_string();
205+
return;
206+
}
207+
}
208+
std::string brpc_addr = get_host_port(host, request.brpc_port);
209+
std::shared_ptr<PBackendService_Stub> brpc_stub =
210+
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
211+
if (!brpc_stub) {
212+
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
199213
return;
200214
}
201-
}
202-
std::string brpc_addr = get_host_port(host, request.brpc_port);
203-
Status st = Status::OK();
204-
TStatus t_status;
205-
std::shared_ptr<PBackendService_Stub> brpc_stub =
206-
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
207-
if (!brpc_stub) {
208-
st = Status::RpcError("Address {} is wrong", brpc_addr);
209-
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
210-
// Remove failed tablets from tracking
211-
manager.remove_balanced_tablets(request.tablet_ids);
212-
return;
213-
}
214-
brpc::Controller cntl;
215-
PGetFileCacheMetaRequest brpc_request;
216-
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
217-
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
218-
PGetFileCacheMetaResponse brpc_response;
215+
PGetFileCacheMetaRequest brpc_request;
216+
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
217+
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
219218

220-
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
221-
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
222-
<< ", response=" << brpc_response.DebugString();
223-
if (!cntl.Failed()) {
224-
g_file_cache_warm_up_cache_async_submitted_segment_num
225-
<< brpc_response.file_cache_block_metas().size();
226-
auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
227-
if (!file_cache_block_metas.empty()) {
219+
auto run_rpc = [this, brpc_stub,
220+
brpc_addr](PGetFileCacheMetaRequest request_copy) -> Status {
221+
brpc::Controller cntl;
222+
cntl.set_timeout_ms(20 * 1000); // 20s
223+
PGetFileCacheMetaResponse brpc_response;
224+
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, &brpc_response,
225+
nullptr);
226+
if (cntl.Failed()) {
227+
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
228+
<< ", error=" << cntl.ErrorText()
229+
<< ", error code=" << cntl.ErrorCode();
230+
return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
231+
cntl.ErrorCode());
232+
}
233+
VLOG_DEBUG << "warm_up_cache_async: request=" << request_copy.DebugString()
234+
<< ", response=" << brpc_response.DebugString();
235+
g_file_cache_warm_up_cache_async_submitted_segment_num
236+
<< brpc_response.file_cache_block_metas().size();
228237
_engine.file_cache_block_downloader().submit_download_task(
229-
std::move(file_cache_block_metas));
230-
LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
231-
<< oss.str();
232-
} else {
233-
LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
234-
manager.remove_balanced_tablets(request.tablet_ids);
238+
std::move(*brpc_response.mutable_file_cache_block_metas()));
239+
return Status::OK();
240+
};
241+
242+
Status rpc_status = run_rpc(std::move(brpc_request));
243+
if (!rpc_status.ok()) {
244+
LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
245+
<< ", status=" << rpc_status;
235246
}
236-
} else {
237-
st = Status::RpcError("{} isn't connected", brpc_addr);
238-
// Remove failed tablets from tracking
239-
manager.remove_balanced_tablets(request.tablet_ids);
240-
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
241-
<< ", error=" << cntl.ErrorText();
247+
};
248+
g_file_cache_warm_up_cache_async_submitted_task_num << 1;
249+
Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
250+
if (!submit_st.ok()) {
251+
LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
252+
"warmup_cache_async_thread_pool, status="
253+
<< submit_st.to_string() << ", execute synchronously";
254+
do_warm_up();
242255
}
243-
st.to_thrift(&t_status);
244-
response.status = t_status;
256+
TStatus t_status;
257+
submit_st.to_thrift(&t_status);
258+
response.status = std::move(t_status);
245259
}
246260

247261
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,

be/src/cloud/cloud_internal_service.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
4747
"file_cache_get_by_peer_server_latency");
4848
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
4949
"file_cache_get_by_peer_read_cache_file_latency");
50+
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
51+
"cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
5052

5153
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
5254
: PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
9597
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
9698
return;
9799
}
98-
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
100+
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
101+
std::chrono::steady_clock::now().time_since_epoch())
102+
.count();
103+
std::ostringstream tablet_ids_stream;
104+
int count = 0;
105+
for (const auto& tablet_id : request->tablet_ids()) {
106+
tablet_ids_stream << tablet_id << ", ";
107+
count++;
108+
if (count >= 10) {
109+
break;
110+
}
111+
}
112+
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
113+
<< ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
99114
for (const auto& tablet_id : request->tablet_ids()) {
100115
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
101116
if (!res.has_value()) {
102117
LOG(ERROR) << "failed to get tablet: " << tablet_id
103118
<< " err msg: " << res.error().msg();
104-
return;
119+
continue;
105120
}
106121
CloudTabletSPtr tablet = std::move(res.value());
107122
auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
166181
}
167182
}
168183
}
169-
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
184+
auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
185+
std::chrono::steady_clock::now().time_since_epoch())
186+
.count();
187+
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
188+
LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
189+
<< end_ts - begin_ts << " us";
190+
VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
170191
<< ", response=" << response->DebugString();
171192
}
172193

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
238238
// check cluster id
239239
RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
240240

241-
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242-
.set_max_threads(config::sync_load_for_tablets_thread)
243-
.set_min_threads(config::sync_load_for_tablets_thread)
244-
.build(&_sync_load_for_tablets_thread_pool);
241+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242+
.set_max_threads(config::sync_load_for_tablets_thread)
243+
.set_min_threads(config::sync_load_for_tablets_thread)
244+
.build(&_sync_load_for_tablets_thread_pool),
245+
"fail to build SyncLoadForTabletsThreadPool");
246+
247+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
248+
.set_max_threads(config::warmup_cache_async_thread)
249+
.set_min_threads(config::warmup_cache_async_thread)
250+
.build(&_warmup_cache_async_thread_pool),
251+
"fail to build WarmupCacheAsyncThreadPool");
252+
253+
return Status::OK();
245254
}
246255

247256
void CloudStorageEngine::stop() {

be/src/cloud/cloud_storage_engine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
152152
return *_sync_load_for_tablets_thread_pool;
153153
}
154154

155+
ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
156+
155157
Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
156158

157159
Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
@@ -204,6 +206,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
204206
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
205207
std::unique_ptr<TabletHotspot> _tablet_hotspot;
206208
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
209+
std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
207210
std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
208211

209212
// FileSystem with latest shared storage info, new data will be written to this fs.

be/src/cloud/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
6969

7070
DEFINE_mInt32(sync_load_for_tablets_thread, "32");
7171

72+
DEFINE_Int32(warmup_cache_async_thread, "16");
73+
7274
DEFINE_mBool(enable_new_tablet_do_compaction, "true");
7375

7476
// Empty rowset compaction strategy configurations

be/src/cloud/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
113113
// the theads which sync the datas which loaded in other clusters
114114
DECLARE_mInt32(sync_load_for_tablets_thread);
115115

116+
DECLARE_Int32(warmup_cache_async_thread);
117+
116118
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
117119

118120
DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3372,16 +3372,13 @@ public static int metaServiceRpcRetryTimes() {
33723372
@ConfField(mutable = true, masterOnly = true)
33733373
public static double cloud_balance_tablet_percent_per_run = 0.05;
33743374

3375-
@ConfField(mutable = true, masterOnly = true)
3376-
public static int cloud_min_balance_tablet_num_per_run = 2;
3377-
3378-
@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
3379-
+ "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
3380-
+ "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
3381-
+ "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
3382-
+ "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
3383-
+ "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
3384-
+ "设置compute group维度的balance类型,compute group维度配置优先级更高",
3375+
@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
3376+
+ "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
3377+
+ "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
3378+
+ "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
3379+
+ "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE 拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
3380+
+ "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
3381+
+ "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
33853382
"Specify the scaling and warming methods for all Compute groups in a cloud mode. "
33863383
+ "without_warmup: Directly modify shard mapping, first read from S3,"
33873384
+ "fastest re-balance but largest fluctuation; "
@@ -3396,6 +3393,20 @@ public static int metaServiceRpcRetryTimes() {
33963393
options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
33973394
public static String cloud_warm_up_for_rebalance_type = "async_warmup";
33983395

3396+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
3397+
+ "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per host "
3398+
+ "when batching warm-up requests during cloud tablet rebalancing, default 10"})
3399+
public static int cloud_warm_up_batch_size = 10;
3400+
3401+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
3402+
+ "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds before a "
3403+
+ "pending warm-up batch is flushed, default 50ms"})
3404+
public static int cloud_warm_up_batch_flush_interval_ms = 50;
3405+
3406+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡预热rpc异步线程池大小,默认4",
3407+
"Thread pool size for asynchronous warm-up RPC dispatch during cloud tablet rebalancing, default 4"})
3408+
public static int cloud_warm_up_rpc_async_pool_size = 4;
3409+
33993410
@ConfField(mutable = true, masterOnly = false)
34003411
public static String security_checker_class_name = "";
34013412

0 commit comments

Comments
 (0)