diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5c587f6988e732..b9c5c2d72f5f39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -543,6 +543,12 @@ public void execute(TUniqueId queryId) throws Exception { if (context.getMinidump() != null && context.getMinidump().toString(4) != null) { MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId())); } + // COMPUTE_GROUPS_NO_ALIVE_BE, planner can't get alive be, need retry + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(e.getMessage())) { + LOG.debug("planner failed with cloud compute group error, need retry. {}", + context.getQueryIdentifier(), e); + throw new UserException(e.getMessage()); + } LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); context.getState().setError(e.getMessage()); return; @@ -924,10 +930,9 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception { LOG.warn("retry due to exception {}. retried {} times. is rpc error: {}, is user error: {}.", e.getMessage(), i, e instanceof RpcException, e instanceof UserException); - boolean isNeedRetry = false; + boolean isNeedRetry = e instanceof RpcException; if (Config.isCloudMode()) { // cloud mode retry - isNeedRetry = false; // errCode = 2, detailMessage = No backend available as scan node, // please check the status of your backends. [10003: not alive] List bes = Env.getCurrentSystemInfo().getAllBackendIds().stream() @@ -957,8 +962,6 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception { } } } - } else { - isNeedRetry = e instanceof RpcException; } if (i != retryTime - 1 && isNeedRetry && context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index b9c63f813b1ebf..59264ee6bb62d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.cloud.qe.ComputeGroupException; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -84,7 +85,8 @@ public class SystemInfoService { public static final ImmutableSet NEED_REPLAN_ERRORS = ImmutableSet.of( NO_SCAN_NODE_BACKEND_AVAILABLE_MSG, - ERROR_E230 + ERROR_E230, + ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE.toString() ); protected volatile ImmutableMap idToBackendRef = ImmutableMap.of(); @@ -1158,6 +1160,7 @@ public static boolean needRetryWithReplan(String errorMsg) { return false; } for (String keyword : NEED_REPLAN_ERRORS) { + LOG.debug("key {}, errorMsg {}", keyword, errorMsg); if (errorMsg.contains(keyword)) { return true; } diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_be_restart.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_be_restart.groovy new file mode 100644 index 00000000000000..0113f6be877090 --- /dev/null +++ b/regression-test/suites/cloud_p0/query_retry/test_retry_be_restart.groovy @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import org.apache.doris.regression.suite.SuiteCluster + +suite("test_retry_be_restart", "p0, docker") { + if (!isCloudMode()) { + return + } + def options = new ClusterOptions() + options.enableDebugPoints() + options.setFeNum(1) + options.feConfigs.add('max_query_retry_time=100') + options.feConfigs.add('sys_log_verbose_modules=org') + options.setBeNum(1) + options.cloudMode = true + // 1. connect to master + options.connectToFollower = false + + def queryTask = { + for (int i = 0; i < 100; i++) { + try { + log.info("query count: {}", i) + sql """select * from test_be_restart_table""" + Thread.sleep(100) + } catch (Exception e) { + logger.warn("select failed: ${e.message}") + assertFalse(true); + } + } + } + + def pointSelectQueryTask = { + for (int i = 0; i < 100; i++) { + try { + log.info("query count: {}", i) + sql """select * from test_be_restart_table where account=1 and site_code=1""" + Thread.sleep(100) + } catch (Exception e) { + logger.warn("select failed: ${e.message}") + assertFalse(true); + } + } + } + + docker(options) { + def be1 = cluster.getBeByIndex(1) + def beId = be1.backendId; + + sql """ + CREATE TABLE IF NOT EXISTS `test_be_restart_table` + ( + `account` bigint NULL COMMENT '用户ID', + `site_code` int NULL COMMENT '站点代码', + `site_code_str` varchar(64) NOT NULL DEFAULT "" COMMENT 'string类型站点编号,查询返回数据使用', + `register_time` datetime(3) NULL COMMENT '注册时间,tidb中为int,需要转换', + `increment_no` bigint NOT NULL AUTO_INCREMENT(1), + `currency` varchar(65533) NULL COMMENT '币种' + ) + ENGINE=OLAP + UNIQUE KEY(`account`, `site_code`) + PARTITION BY LIST(`site_code`) + ( + PARTITION p_1 VALUES IN (1), + PARTITION p_2 VALUES IN (2) + ) + DISTRIBUTED BY HASH(`account`) BUCKETS 8 + PROPERTIES ( + "binlog.enable" = "true", + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + sql """ + INSERT INTO test_be_restart_table VALUES (1, 1, '1', '2026-01-01 00:00:00', 1, 'USD'); + """ + sql """ + INSERT INTO test_be_restart_table VALUES (2, 2, '2', '2026-01-01 00:00:00', 2, 'EUR'); + """ + sql """ + INSERT INTO test_be_restart_table VALUES (3, 1, '3', '2026-01-01 00:00:00', 3, 'GBP'); + """ + + def result = sql """select account, site_code from test_be_restart_table order by account, site_code;""" + log.info("insert result : {}", result) + assertEquals([[1L, 1], [2L, 2], [3L, 1]], result) + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) + // this should be run at least 10 seconds + def queryThread = Thread.start(queryTask) + def pointSelectQueryThread = Thread.start(pointSelectQueryTask) + sleep(5 * 1000) + cluster.restartBackends() + // query should have no failure + // wait query thread finish + queryThread.join(15000) + pointSelectQueryThread.join(15000) + } +} +