Skip to content

Commit 38860a8

Browse files
deardeng authored and Your Name committed
[feature](cloud) Support balance sync warm up (#56164)
1. Support a new balance type in cloud mode (sync warm-up): when balancing, the FE modifies the tablet-to-BE mapping only after all file caches on the old BE have been migrated to the new BE. 2. Fix the destination BE not syncing rowsets in time: when the source BE has new rowset meta that the destination BE has not yet synced, download_file_cache_block returns an error.
1 parent d8663df commit 38860a8

File tree

25 files changed

+2116
-33
lines changed

25 files changed

+2116
-33
lines changed

be/src/io/cache/block_file_cache_downloader.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include <memory>
2929
#include <mutex>
30+
#include <unordered_set>
3031
#include <variant>
3132

3233
#include "cloud/cloud_tablet_mgr.h"
@@ -171,6 +172,7 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
171172

172173
void FileCacheBlockDownloader::download_file_cache_block(
173174
const DownloadTask::FileCacheBlockMetaVec& metas) {
175+
std::unordered_set<int64_t> synced_tablets;
174176
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
175177
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
176178
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
@@ -183,12 +185,20 @@ void FileCacheBlockDownloader::download_file_cache_block(
183185
} else {
184186
tablet = std::move(res).value();
185187
}
186-
188+
if (!synced_tablets.contains(meta.tablet_id())) {
189+
auto st = tablet->sync_rowsets();
190+
if (!st) {
191+
// just log failed, try it best
192+
LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
193+
<< " err msg: " << st.to_string();
194+
}
195+
synced_tablets.insert(meta.tablet_id());
196+
}
187197
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
188198
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
189199
if (find_it == id_to_rowset_meta_map.end()) {
190200
LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
191-
<< "rowset_id not found, rowset_id=" << meta.rowset_id();
201+
<< " rowset_id not found, rowset_id=" << meta.rowset_id();
192202
return;
193203
}
194204

cloud/src/meta-service/meta_service_resource.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo
26072607
});
26082608
}
26092609

2610-
void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInfo& cluster,
2610+
void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster,
26112611
std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
26122612
MetaServiceCode& code) {
26132613
msg = resource_mgr->update_cluster(
@@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInf
26942694
});
26952695
}
26962696

2697+
void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster,
2698+
std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
2699+
MetaServiceCode& code) {
2700+
msg = resource_mgr->update_cluster(
2701+
instance_id, cluster,
2702+
[&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
2703+
[&](ClusterPB& c, std::vector<ClusterPB>&) {
2704+
std::string msg;
2705+
std::stringstream ss;
2706+
if (ClusterPB::COMPUTE != c.type()) {
2707+
code = MetaServiceCode::INVALID_ARGUMENT;
2708+
ss << "just support set COMPUTE cluster status";
2709+
msg = ss.str();
2710+
return msg;
2711+
}
2712+
*c.mutable_properties() = cluster.cluster.properties();
2713+
return msg;
2714+
});
2715+
}
2716+
26972717
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
26982718
const AlterClusterRequest* request,
26992719
AlterClusterResponse* response,
@@ -2778,7 +2798,10 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
27782798
handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code);
27792799
break;
27802800
case AlterClusterRequest::ALTER_VCLUSTER_INFO:
2781-
handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, code);
2801+
handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code);
2802+
break;
2803+
case AlterClusterRequest::ALTER_PROPERTIES:
2804+
handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code);
27822805
break;
27832806
default:
27842807
code = MetaServiceCode::INVALID_ARGUMENT;

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3333,8 +3333,26 @@ public static int metaServiceRpcRetryTimes() {
33333333
@ConfField(mutable = true, masterOnly = true)
33343334
public static int cloud_min_balance_tablet_num_per_run = 2;
33353335

3336-
@ConfField(mutable = true, masterOnly = true)
3337-
public static boolean enable_cloud_warm_up_for_rebalance = true;
3336+
@ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
3337+
+ "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
3338+
+ "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
3339+
+ "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
3340+
+ "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
3341+
+ "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
3342+
+ "设置compute group维度的balance类型,compute group维度配置优先级更高",
3343+
"Specify the scaling and warming methods for all Compute groups in a cloud mode. "
3344+
+ "without_warmup: Directly modify shard mapping, first read from S3,"
3345+
+ "fastest re-balance but largest fluctuation; "
3346+
+ "async_warmup: Asynchronous warmup, best-effort cache pulling, "
3347+
+ "faster re-balance but possible cache miss; "
3348+
+ "sync_warmup: Synchronous warmup, ensure cache migration completion, "
3349+
+ "slower re-balance but no cache miss; "
3350+
+ "peer_read_async_warmup: Directly modify shard mapping, first read from Peer BE, "
3351+
+ "fastest re-balance but may affect other BEs in the same compute group performance. "
3352+
+ "Note: This is a global FE configuration, you can also use SQL (ALTER COMPUTE GROUP cg PROPERTIES) "
3353+
+ "to set balance type at compute group level, compute group level configuration has higher priority"},
3354+
options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
3355+
public static String cloud_warm_up_for_rebalance_type = "async_warmup";
33383356

33393357
@ConfField(mutable = true, masterOnly = false)
33403358
public static String security_checker_class_name = "";

fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ supportedAlterStatement
264264
| ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault
265265
| ALTER WORKLOAD GROUP name=identifierOrText (FOR computeGroup=identifierOrText)?
266266
properties=propertyClause? #alterWorkloadGroup
267+
| ALTER COMPUTE GROUP name=identifierOrText
268+
properties=propertyClause? #alterComputeGroup
267269
| ALTER CATALOG name=identifier SET PROPERTIES
268270
LEFT_PAREN propertyItemList RIGHT_PAREN #alterCatalogProperties
269271
| ALTER WORKLOAD POLICY name=identifierOrText
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.cloud.catalog;
19+
20+
import org.apache.doris.common.Config;
21+
22+
import lombok.Getter;
23+
24+
/**
25+
* Enum for balance type options
26+
*/
27+
@Getter
28+
public enum BalanceTypeEnum {
29+
WITHOUT_WARMUP("without_warmup"),
30+
ASYNC_WARMUP("async_warmup"),
31+
SYNC_WARMUP("sync_warmup"),
32+
PEER_READ_ASYNC_WARMUP("peer_read_async_warmup");
33+
34+
private final String value;
35+
36+
BalanceTypeEnum(String value) {
37+
this.value = value;
38+
}
39+
40+
/**
41+
* Parse string value to enum, case-insensitive
42+
*/
43+
public static BalanceTypeEnum fromString(String value) {
44+
if (value == null) {
45+
return null;
46+
}
47+
for (BalanceTypeEnum type : BalanceTypeEnum.values()) {
48+
if (type.value.equalsIgnoreCase(value)) {
49+
return type;
50+
}
51+
}
52+
return null;
53+
}
54+
55+
/**
56+
* Check if the given string is a valid balance type
57+
*/
58+
public static boolean isValid(String value) {
59+
return fromString(value) != null;
60+
}
61+
62+
/**
63+
* Get the balance type enum from the configuration string
64+
*/
65+
public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() {
66+
return fromString(Config.cloud_warm_up_for_rebalance_type) == null
67+
? ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM : fromString(Config.cloud_warm_up_for_rebalance_type);
68+
}
69+
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,63 @@ private void processVirtualClusters(List<Cloud.ClusterPB> clusters) {
9999
List<Cloud.ClusterPB> virtualClusters = new ArrayList<>();
100100
List<Cloud.ClusterPB> computeClusters = new ArrayList<>();
101101
categorizeClusters(clusters, virtualClusters, computeClusters);
102+
handleComputeClusters(computeClusters);
102103
handleVirtualClusters(virtualClusters, computeClusters);
103104
removeObsoleteVirtualGroups(virtualClusters);
104105
}
105106

107+
private void handleComputeClusters(List<Cloud.ClusterPB> computeClusters) {
108+
for (Cloud.ClusterPB computeClusterInMs : computeClusters) {
109+
ComputeGroup computeGroupInFe = cloudSystemInfoService
110+
.getComputeGroupById(computeClusterInMs.getClusterId());
111+
if (computeGroupInFe == null) {
112+
// cluster checker will sync it
113+
LOG.info("found compute cluster {} in ms, but not in fe mem, "
114+
+ "it may be wait cluster checker to sync, ignore it",
115+
computeClusterInMs);
116+
} else {
117+
// exist compute group, check properties changed and update if needed
118+
updatePropertiesIfChanged(computeGroupInFe, computeClusterInMs);
119+
}
120+
}
121+
}
122+
123+
/**
124+
* Compare properties between compute cluster in MS and compute group in FE,
125+
* update only the changed key-value pairs to avoid unnecessary updates.
126+
*/
127+
private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, Cloud.ClusterPB computeClusterInMs) {
128+
Map<String, String> propertiesInMs = computeClusterInMs.getPropertiesMap();
129+
Map<String, String> propertiesInFe = computeGroupInFe.getProperties();
130+
131+
if (propertiesInMs == null || propertiesInMs.isEmpty()) {
132+
return;
133+
}
134+
Map<String, String> changedProperties = new HashMap<>();
135+
136+
// Check for changed or new properties
137+
for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
138+
String key = entry.getKey();
139+
String valueInMs = entry.getValue();
140+
String valueInFe = propertiesInFe.get(key);
141+
142+
if (valueInFe != null && valueInFe.equalsIgnoreCase(valueInMs)) {
143+
continue;
144+
}
145+
changedProperties.put(key, valueInMs);
146+
147+
LOG.debug("Property changed for compute group {}: {} = {} (was: {})",
148+
computeGroupInFe.getName(), key, valueInMs, valueInFe);
149+
}
150+
151+
// Only update if there are actual changes
152+
if (!changedProperties.isEmpty()) {
153+
LOG.info("Updating properties for compute group {}: {}",
154+
computeGroupInFe.getName(), changedProperties);
155+
computeGroupInFe.setProperties(changedProperties);
156+
}
157+
}
158+
106159
private void categorizeClusters(List<Cloud.ClusterPB> clusters,
107160
List<Cloud.ClusterPB> virtualClusters, List<Cloud.ClusterPB> computeClusters) {
108161
for (Cloud.ClusterPB cluster : clusters) {

0 commit comments

Comments
 (0)