Skip to content

Commit 5b7acb0

Browse files
authored
[opt](cloud) Exposes cloud balance metrics (apache#57200)
Exposes cloud-balance-related metrics that show whether a compute group is currently performing balance scheduling. When all `*_balance_num` metrics are 0, the compute group is considered to be in a balanced state.

Note: These metrics are valid only when queried from the FE master, because balance scheduling is performed on the FE master.

```
curl "http://175.42.1.1:8030/metrics" | rg '_balance_num'
# HELP doris_fe_cloud_table_balance_num current cluster cloud table balance sync edit log number
# TYPE doris_fe_cloud_table_balance_num counter
doris_fe_cloud_table_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 5
doris_fe_cloud_table_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0
# HELP doris_fe_cloud_partition_balance_num current cluster cloud partition balance sync edit log number
# TYPE doris_fe_cloud_partition_balance_num counter
doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0
doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0
# HELP doris_fe_cloud_smooth_upgrade_balance_num current cluster cloud smooth upgrade sync edit log number
# TYPE doris_fe_cloud_smooth_upgrade_balance_num counter
doris_fe_cloud_smooth_upgrade_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0
doris_fe_cloud_smooth_upgrade_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0
# HELP doris_fe_cloud_global_balance_num current cluster cloud be balance sync edit log number
# TYPE doris_fe_cloud_global_balance_num counter
doris_fe_cloud_global_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0
doris_fe_cloud_global_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0
# HELP doris_fe_cloud_warm_up_balance_num current cluster cloud warm up cache sync edit log number
# TYPE doris_fe_cloud_warm_up_balance_num counter
doris_fe_cloud_warm_up_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0
doris_fe_cloud_warm_up_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0
```
1 parent 8a6f0f9 commit 5b7acb0

File tree

4 files changed

+230
-6
lines changed

4 files changed

+230
-6
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.doris.common.UserException;
3838
import org.apache.doris.common.util.DebugPointUtil;
3939
import org.apache.doris.common.util.MasterDaemon;
40+
import org.apache.doris.metric.MetricRepo;
4041
import org.apache.doris.rpc.RpcException;
4142
import org.apache.doris.system.Backend;
4243
import org.apache.doris.thrift.BackendService;
@@ -48,6 +49,7 @@
4849
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
4950

5051
import com.google.common.base.Preconditions;
52+
import com.google.common.base.Strings;
5153
import com.google.common.collect.Sets;
5254
import lombok.Getter;
5355
import org.apache.logging.log4j.LogManager;
@@ -125,6 +127,14 @@ public enum BalanceType {
125127
PARTITION
126128
}
127129

130+
/**
 * Identifies which rebalancer pass is reporting a batch of replica-info edit logs.
 *
 * <p>The value is passed to {@code batchUpdateCloudReplicaInfoEditlogs(infos, type)} and forwarded to
 * {@code MetricRepo.updateClusterCloudBalanceNum}, which uses it to pick the per-cluster
 * {@code cloud_*_balance_num} metric to update.
 */
public enum StatType {
131+
// Passed by globalBalance(); maps to the "cloud_global_balance_num" metric.
GLOBAL,
132+
// Passed by balanceAllTables(); maps to the "cloud_table_balance_num" metric.
TABLE,
133+
// Passed by balanceAllPartitions(); maps to the "cloud_partition_balance_num" metric.
PARTITION,
134+
// Passed by migrateTablets(); maps to the "cloud_smooth_upgrade_balance_num" metric.
SMOOTH_UPGRADE,
135+
// Passed by checkInflightWarmUpCacheAsync(); maps to the "cloud_warm_up_balance_num" metric.
WARM_UP_CACHE
136+
}
137+
128138
@Getter
129139
private class InfightTablet {
130140
private final Long tabletId;
@@ -320,7 +330,7 @@ public void balanceAllPartitions() {
320330
balanceInPartition(entry.getValue(), entry.getKey(), infos);
321331
}
322332
long oldSize = infos.size();
323-
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
333+
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.PARTITION);
324334
LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size());
325335
try {
326336
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
@@ -356,7 +366,7 @@ public void balanceAllTables() {
356366
balanceInTable(entry.getValue(), entry.getKey(), infos);
357367
}
358368
long oldSize = infos.size();
359-
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
369+
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.TABLE);
360370
LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size());
361371
try {
362372
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
@@ -391,7 +401,7 @@ public void globalBalance() {
391401
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
392402
}
393403
long oldSize = infos.size();
394-
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
404+
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.GLOBAL);
395405
LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
396406
try {
397407
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
@@ -472,7 +482,7 @@ public void checkInflightWarmUpCacheAsync() {
472482
}
473483
}
474484
long oldSize = infos.size();
475-
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
485+
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.WARM_UP_CACHE);
476486
LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
477487
try {
478488
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
@@ -1221,7 +1231,7 @@ private void migrateTablets(Long srcBe, Long dstBe) {
12211231
}
12221232
}
12231233
long oldSize = infos.size();
1224-
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
1234+
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.SMOOTH_UPGRADE);
12251235
LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
12261236
try {
12271237
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
@@ -1239,16 +1249,24 @@ private void migrateTablets(Long srcBe, Long dstBe) {
12391249
}
12401250
}
12411251

