diff --git a/src/common/tracing/src/predefined_tables/history_tables.toml b/src/common/tracing/src/predefined_tables/history_tables.toml index b3ffce93700d0..5ebae2e931022 100644 --- a/src/common/tracing/src/predefined_tables/history_tables.toml +++ b/src/common/tracing/src/predefined_tables/history_tables.toml @@ -3,7 +3,7 @@ name = "log_history" target = "" create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)" -transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE" +transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE MAX_FILES = 5000" delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})" [[tables]] diff --git a/src/query/service/src/history_tables/global_history_log.rs b/src/query/service/src/history_tables/global_history_log.rs index 5cca20ea55509..6a383a732c096 100644 --- a/src/query/service/src/history_tables/global_history_log.rs +++ b/src/query/service/src/history_tables/global_history_log.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::min; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -46,6 +47,7 @@ use futures_util::future::join_all; use futures_util::TryStreamExt; use log::error; use log::info; +use log::warn; use opendal::raw::normalize_root; use parking_lot::Mutex; use rand::random; @@ -161,30 +163,45 @@ impl GlobalHistoryLog { let meta_key = format!("{}/history_log_transform", self.tenant_id).clone(); let log = GlobalHistoryLog::instance(); let handle = spawn(async move { - let mut consecutive_error = 0; + let mut persistent_error_cnt = 0; + let mut temp_error_cnt = 0; loop { match log.transform(&table_clone, &meta_key).await { Ok(acquired_lock) => { if acquired_lock { - consecutive_error = 0; + persistent_error_cnt = 0; + temp_error_cnt = 0; } + sleep(sleep_time).await; } Err(e) => { - error!( - "[HISTORY-TABLES] {} log transform failed due to {}, retry {}", - table_clone.name, e, consecutive_error - ); - consecutive_error += 1; - if consecutive_error > 3 { + if is_temp_error(&e) { + // If the error is temporary, we will retry with exponential backoff + // The max backoff time is 10 minutes + let backoff_second = min(2u64.pow(temp_error_cnt), 10 * 60); + temp_error_cnt += 1; + warn!( + "[HISTORY-TABLES] {} log transform failed with temporary error {}, count {}, next retry in {} seconds", + table_clone.name, e, temp_error_cnt, backoff_second + ); + sleep(Duration::from_secs(backoff_second)).await; + } else { error!( - "[HISTORY-TABLES] {} log transform failed too many times, exit", - table_clone.name + "[HISTORY-TABLES] {} log transform failed with persistent error {}, retry count {}", + table_clone.name, e, persistent_error_cnt ); - break; + persistent_error_cnt += 1; + if persistent_error_cnt > 3 { + error!( + "[HISTORY-TABLES] {} log transform failed too many times, giving up", + table_clone.name + ); + return; + } + sleep(sleep_time).await; } } } - sleep(sleep_time).await; } }); handles.push(handle); @@ -433,3 +450,22 @@ pub async fn setup_operator(params: &Option) -> Result<()> { GlobalLogger::instance().set_operator(op).await; Ok(()) } + +/// Check if the error is a temporary error, +/// We will use this to determine if we should retry the operation. +fn is_temp_error(e: &ErrorCode) -> bool { + let code = e.code(); + let message = e.message(); + // Storage and I/O errors are considered temporary errors + let storage = code == ErrorCode::STORAGE_NOT_FOUND + || code == ErrorCode::STORAGE_PERMISSION_DENIED + || code == ErrorCode::STORAGE_UNAVAILABLE + || code == ErrorCode::STORAGE_UNSUPPORTED + || code == ErrorCode::STORAGE_INSECURE + || code == ErrorCode::INVALID_OPERATION + || code == ErrorCode::STORAGE_OTHER; + // If acquire semaphore failed, we consider it a temporary error + let meta = code == ErrorCode::INTERNAL && message.contains("acquire semaphore failed"); + let transaction = code == ErrorCode::UNRESOLVABLE_CONFLICT; + storage || transaction || meta +} diff --git a/src/query/service/src/history_tables/meta.rs b/src/query/service/src/history_tables/meta.rs index 78e235d24a9a4..1098676f9fc4c 100644 --- a/src/query/service/src/history_tables/meta.rs +++ b/src/query/service/src/history_tables/meta.rs @@ -90,7 +90,7 @@ impl HistoryMetaHandle { Duration::from_secs(3), )) .await - .map_err(|_e| "acquire semaphore failed from GlobalHistoryLog")?; + .map_err(|e| format!("acquire semaphore failed from GlobalHistoryLog {}", e))?; if interval == 0 { return Ok(Some(acquired_guard)); }