Skip to content

Commit ce4815b

Browse files
branch-4.0: [fix](cloud) Fix queries not being retried after BE restart #59566 (#59618)
Cherry-picked from #59566 Co-authored-by: deardeng <[email protected]>
1 parent d1ebe95 commit ce4815b

File tree

3 files changed

+125
-5
lines changed

3 files changed

+125
-5
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,12 @@ public void execute(TUniqueId queryId) throws Exception {
543543
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
544544
MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
545545
}
546+
// COMPUTE_GROUPS_NO_ALIVE_BE: the planner could not find an alive BE, so the query needs to be retried
547+
if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(e.getMessage())) {
548+
LOG.debug("planner failed with cloud compute group error, need retry. {}",
549+
context.getQueryIdentifier(), e);
550+
throw new UserException(e.getMessage());
551+
}
546552
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
547553
context.getState().setError(e.getMessage());
548554
return;
@@ -924,10 +930,9 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
924930
LOG.warn("retry due to exception {}. retried {} times. is rpc error: {}, is user error: {}.",
925931
e.getMessage(), i, e instanceof RpcException, e instanceof UserException);
926932

927-
boolean isNeedRetry = false;
933+
boolean isNeedRetry = e instanceof RpcException;
928934
if (Config.isCloudMode()) {
929935
// cloud mode retry
930-
isNeedRetry = false;
931936
// errCode = 2, detailMessage = No backend available as scan node,
932937
// please check the status of your backends. [10003: not alive]
933938
List<String> bes = Env.getCurrentSystemInfo().getAllBackendIds().stream()
@@ -957,8 +962,6 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
957962
}
958963
}
959964
}
960-
} else {
961-
isNeedRetry = e instanceof RpcException;
962965
}
963966
if (i != retryTime - 1 && isNeedRetry
964967
&& context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {

fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.catalog.DiskInfo;
2323
import org.apache.doris.catalog.Env;
2424
import org.apache.doris.catalog.ReplicaAllocation;
25+
import org.apache.doris.cloud.qe.ComputeGroupException;
2526
import org.apache.doris.common.AnalysisException;
2627
import org.apache.doris.common.Config;
2728
import org.apache.doris.common.DdlException;
@@ -84,7 +85,8 @@ public class SystemInfoService {
8485

8586
public static final ImmutableSet<String> NEED_REPLAN_ERRORS = ImmutableSet.of(
8687
NO_SCAN_NODE_BACKEND_AVAILABLE_MSG,
87-
ERROR_E230
88+
ERROR_E230,
89+
ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE.toString()
8890
);
8991

9092
protected volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
@@ -1158,6 +1160,7 @@ public static boolean needRetryWithReplan(String errorMsg) {
11581160
return false;
11591161
}
11601162
for (String keyword : NEED_REPLAN_ERRORS) {
1163+
LOG.debug("key {}, errorMsg {}", keyword, errorMsg);
11611164
if (errorMsg.contains(keyword)) {
11621165
return true;
11631166
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
import org.apache.doris.regression.suite.ClusterOptions
18+
import org.apache.doris.regression.util.NodeType
19+
import org.apache.doris.regression.suite.SuiteCluster
20+
21+
suite("test_retry_be_restart", "p0, docker") {
22+
if (!isCloudMode()) {
23+
return
24+
}
25+
def options = new ClusterOptions()
26+
options.enableDebugPoints()
27+
options.setFeNum(1)
28+
options.feConfigs.add('max_query_retry_time=100')
29+
options.feConfigs.add('sys_log_verbose_modules=org')
30+
options.setBeNum(1)
31+
options.cloudMode = true
32+
// 1. connect to master
33+
options.connectToFollower = false
34+
35+
def queryTask = {
36+
for (int i = 0; i < 100; i++) {
37+
try {
38+
log.info("query count: {}", i)
39+
sql """select * from test_be_restart_table"""
40+
Thread.sleep(100)
41+
} catch (Exception e) {
42+
logger.warn("select failed: ${e.message}")
43+
assertFalse(true);
44+
}
45+
}
46+
}
47+
48+
def pointSelectQueryTask = {
49+
for (int i = 0; i < 100; i++) {
50+
try {
51+
log.info("query count: {}", i)
52+
sql """select * from test_be_restart_table where account=1 and site_code=1"""
53+
Thread.sleep(100)
54+
} catch (Exception e) {
55+
logger.warn("select failed: ${e.message}")
56+
assertFalse(true);
57+
}
58+
}
59+
}
60+
61+
docker(options) {
62+
def be1 = cluster.getBeByIndex(1)
63+
def beId = be1.backendId;
64+
65+
sql """
66+
CREATE TABLE IF NOT EXISTS `test_be_restart_table`
67+
(
68+
`account` bigint NULL COMMENT '用户ID',
69+
`site_code` int NULL COMMENT '站点代码',
70+
`site_code_str` varchar(64) NOT NULL DEFAULT "" COMMENT 'string类型站点编号,查询返回数据使用',
71+
`register_time` datetime(3) NULL COMMENT '注册时间,tidb中为int,需要转换',
72+
`increment_no` bigint NOT NULL AUTO_INCREMENT(1),
73+
`currency` varchar(65533) NULL COMMENT '币种'
74+
)
75+
ENGINE=OLAP
76+
UNIQUE KEY(`account`, `site_code`)
77+
PARTITION BY LIST(`site_code`)
78+
(
79+
PARTITION p_1 VALUES IN (1),
80+
PARTITION p_2 VALUES IN (2)
81+
)
82+
DISTRIBUTED BY HASH(`account`) BUCKETS 8
83+
PROPERTIES (
84+
"binlog.enable" = "true",
85+
"replication_num" = "1",
86+
"enable_unique_key_merge_on_write" = "true"
87+
);
88+
"""
89+
sql """
90+
INSERT INTO test_be_restart_table VALUES (1, 1, '1', '2026-01-01 00:00:00', 1, 'USD');
91+
"""
92+
sql """
93+
INSERT INTO test_be_restart_table VALUES (2, 2, '2', '2026-01-01 00:00:00', 2, 'EUR');
94+
"""
95+
sql """
96+
INSERT INTO test_be_restart_table VALUES (3, 1, '3', '2026-01-01 00:00:00', 3, 'GBP');
97+
"""
98+
99+
def result = sql """select account, site_code from test_be_restart_table order by account, site_code;"""
100+
log.info("insert result : {}", result)
101+
assertEquals([[1L, 1], [2L, 2], [3L, 1]], result)
102+
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
103+
// this should run for at least 10 seconds
104+
def queryThread = Thread.start(queryTask)
105+
def pointSelectQueryThread = Thread.start(pointSelectQueryTask)
106+
sleep(5 * 1000)
107+
cluster.restartBackends()
108+
// the queries should not fail
109+
// wait for the query threads to finish
110+
queryThread.join(15000)
111+
pointSelectQueryThread.join(15000)
112+
}
113+
}
114+

0 commit comments

Comments
 (0)