1242-
private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
1252+
private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos,
1253+
StatType type) {
12431254
long start = System.currentTimeMillis();
12441255
List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
12451256
// clusterId, infos
12461257
Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
12471258
.collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
1259+
Set<String> notBalancedClusterIds = new HashSet<>(this.clusterToBes.keySet());
12481260
for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
12491261
// same cluster
12501262
String clusterId = entry.getKey();
1263+
notBalancedClusterIds.remove(clusterId);
12511264
List<UpdateCloudReplicaInfo> infoList = entry.getValue();
1265+
String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
1266+
.getClusterNameByClusterId(clusterId);
1267+
if (!Strings.isNullOrEmpty(clusterName)) {
1268+
MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, infoList.size());
1269+
}
12521270
Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
12531271
.collect(Collectors.groupingBy(
12541272
info -> info.getDbId()
@@ -1291,6 +1309,15 @@ private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<Up
12911309
rets.add(newInfo);
12921310
});
12931311
}
1312+
1313+
for (String clusterId : notBalancedClusterIds) {
1314+
String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
1315+
.getClusterNameByClusterId(clusterId);
1316+
if (!Strings.isNullOrEmpty(clusterName)) {
1317+
MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, 0);
1318+
}
1319+
}
1320+
12941321
if (LOG.isDebugEnabled()) {
12951322
LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
12961323
infos.size(), rets.size(), System.currentTimeMillis() - start);

fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ public class CloudMetrics {
4545
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_WARM_UP_JOB_LATEST_START_TIME;
4646
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_WARM_UP_JOB_LAST_FINISH_TIME;
4747

48+
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_PARTITION_BALANCE_NUM;
49+
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_TABLE_BALANCE_NUM;
50+
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_GLOBAL_BALANCE_NUM;
51+
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM;
52+
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM;
53+
4854
protected static void init() {
4955
if (Config.isNotCloudMode()) {
5056
return;
@@ -98,5 +104,25 @@ protected static void init() {
98104
CLUSTER_WARM_UP_JOB_FINISHED_TABLETS = new AutoMappedMetric<>(
99105
name -> new LongCounterMetric("file_cache_warm_up_job_finished_tablets",
100106
MetricUnit.NOUNIT, "warm up job finished tablets"));
107+
108+
CLUSTER_CLOUD_PARTITION_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric(
109+
"cloud_partition_balance_num", MetricUnit.NOUNIT,
110+
"current cluster cloud partition balance sync edit log number"));
111+
112+
CLUSTER_CLOUD_TABLE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric(
113+
"cloud_table_balance_num", MetricUnit.NOUNIT,
114+
"current cluster cloud table balance sync edit log number"));
115+
116+
CLUSTER_CLOUD_GLOBAL_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric(
117+
"cloud_global_balance_num", MetricUnit.NOUNIT,
118+
"current cluster cloud be balance sync edit log number"));
119+
120+
CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric(
121+
"cloud_smooth_upgrade_balance_num", MetricUnit.NOUNIT,
122+
"current cluster cloud smooth upgrade sync edit log number"));
123+
124+
CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric(
125+
"cloud_warm_up_balance_num", MetricUnit.NOUNIT,
126+
"current cluster cloud warm up cache sync edit log number"));
101127
}
102128
}

fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.doris.alter.AlterJobV2.JobType;
2222
import org.apache.doris.catalog.Database;
2323
import org.apache.doris.catalog.Env;
24+
import org.apache.doris.cloud.catalog.CloudTabletRebalancer;
2425
import org.apache.doris.cloud.system.CloudSystemInfoService;
2526
import org.apache.doris.common.Config;
2627
import org.apache.doris.common.InternalErrorCode;
@@ -1319,6 +1320,32 @@ public static void registerCloudMetrics(String clusterId, String clusterName) {
13191320

13201321
String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName;
13211322
CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key);
1323+
1324+
LongCounterMetric clusterCloudPartitionBalanceNum =
1325+
CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId);
1326+
clusterCloudPartitionBalanceNum.setLabels(labels);
1327+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudPartitionBalanceNum);
1328+
1329+
LongCounterMetric clusterCloudTableBalanceNum =
1330+
CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId);
1331+
clusterCloudTableBalanceNum.setLabels(labels);
1332+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudTableBalanceNum);
1333+
1334+
LongCounterMetric clusterCloudGlobalBalanceNum =
1335+
CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId);
1336+
clusterCloudGlobalBalanceNum.setLabels(labels);
1337+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudGlobalBalanceNum);
1338+
1339+
LongCounterMetric clusterCloudSmoothUpgradeBalanceNum =
1340+
CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId);
1341+
clusterCloudSmoothUpgradeBalanceNum.setLabels(labels);
1342+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudSmoothUpgradeBalanceNum);
1343+
1344+
LongCounterMetric clusterCloudWarmUpBalanceNum =
1345+
CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId);
1346+
clusterCloudWarmUpBalanceNum.setLabels(labels);
1347+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudWarmUpBalanceNum);
1348+
13221349
}
13231350

