Skip to content

Commit dbc4805

Browse files
authored
[fix](cloud) Fix cloud warm up balance slow scheduling (#58962)
Currently, when performing tablet warm-up balancing in the cloud, the sequential execution of a single warm-up task leads to a series of problems, such as: 1. When scaling up a computer group to include beta nodes, with a large number of tables (millions of tablets), actual tests showed that scaling from 1 beta node to 10 beta nodes took more than 6 hours to reach a balanced state. Each warm-up task RPC took about 30ms. This means that even if a new node can handle the load, scaling up a new node in the cloud can still take up to 6 hours in the worst case. 2. Due to the same logic, decomission be is also relatively slow. Fixes: 1. Batch and pipeline warm-up tasks. Each batch can contain multiple warm-up tasks with the same source and destination (each task represents migrating one tablet). 2. Separate the warm-up task finish thread to prevent scheduling logic from affecting the logic that modifies tablet-to-tablet mappings. 3. Asynchronously fetch file cache meta in the warm_up_cache_async logic and add some bvars. Post-fix testing showed that in a scenario with 10 databases, 10,000 tables, 100,000 partitions, and 1 million tablets, the number of be nodes increased from 3 to 10 within 10 minutes.
1 parent ae2ba6f commit dbc4805

File tree

10 files changed

+478
-150
lines changed

10 files changed

+478
-150
lines changed

be/src/cloud/cloud_backend_service.cpp

Lines changed: 79 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ namespace doris {
3939

4040
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
4141
"file_cache_warm_up_cache_async_submitted_segment_num");
42+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
43+
"file_cache_warm_up_cache_async_submitted_task_num");
44+
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
45+
"file_cache_warm_up_cache_async_submitted_tablet_num");
4246

4347
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
4448
: BaseBackendService(exec_env), _engine(engine) {}
@@ -170,83 +174,89 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
170174

171175
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
172176
const TWarmUpCacheAsyncRequest& request) {
173-
std::ostringstream oss;
174-
oss << "[";
175-
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
176-
if (i > 0) oss << ",";
177-
oss << request.tablet_ids[i];
178-
}
179-
oss << "]";
180-
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
181-
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();
177+
// just submit the task to the thread pool, no need to wait for the result
178+
auto do_warm_up = [this, request]() {
179+
std::ostringstream oss;
180+
oss << "[";
181+
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
182+
if (i > 0) oss << ",";
183+
oss << request.tablet_ids[i];
184+
}
185+
oss << "]";
186+
g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
187+
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
188+
<< request.brpc_port << ", tablets num=" << request.tablet_ids.size()
189+
<< ", tablet_ids=" << oss.str();
182190

183-
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
184-
// Record each tablet in manager
185-
for (int64_t tablet_id : request.tablet_ids) {
186-
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
187-
}
191+
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
192+
// Record each tablet in manager
193+
for (int64_t tablet_id : request.tablet_ids) {
194+
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
195+
}
188196

189-
std::string host = request.host;
190-
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
191-
if (dns_cache == nullptr) {
192-
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
193-
} else if (!is_valid_ip(request.host)) {
194-
Status status = dns_cache->get(request.host, &host);
195-
if (!status.ok()) {
196-
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
197-
<< status.to_string();
198-
// Remove failed tablets from tracking
199-
manager.remove_balanced_tablets(request.tablet_ids);
197+
std::string host = request.host;
198+
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
199+
if (dns_cache == nullptr) {
200+
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
201+
} else if (!is_valid_ip(request.host)) {
202+
Status status = dns_cache->get(request.host, &host);
203+
if (!status.ok()) {
204+
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
205+
<< status.to_string();
206+
return;
207+
}
208+
}
209+
std::string brpc_addr = get_host_port(host, request.brpc_port);
210+
std::shared_ptr<PBackendService_Stub> brpc_stub =
211+
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
212+
if (!brpc_stub) {
213+
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
200214
return;
201215
}
202-
}
203-
std::string brpc_addr = get_host_port(host, request.brpc_port);
204-
Status st = Status::OK();
205-
TStatus t_status;
206-
std::shared_ptr<PBackendService_Stub> brpc_stub =
207-
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
208-
if (!brpc_stub) {
209-
st = Status::RpcError("Address {} is wrong", brpc_addr);
210-
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
211-
// Remove failed tablets from tracking
212-
manager.remove_balanced_tablets(request.tablet_ids);
213-
return;
214-
}
215-
brpc::Controller cntl;
216-
PGetFileCacheMetaRequest brpc_request;
217-
std::stringstream ss;
218-
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) {
219-
brpc_request.add_tablet_ids(tablet_id);
220-
ss << tablet_id << ",";
221-
});
222-
VLOG_DEBUG << "tablets set: " << ss.str() << " stack: " << get_stack_trace();
223-
PGetFileCacheMetaResponse brpc_response;
216+
PGetFileCacheMetaRequest brpc_request;
217+
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
218+
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
224219

