Skip to content

Commit 06d5848

Browse files
authored
fix(query): fix the 'not found' error caused by heartbeat loss on the local (#18512)
1 parent 8e2ad36 commit 06d5848

File tree

4 files changed

+61
-1
lines changed

4 files changed

+61
-1
lines changed

โ€Žsrc/query/config/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,6 +2031,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
20312031
.collect(),
20322032
resources_management: self.resources_management,
20332033
enable_queries_executor: self.enable_queries_executor,
2034+
check_connection_before_schedule: true,
20342035
})
20352036
}
20362037
}

โ€Žsrc/query/config/src/inner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ pub struct QueryConfig {
254254
pub resources_management: Option<ResourcesManagementConfig>,
255255

256256
pub enable_queries_executor: bool,
257+
pub check_connection_before_schedule: bool,
257258
}
258259

259260
impl Default for QueryConfig {
@@ -343,6 +344,7 @@ impl Default for QueryConfig {
343344
settings: HashMap::new(),
344345
resources_management: None,
345346
enable_queries_executor: false,
347+
check_connection_before_schedule: true,
346348
}
347349
}
348350
}

โ€Žsrc/query/service/src/clusters/cluster.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,14 @@ impl ClusterDiscovery {
294294
Err(cause.add_message_back("(while cluster api get_nodes)."))
295295
}
296296
Ok(cluster_nodes) => {
297+
let mut has_local_node = false;
297298
let mut res = Vec::with_capacity(cluster_nodes.len());
298299
for node in &cluster_nodes {
299-
if node.id != self.local_id {
300+
if node.id == self.local_id {
301+
has_local_node = true;
302+
}
303+
304+
if config.query.check_connection_before_schedule && node.id != self.local_id {
300305
let start_at = Instant::now();
301306
if let Err(cause) = create_client(config, &node.flight_address).await {
302307
warn!(
@@ -313,6 +318,26 @@ impl ClusterDiscovery {
313318
res.push(Arc::new(node.clone()));
314319
}
315320

321+
// When this node loses heartbeat with the meta node but receives an SQL request from the client, we should attempt to use this node.
322+
if !has_local_node && !res.is_empty() {
323+
let mut local_node_info = NodeInfo::create(
324+
config.query.node_id.clone(),
325+
config.query.node_secret.clone(),
326+
format!(
327+
"{}:{}",
328+
config.query.http_handler_host, config.query.http_handler_port
329+
),
330+
config.query.flight_api_address.clone(),
331+
config.query.discovery_address.clone(),
332+
String::new(),
333+
String::new(),
334+
);
335+
336+
local_node_info.cluster_id = res[0].cluster_id.clone();
337+
local_node_info.warehouse_id = res[0].warehouse_id.clone();
338+
res.push(Arc::new(local_node_info));
339+
}
340+
316341
metrics_gauge_discovered_nodes(
317342
&self.local_id,
318343
&self.cluster_id,

โ€Žsrc/query/service/tests/it/clusters.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,38 @@ async fn test_remove_invalid_nodes() -> Result<()> {
8686
Ok(())
8787
}
8888

89+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
90+
async fn test_lost_local_cluster_discovery() -> Result<()> {
91+
let _guard = TestFixture::setup().await?;
92+
93+
let mut config_1 = ConfigBuilder::create()
94+
.query_flight_address("10.0.0.1")
95+
.build();
96+
let mut config_2 = ConfigBuilder::create()
97+
.query_flight_address("10.0.0.2")
98+
.build();
99+
100+
config_1.query.check_connection_before_schedule = false;
101+
config_2.query.check_connection_before_schedule = false;
102+
103+
let metastore = ClusterDiscovery::create_meta_client(&config_1).await?;
104+
let cluster_discovery_1 = ClusterDiscovery::try_create(&config_1, metastore.clone()).await?;
105+
let cluster_discovery_2 = ClusterDiscovery::try_create(&config_2, metastore.clone()).await?;
106+
107+
cluster_discovery_2.register_to_metastore(&config_2).await?;
108+
109+
let cluster_1 = cluster_discovery_1.discover(&config_1).await?;
110+
assert_eq!(cluster_1.get_nodes().len(), 2);
111+
assert_eq!(cluster_1.is_local(&cluster_1.nodes[0]), false);
112+
assert_eq!(cluster_1.is_local(&cluster_1.nodes[1]), true);
113+
114+
let cluster_2 = cluster_discovery_2.discover(&config_2).await?;
115+
assert_eq!(cluster_2.get_nodes().len(), 1);
116+
assert_eq!(cluster_2.is_local(&cluster_2.nodes[0]), true);
117+
118+
Ok(())
119+
}
120+
89121
// TODO:(Winter) need kvapi::KVApi for cluster multiple nodes test
90122
// #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
91123
// async fn test_multiple_cluster_discovery() -> Result<()> {

0 commit comments

Comments
ย (0)