13241351
public static void increaseClusterRequestAll(String clusterName) {
@@ -1532,4 +1559,38 @@ public static void updateClusterQueryLatency(String clusterName, long elapseMs)
15321559
String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName;
15331560
CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key).update(elapseMs);
15341561
}
1562+
1563+
public static void updateClusterCloudBalanceNum(String clusterName, String clusterId,
1564+
CloudTabletRebalancer.StatType type, long num) {
1565+
if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)
1566+
|| Strings.isNullOrEmpty(clusterId)) {
1567+
return;
1568+
}
1569+
LongCounterMetric counter = null;
1570+
switch (type) {
1571+
case PARTITION:
1572+
counter = CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId);
1573+
break;
1574+
case TABLE:
1575+
counter = CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId);
1576+
break;
1577+
case GLOBAL:
1578+
counter = CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId);
1579+
break;
1580+
case SMOOTH_UPGRADE:
1581+
counter = CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId);
1582+
break;
1583+
case WARM_UP_CACHE:
1584+
counter = CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId);
1585+
break;
1586+
default:
1587+
return;
1588+
}
1589+
List<MetricLabel> labels = new ArrayList<>();
1590+
counter.update(num);
1591+
labels.add(new MetricLabel("cluster_id", clusterId));
1592+
labels.add(new MetricLabel("cluster_name", clusterName));
1593+
counter.setLabels(labels);
1594+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
1595+
}
15351596
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
20+
21+
// Regression suite for the per-cluster cloud balance metrics served by the FE /metrics endpoint.
// It checks that the partition balance metric becomes non-zero while tablets are being rebalanced
// after a backend is added, and reads 0 again once balancing has finished.
suite('test_balance_metrics', 'docker') {
    if (!isCloudMode()) {
        return;
    }
    def options = new ClusterOptions()
    options.feConfigs += [
        'cloud_cluster_check_interval_second=1',
        'cloud_tablet_rebalancer_interval_second=2',
        'sys_log_verbose_modules=org',
        'heartbeat_interval_second=1',
        'rehash_tablet_after_be_dead_seconds=3600',
        'enable_cloud_warm_up_for_rebalance=false'
    ]
    options.beConfigs += [
        'report_tablet_interval_seconds=1',
        'schedule_sync_tablets_interval_s=18000',
        'disable_auto_compaction=true',
        'sys_log_verbose_modules=*'
    ]
    options.setFeNum(1)
    options.setBeNum(1)
    options.cloudMode = true
    options.enableDebugPoints()

    // Fetches http://ip:port/metrics and returns the integer value that follows the exact metric
    // selector `name` (metric name plus label set). Throws if the selector is not present.
    def getFEMetrics = {ip, port, name ->
        def url = "http://${ip}:${port}/metrics"
        logger.info("getFEMetrics1, url: ${url}, name: ${name}")
        def metrics = new URL(url).text
        // Quote the selector so the braces and quotes in the label set are matched literally.
        def pattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name) + "\\s+(\\d+)")
        def matcher = pattern.matcher(metrics)
        if (matcher.find()) {
            def ret = matcher[0][1] as long
            logger.info("getFEMetrics2, ${url}, name:${name}, value:${ret}")
            return ret
        } else {
            throw new RuntimeException("${name} not found for ${ip}:${port}")
        }
    }

    def testCase = { table ->
        def master = cluster.getMasterFe()
        // Selector of the partition balance metric for the default compute cluster.
        def partitionBalanceMetric = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}"""
        def allEditlogNum = 0;
        // Poll in the background until the metric has been non-zero at least once
        // (allEditlogNum > 0) and has dropped back to 0.
        def future = thread {
            awaitUntil(300) {
                def value = getFEMetrics(master.host, master.httpPort, partitionBalanceMetric)
                allEditlogNum += value
                logger.info("balance metrics value: ${value}, allEditlogNum: ${allEditlogNum}")
                return value == 0 && allEditlogNum > 0
            }
        }
        sql """CREATE TABLE $table (
            `k1` int(11) NULL,
            `v1` VARCHAR(2048)
            )
            DUPLICATE KEY(`k1`)
            COMMENT 'OLAP'
            DISTRIBUTED BY HASH(`k1`) BUCKETS 200
            PROPERTIES (
            "replication_num"="1"
            );
        """
        // generate some balance tasks
        cluster.addBackend(1)
        // wait for rebalancer to do its job
        future.get()
        assertTrue(allEditlogNum > 0, "balance metrics not increased")

        // Sample the metric for a while after balancing has finished; every sample must be 0.
        allEditlogNum = 0
        for (i in 0..30) {
            sleep(1000)
            def value = getFEMetrics(master.host, master.httpPort, partitionBalanceMetric)
            allEditlogNum += value
            logger.info("Final balance metrics value: ${value}, allEditlogNum: ${allEditlogNum}")
        }
        // after all balance tasks done, the metric should not increase
        assertTrue(allEditlogNum == 0, "balance metrics should stay 0 after all balance tasks are done")

        // A newly added compute cluster must expose its own metric series; getFEMetrics throws if
        // the series is missing.
        cluster.addBackend(1, "other_cluster")
        sleep(5000)
        def name = """doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"}"""
        def value = getFEMetrics(master.host, master.httpPort, name)
        logger.info("other cluster balance metrics value: ${value}")
    }

    docker(options) {
        testCase("test_balance_metrics_tbl")
    }
}

0 commit comments

Comments
 (0)