225-
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
226-
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
227-
<< ", response=" << brpc_response.DebugString();
228-
if (!cntl.Failed()) {
229-
g_file_cache_warm_up_cache_async_submitted_segment_num
230-
<< brpc_response.file_cache_block_metas().size();
231-
auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
232-
if (!file_cache_block_metas.empty()) {
220+
auto run_rpc = [this, brpc_stub,
221+
brpc_addr](PGetFileCacheMetaRequest request_copy) -> Status {
222+
brpc::Controller cntl;
223+
cntl.set_timeout_ms(20 * 1000); // 20s
224+
PGetFileCacheMetaResponse brpc_response;
225+
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, &brpc_response,
226+
nullptr);
227+
if (cntl.Failed()) {
228+
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
229+
<< ", error=" << cntl.ErrorText()
230+
<< ", error code=" << cntl.ErrorCode();
231+
return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
232+
cntl.ErrorCode());
233+
}
234+
VLOG_DEBUG << "warm_up_cache_async: request=" << request_copy.DebugString()
235+
<< ", response=" << brpc_response.DebugString();
236+
g_file_cache_warm_up_cache_async_submitted_segment_num
237+
<< brpc_response.file_cache_block_metas().size();
233238
_engine.file_cache_block_downloader().submit_download_task(
234-
std::move(file_cache_block_metas));
235-
LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
236-
<< oss.str();
237-
} else {
238-
LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
239-
manager.remove_balanced_tablets(request.tablet_ids);
239+
std::move(*brpc_response.mutable_file_cache_block_metas()));
240+
return Status::OK();
241+
};
242+
243+
Status rpc_status = run_rpc(std::move(brpc_request));
244+
if (!rpc_status.ok()) {
245+
LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
246+
<< ", status=" << rpc_status;
240247
}
241-
} else {
242-
st = Status::RpcError("{} isn't connected", brpc_addr);
243-
// Remove failed tablets from tracking
244-
manager.remove_balanced_tablets(request.tablet_ids);
245-
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
246-
<< ", error=" << cntl.ErrorText();
248+
};
249+
g_file_cache_warm_up_cache_async_submitted_task_num << 1;
250+
Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
251+
if (!submit_st.ok()) {
252+
LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
253+
"warmup_cache_async_thread_pool, status="
254+
<< submit_st.to_string() << ", execute synchronously";
255+
do_warm_up();
247256
}
248-
st.to_thrift(&t_status);
249-
response.status = t_status;
257+
TStatus t_status;
258+
submit_st.to_thrift(&t_status);
259+
response.status = std::move(t_status);
250260
}
251261

252262
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,

be/src/cloud/cloud_internal_service.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
4747
"file_cache_get_by_peer_server_latency");
4848
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
4949
"file_cache_get_by_peer_read_cache_file_latency");
50+
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
51+
"cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
5052

