Skip to content

Commit 522c7d5

Browse files
dqhl76BohuTANG
andauthored
fix(history-table): stop heartbeat when another node starts (#18664)
* fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * fix: history table should give send heartbeat if other nodes already override * refine error message * refine error message --------- Co-authored-by: Bohu <[email protected]>
1 parent f3dce95 commit 522c7d5

File tree

3 files changed

+168
-57
lines changed

3 files changed

+168
-57
lines changed

src/query/service/src/history_tables/error_handling.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,12 @@ pub fn is_temp_error(e: &ErrorCode) -> bool {
9191
|| code == ErrorCode::INVALID_OPERATION
9292
|| code == ErrorCode::STORAGE_OTHER;
9393

94-
// If acquire semaphore failed, we consider it a temporary error
95-
let meta = code == ErrorCode::META_SERVICE_ERROR;
94+
let meta = code == ErrorCode::META_SERVICE_ERROR
95+
|| code == ErrorCode::DUPLICATED_UPSERT_FILES
96+
|| code == ErrorCode::TABLE_VERSION_MISMATCHED
97+
|| code == ErrorCode::TABLE_LOCK_EXPIRED
98+
|| code == ErrorCode::TABLE_ALREADY_LOCKED;
99+
96100
let transaction = code == ErrorCode::UNRESOLVABLE_CONFLICT;
97101

98102
storage || transaction || meta

src/query/service/src/history_tables/global_history_log.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ use crate::interpreters::InterpreterFactory;
6767
use crate::sessions::BuildInfoRef;
6868
use crate::sessions::QueryContext;
6969

70+
const DEAD_IN_SECS: u64 = 60;
71+
7072
pub struct GlobalHistoryLog {
7173
meta_handle: HistoryMetaHandle,
7274
interval: u64,
@@ -378,7 +380,10 @@ impl GlobalHistoryLog {
378380
loop {
379381
// 1. Acquire heartbeat
380382
let heartbeat_key = format!("{}/{}", meta_key, table.name);
381-
let heartbeat_guard = match self.meta_handle.create_heartbeat_task(&heartbeat_key).await
383+
let heartbeat_guard = match self
384+
.meta_handle
385+
.create_heartbeat_task(&heartbeat_key, DEAD_IN_SECS)
386+
.await
382387
{
383388
Ok(Some(guard)) => guard,
384389
Ok(None) => {
@@ -442,11 +447,33 @@ impl GlobalHistoryLog {
442447
}
443448
sleep(self.transform_sleep_duration()).await;
444449
}
450+
451+
// On error(e.g. DUPLICATED_UPSERT_FILES), verify that our heartbeat is still valid (from this node).
452+
// Purpose: avoid two nodes performing the same work concurrently.
453+
// The periodic heartbeat loop would also detect the conflict, but it runs
454+
// only around every 30 seconds; this check enables faster failover.
455+
if e.code() == ErrorCode::DUPLICATED_UPSERT_FILES
456+
|| e.code() == ErrorCode::TABLE_ALREADY_LOCKED
457+
|| e.code() == ErrorCode::TABLE_LOCK_EXPIRED
458+
|| e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
459+
{
460+
if let Ok(valid) =
461+
self.meta_handle.is_heartbeat_valid(&heartbeat_key).await
462+
{
463+
if !valid {
464+
info!(
465+
"[HISTORY-TABLES] {} heartbeat lost during transform",
466+
table.name
467+
);
468+
break;
469+
}
470+
}
471+
}
445472
}
446473
}
447474
// Release heartbeat periodically to allow other nodes in the cluster
448475
// to take over and ensure even task distribution across the cluster
449-
if transform_cnt % 100 == 0 {
476+
if transform_cnt % 200 == 0 {
450477
break;
451478
}
452479
}

0 commit comments

Comments
 (0)