Skip to content

Commit 4a23deb

Browse files
authored
fix: improve error handling in history table (#18504)
* fix: improve error handling in history table
* refine, add maximum retry interval
* fix: missing sleep time
* fix: use saturating_pow
1 parent afe673b commit 4a23deb

File tree

3 files changed

+51
-14
lines changed

3 files changed

+51
-14
lines changed

src/common/tracing/src/predefined_tables/history_tables.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
name = "log_history"
44
target = ""
55
create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)"
6-
transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
6+
transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE MAX_FILES = 5000"
77
delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"
88

99
[[tables]]

src/query/service/src/history_tables/global_history_log.rs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp::min;
1516
use std::sync::atomic::AtomicBool;
1617
use std::sync::atomic::Ordering;
1718
use std::sync::Arc;
@@ -46,6 +47,7 @@ use futures_util::future::join_all;
4647
use futures_util::TryStreamExt;
4748
use log::error;
4849
use log::info;
50+
use log::warn;
4951
use opendal::raw::normalize_root;
5052
use parking_lot::Mutex;
5153
use rand::random;
@@ -161,30 +163,46 @@ impl GlobalHistoryLog {
161163
let meta_key = format!("{}/history_log_transform", self.tenant_id).clone();
162164
let log = GlobalHistoryLog::instance();
163165
let handle = spawn(async move {
164-
let mut consecutive_error = 0;
166+
let mut persistent_error_cnt = 0;
167+
let mut temp_error_cnt = 0;
165168
loop {
166169
match log.transform(&table_clone, &meta_key).await {
167170
Ok(acquired_lock) => {
168171
if acquired_lock {
169-
consecutive_error = 0;
172+
persistent_error_cnt = 0;
173+
temp_error_cnt = 0;
170174
}
175+
sleep(sleep_time).await;
171176
}
172177
Err(e) => {
173-
error!(
174-
"[HISTORY-TABLES] {} log transform failed due to {}, retry {}",
175-
table_clone.name, e, consecutive_error
176-
);
177-
consecutive_error += 1;
178-
if consecutive_error > 3 {
178+
if is_temp_error(&e) {
179+
// If the error is temporary, we will retry with exponential backoff
180+
// The max backoff time is 10 minutes
181+
let backoff_second =
182+
min(2u64.saturating_pow(temp_error_cnt), 10 * 60);
183+
temp_error_cnt += 1;
184+
warn!(
185+
"[HISTORY-TABLES] {} log transform failed with temporary error {}, count {}, next retry in {} seconds",
186+
table_clone.name, e, temp_error_cnt, backoff_second
187+
);
188+
sleep(Duration::from_secs(backoff_second)).await;
189+
} else {
179190
error!(
180-
"[HISTORY-TABLES] {} log transform failed too many times, exit",
181-
table_clone.name
191+
"[HISTORY-TABLES] {} log transform failed with persistent error {}, retry count {}",
192+
table_clone.name, e, persistent_error_cnt
182193
);
183-
break;
194+
persistent_error_cnt += 1;
195+
if persistent_error_cnt > 3 {
196+
error!(
197+
"[HISTORY-TABLES] {} log transform failed too many times, giving up",
198+
table_clone.name
199+
);
200+
return;
201+
}
202+
sleep(sleep_time).await;
184203
}
185204
}
186205
}
187-
sleep(sleep_time).await;
188206
}
189207
});
190208
handles.push(handle);
@@ -433,3 +451,22 @@ pub async fn setup_operator(params: &Option<StorageParams>) -> Result<()> {
433451
GlobalLogger::instance().set_operator(op).await;
434452
Ok(())
435453
}
454+
455+
/// Check if the error is a temporary error,
456+
/// We will use this to determine if we should retry the operation.
457+
fn is_temp_error(e: &ErrorCode) -> bool {
458+
let code = e.code();
459+
let message = e.message();
460+
// Storage and I/O errors are considered temporary errors
461+
let storage = code == ErrorCode::STORAGE_NOT_FOUND
462+
|| code == ErrorCode::STORAGE_PERMISSION_DENIED
463+
|| code == ErrorCode::STORAGE_UNAVAILABLE
464+
|| code == ErrorCode::STORAGE_UNSUPPORTED
465+
|| code == ErrorCode::STORAGE_INSECURE
466+
|| code == ErrorCode::INVALID_OPERATION
467+
|| code == ErrorCode::STORAGE_OTHER;
468+
// If acquire semaphore failed, we consider it a temporary error
469+
let meta = code == ErrorCode::INTERNAL && message.contains("acquire semaphore failed");
470+
let transaction = code == ErrorCode::UNRESOLVABLE_CONFLICT;
471+
storage || transaction || meta
472+
}

src/query/service/src/history_tables/meta.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl HistoryMetaHandle {
9090
Duration::from_secs(3),
9191
))
9292
.await
93-
.map_err(|_e| "acquire semaphore failed from GlobalHistoryLog")?;
93+
.map_err(|e| format!("acquire semaphore failed from GlobalHistoryLog {}", e))?;
9494
if interval == 0 {
9595
return Ok(Some(acquired_guard));
9696
}

0 commit comments

Comments
 (0)