5153
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
5254
: PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
9597
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
9698
return;
9799
}
98-
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
100+
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
101+
std::chrono::steady_clock::now().time_since_epoch())
102+
.count();
103+
std::ostringstream tablet_ids_stream;
104+
int count = 0;
105+
for (const auto& tablet_id : request->tablet_ids()) {
106+
tablet_ids_stream << tablet_id << ", ";
107+
count++;
108+
if (count >= 10) {
109+
break;
110+
}
111+
}
112+
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
113+
<< ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
99114
for (const auto& tablet_id : request->tablet_ids()) {
100115
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
101116
if (!res.has_value()) {
102117
LOG(ERROR) << "failed to get tablet: " << tablet_id
103118
<< " err msg: " << res.error().msg();
104-
return;
119+
continue;
105120
}
106121
CloudTabletSPtr tablet = std::move(res.value());
107122
auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
166181
}
167182
}
168183
}
169-
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
184+
auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
185+
std::chrono::steady_clock::now().time_since_epoch())
186+
.count();
187+
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
188+
LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
189+
<< end_ts - begin_ts << " us";
190+
VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
170191
<< ", response=" << response->DebugString();
171192
}
172193

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
238238
// check cluster id
239239
RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
240240

241-
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242-
.set_max_threads(config::sync_load_for_tablets_thread)
243-
.set_min_threads(config::sync_load_for_tablets_thread)
244-
.build(&_sync_load_for_tablets_thread_pool);
241+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
242+
.set_max_threads(config::sync_load_for_tablets_thread)
243+
.set_min_threads(config::sync_load_for_tablets_thread)
244+
.build(&_sync_load_for_tablets_thread_pool),
245+
"fail to build SyncLoadForTabletsThreadPool");
246+
247+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
248+
.set_max_threads(config::warmup_cache_async_thread)
249+
.set_min_threads(config::warmup_cache_async_thread)
250+
.build(&_warmup_cache_async_thread_pool),
251+
"fail to build WarmupCacheAsyncThreadPool");
252+
253+
return Status::OK();
245254
}
246255

247256
void CloudStorageEngine::stop() {

be/src/cloud/cloud_storage_engine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
169169
return *_sync_load_for_tablets_thread_pool;
170170
}
171171

172+
ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
173+
172174
Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
173175

174176
Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
@@ -221,6 +223,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
221223
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
222224
std::unique_ptr<TabletHotspot> _tablet_hotspot;
223225
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
226+
std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
224227
std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
225228

226229
// FileSystem with latest shared storage info, new data will be written to this fs.

be/src/cloud/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
7070

7171
DEFINE_mInt32(sync_load_for_tablets_thread, "32");
7272

73+
DEFINE_Int32(warmup_cache_async_thread, "16");
74+
7375
DEFINE_mBool(enable_new_tablet_do_compaction, "true");
7476

7577
// Empty rowset compaction strategy configurations

be/src/cloud/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
114114
// the theads which sync the datas which loaded in other clusters
115115
DECLARE_mInt32(sync_load_for_tablets_thread);
116116

117+
DECLARE_Int32(warmup_cache_async_thread);
118+
117119
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
118120

119121
DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3413,9 +3413,6 @@ public static int metaServiceRpcRetryTimes() {
34133413
@ConfField(mutable = true, masterOnly = true)
34143414
public static double cloud_balance_tablet_percent_per_run = 0.05;
34153415

3416-
@ConfField(mutable = true, masterOnly = true)
3417-
public static int cloud_min_balance_tablet_num_per_run = 2;
3418-
34193416
@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
34203417
+ "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
34213418
+ "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
@@ -3437,6 +3434,20 @@ public static int metaServiceRpcRetryTimes() {
34373434
options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
34383435
public static String cloud_warm_up_for_rebalance_type = "async_warmup";
34393436

3437+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
3438+
+ "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per host "
3439+
+ "when batching warm-up requests during cloud tablet rebalancing, default 10"})
3440+
public static int cloud_warm_up_batch_size = 10;
3441+
3442+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
3443+
+ "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds before a "
3444+
+ "pending warm-up batch is flushed, default 50ms"})
3445+
public static int cloud_warm_up_batch_flush_interval_ms = 50;
3446+
3447+
@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡预热rpc异步线程池大小,默认4",
3448+
"Thread pool size for asynchronous warm-up RPC dispatch during cloud tablet rebalancing, default 4"})
3449+
public static int cloud_warm_up_rpc_async_pool_size = 4;
3450+
34403451
@ConfField(mutable = true, masterOnly = false)
34413452
public static String security_checker_class_name = "";
34423453

0 commit comments

Comments
 (0)