Skip to content

Commit 91eb679

Browse files
authored
refactor: add heartbeat to reduce meta requests for history tables (#18594)
* refactor: add heartbeat to reduce meta requests for history tables * refactor: add heartbeat to reduce meta requests for history tables * refactor: refine error handling * refactor: break transform if exit heartbeat loop * fixup * fixup * fix: external tables
1 parent 4db6fc2 commit 91eb679

File tree

6 files changed

+569
-176
lines changed

6 files changed

+569
-176
lines changed

src/query/service/src/history_tables/alter_table.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_catalog::table::Table;
2222
use databend_common_catalog::table_context::TableContext;
2323
use databend_common_exception::ErrorCode;
2424
use databend_common_exception::Result;
25+
use databend_common_expression::TableSchemaRef;
2526
use databend_common_sql::binder::parse_uri_location;
2627
use databend_common_sql::plans::Plan;
2728
use databend_common_sql::Planner;
@@ -30,28 +31,39 @@ use log::info;
3031
use crate::history_tables::external::ExternalStorageConnection;
3132
use crate::sessions::QueryContext;
3233

33-
pub async fn get_alter_table_sql(
34+
pub async fn get_schemas(
3435
ctx: Arc<QueryContext>,
3536
new_create_sql: &str,
3637
table_name: &str,
37-
) -> Result<Vec<String>> {
38-
let mut tracking_payload = ThreadTracker::new_tracking_payload();
39-
tracking_payload.capture_log_settings = Some(CaptureLogSettings::capture_off());
40-
let _guard = ThreadTracker::tracking(tracking_payload);
38+
) -> Result<(TableSchemaRef, TableSchemaRef)> {
4139
let old_table_schema = ThreadTracker::tracking_future(ctx.get_table(
4240
CATALOG_DEFAULT,
4341
"system_history",
4442
table_name,
4543
))
4644
.await?
4745
.schema();
48-
let (create_plan, _) = Planner::new(ctx.clone()).plan_sql(new_create_sql).await?;
46+
let mut planner = Planner::new(ctx.clone());
47+
let (create_plan, _) = ThreadTracker::tracking_future(planner.plan_sql(new_create_sql)).await?;
4948
let new_table_schema = match create_plan {
5049
Plan::CreateTable(plan) => plan.schema,
5150
_ => {
5251
unreachable!("logic error: expected CreateTable plan")
5352
}
5453
};
54+
Ok((old_table_schema, new_table_schema))
55+
}
56+
57+
pub async fn get_alter_table_sql(
58+
ctx: Arc<QueryContext>,
59+
new_create_sql: &str,
60+
table_name: &str,
61+
) -> Result<Vec<String>> {
62+
let mut tracking_payload = ThreadTracker::new_tracking_payload();
63+
tracking_payload.capture_log_settings = Some(CaptureLogSettings::capture_off());
64+
let _guard = ThreadTracker::tracking(tracking_payload);
65+
let (old_table_schema, new_table_schema) =
66+
ThreadTracker::tracking_future(get_schemas(ctx, new_create_sql, table_name)).await?;
5567
// The table schema change follow "open-closed principle", only accept adding new fields.
5668
// If the new table schema has less or equal fields than the old one, means older version
5769
// node restarted, we should not alter the table.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cmp::min;
16+
17+
use databend_common_exception::ErrorCode;
18+
19+
/// Error counters for tracking persistent and temporary errors during history table operations.
///
/// Both counters start at zero and saturate at `u32::MAX` instead of
/// overflowing, so a very long run of failures can never wrap a counter back
/// to zero (which would clear the persistent limit check and restart the
/// temporary backoff from its smallest value).
#[derive(Debug, Default)]
pub struct ErrorCounters {
    // Number of persistent errors recorded since the last `reset`.
    persistent: u32,
    // Number of temporary errors recorded since the last `reset`.
    temporary: u32,
}

impl ErrorCounters {
    /// Create a new `ErrorCounters` instance with zero counts.
    pub fn new() -> Self {
        Self::default()
    }

    /// Reset both error counters to zero.
    pub fn reset(&mut self) {
        self.persistent = 0;
        self.temporary = 0;
    }

    /// Increment the persistent error counter and return the new count.
    ///
    /// The counter saturates at `u32::MAX`.
    pub fn increment_persistent(&mut self) -> u32 {
        self.persistent = self.persistent.saturating_add(1);
        self.persistent
    }

    /// Increment the temporary error counter and return the new count.
    ///
    /// The counter saturates at `u32::MAX`.
    pub fn increment_temporary(&mut self) -> u32 {
        self.temporary = self.temporary.saturating_add(1);
        self.temporary
    }

    /// Check whether the persistent error count is strictly greater than
    /// `MAX_PERSISTENT_ERROR_ATTEMPTS`.
    pub fn persistent_exceeded_limit(&self) -> bool {
        self.persistent > MAX_PERSISTENT_ERROR_ATTEMPTS
    }

    /// Calculate the backoff duration in seconds for temporary errors.
    ///
    /// The value is `2^temporary` (saturating on overflow), capped at
    /// `MAX_TEMP_ERROR_BACKOFF_SECONDS`. With no temporary errors recorded
    /// the result is 1.
    pub fn calculate_temp_backoff(&self) -> u64 {
        min(
            2u64.saturating_pow(self.temporary),
            MAX_TEMP_ERROR_BACKOFF_SECONDS,
        )
    }

    /// Get the current persistent error count.
    pub fn persistent_count(&self) -> u32 {
        self.persistent
    }

    /// Get the current temporary error count.
    pub fn temporary_count(&self) -> u32 {
        self.temporary
    }
}

/// Maximum number of persistent error attempts before giving up
const MAX_PERSISTENT_ERROR_ATTEMPTS: u32 = 3;

/// Maximum backoff time in seconds for temporary errors (10 minutes)
const MAX_TEMP_ERROR_BACKOFF_SECONDS: u64 = 10 * 60;
79+
80+
/// Check if the error is a temporary error that should be retried
81+
/// We will use this to determine if we should retry the operation.
82+
pub fn is_temp_error(e: &ErrorCode) -> bool {
83+
let code = e.code();
84+
85+
// Storage and I/O errors are considered temporary errors
86+
let storage = code == ErrorCode::STORAGE_NOT_FOUND
87+
|| code == ErrorCode::STORAGE_PERMISSION_DENIED
88+
|| code == ErrorCode::STORAGE_UNAVAILABLE
89+
|| code == ErrorCode::STORAGE_UNSUPPORTED
90+
|| code == ErrorCode::STORAGE_INSECURE
91+
|| code == ErrorCode::INVALID_OPERATION
92+
|| code == ErrorCode::STORAGE_OTHER;
93+
94+
// If acquire semaphore failed, we consider it a temporary error
95+
let meta = code == ErrorCode::META_SERVICE_ERROR;
96+
let transaction = code == ErrorCode::UNRESOLVABLE_CONFLICT;
97+
98+
storage || transaction || meta
99+
}

0 commit comments

Comments
 (0)