Skip to content

Commit 18efd37

Browse files
authored
fix(cluster): skip the recovery for warehouses that are not running state. (#17193)
* fix(cluster): fix recovery suspended warehouse * fix(cluster): fix recovery suspended warehouse
1 parent f715a98 commit 18efd37

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,10 @@ impl WarehouseMgr {
373373
continue;
374374
};
375375

376+
if wh.status.to_uppercase() != "RUNNING" {
377+
continue;
378+
}
379+
376380
for (cluster_id, cluster) in wh.clusters {
377381
let mut lost_nodes = cluster.nodes;
378382

src/query/management/tests/it/warehouse.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,48 @@ async fn test_create_warehouse_with_no_resources() -> Result<()> {
534534
Ok(())
535535
}
536536

537+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
538+
async fn test_recovery_with_suspended_warehouse() -> Result<()> {
539+
let (_, warehouse_manager, nodes) = nodes(Duration::from_mins(30), 2).await?;
540+
541+
let create_warehouse = warehouse_manager.create_warehouse(
542+
String::from("test_warehouse"),
543+
vec![SelectedNode::Random(None); 2],
544+
);
545+
create_warehouse.await?;
546+
547+
let list_warehouse_nodes =
548+
warehouse_manager.list_warehouse_nodes(String::from("test_warehouse"));
549+
550+
assert_eq!(list_warehouse_nodes.await?.len(), 2);
551+
552+
warehouse_manager
553+
.suspend_warehouse(String::from("test_warehouse"))
554+
.await?;
555+
556+
let shutdown_node = warehouse_manager.shutdown_node(nodes[0].clone());
557+
shutdown_node.await?;
558+
559+
let shutdown_node = warehouse_manager.shutdown_node(nodes[1].clone());
560+
shutdown_node.await?;
561+
562+
let list_warehouse_nodes =
563+
warehouse_manager.list_warehouse_nodes(String::from("test_warehouse"));
564+
565+
assert_eq!(list_warehouse_nodes.await?.len(), 0);
566+
567+
let node_1 = GlobalUniqName::unique();
568+
let start_node_1 = warehouse_manager.start_node(system_managed_node(&node_1));
569+
assert!(start_node_1.await.is_ok());
570+
571+
let list_warehouse_nodes =
572+
warehouse_manager.list_warehouse_nodes(String::from("test_warehouse"));
573+
574+
let nodes = list_warehouse_nodes.await?;
575+
assert_eq!(nodes.len(), 0);
576+
Ok(())
577+
}
578+
537579
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
538580
async fn test_recovery_create_warehouse() -> Result<()> {
539581
let (_, warehouse_manager, nodes) = nodes(Duration::from_mins(30), 2).await?;

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ impl InterpreterFactory {
161161
Plan::UnassignWarehouseNodes(v) => Ok(Arc::new(
162162
UnassignWarehouseNodesInterpreter::try_create(ctx.clone(), *v.clone())?,
163163
)),
164+
// We allow the execution of SET statements because this could be SET GLOBAL enterprise_license.
165+
Plan::Set(set_variable) => Ok(Arc::new(SetInterpreter::try_create(
166+
ctx,
167+
*set_variable.clone(),
168+
)?)),
164169
Plan::Query { metadata, .. } => {
165170
let read_guard = metadata.read();
166171
for table in read_guard.tables() {

0 commit comments

Comments
 (0)