@@ -46,6 +46,7 @@ use futures_util::future::join_all;
46
46
use futures_util:: TryStreamExt ;
47
47
use log:: error;
48
48
use log:: info;
49
+ use log:: warn;
49
50
use opendal:: raw:: normalize_root;
50
51
use parking_lot:: Mutex ;
51
52
use rand:: random;
@@ -161,30 +162,42 @@ impl GlobalHistoryLog {
161
162
let meta_key = format ! ( "{}/history_log_transform" , self . tenant_id) . clone ( ) ;
162
163
let log = GlobalHistoryLog :: instance ( ) ;
163
164
let handle = spawn ( async move {
164
- let mut consecutive_error = 0 ;
165
+ let mut persistent_error_cnt = 0 ;
166
+ let mut temp_error_cnt = 0 ;
165
167
loop {
166
168
match log. transform ( & table_clone, & meta_key) . await {
167
169
Ok ( acquired_lock) => {
168
170
if acquired_lock {
169
- consecutive_error = 0 ;
171
+ persistent_error_cnt = 0 ;
172
+ temp_error_cnt = 0 ;
170
173
}
174
+ sleep ( sleep_time) . await ;
171
175
}
172
176
Err ( e) => {
173
177
error ! (
174
- "[HISTORY-TABLES] {} log transform failed due to {}, retry {}" ,
175
- table_clone. name, e, consecutive_error
178
+ "[HISTORY-TABLES] {} log transform failed with persistent error {}, retry count {}" ,
179
+ table_clone. name, e, persistent_error_cnt
176
180
) ;
177
- consecutive_error += 1 ;
178
- if consecutive_error > 3 {
179
- error ! (
180
- "[HISTORY-TABLES] {} log transform failed too many times, exit" ,
181
- table_clone. name
181
+ if is_temp_error ( & e) {
182
+ let backoff_second = 2u64 . pow ( temp_error_cnt) ;
183
+ temp_error_cnt += 1 ;
184
+ warn ! (
185
+ "[HISTORY-TABLES] {} log transform failed with temporary error {}, next retry in {} seconds" ,
186
+ table_clone. name, e, temp_error_cnt
182
187
) ;
183
- break ;
188
+ sleep ( Duration :: from_secs ( backoff_second) ) . await ;
189
+ } else {
190
+ persistent_error_cnt += 1 ;
191
+ if persistent_error_cnt > 3 {
192
+ error ! (
193
+ "[HISTORY-TABLES] {} log transform failed too many times, giving up" ,
194
+ table_clone. name
195
+ ) ;
196
+ return ;
197
+ }
184
198
}
185
199
}
186
200
}
187
- sleep ( sleep_time) . await ;
188
201
}
189
202
} ) ;
190
203
handles. push ( handle) ;
@@ -433,3 +446,22 @@ pub async fn setup_operator(params: &Option<StorageParams>) -> Result<()> {
433
446
GlobalLogger :: instance ( ) . set_operator ( op) . await ;
434
447
Ok ( ( ) )
435
448
}
449
+
450
+ /// Check if the error is a temporary error,
451
+ /// We will use this to determine if we should retry the operation.
452
+ fn is_temp_error ( e : & ErrorCode ) -> bool {
453
+ let code = e. code ( ) ;
454
+ let message = e. message ( ) ;
455
+ // Storage and I/O errors are considered temporary errors
456
+ let storage = code == ErrorCode :: STORAGE_NOT_FOUND
457
+ || code == ErrorCode :: STORAGE_PERMISSION_DENIED
458
+ || code == ErrorCode :: STORAGE_UNAVAILABLE
459
+ || code == ErrorCode :: STORAGE_UNSUPPORTED
460
+ || code == ErrorCode :: STORAGE_INSECURE
461
+ || code == ErrorCode :: INVALID_OPERATION
462
+ || code == ErrorCode :: STORAGE_OTHER ;
463
+ // If acquire semaphore failed, we consider it a temporary error
464
+ let meta = code == ErrorCode :: INTERNAL && message. contains ( "acquire semaphore failed" ) ;
465
+ let transaction = code == ErrorCode :: UNRESOLVABLE_CONFLICT ;
466
+ storage || transaction || meta
467
+ }
0 commit comments