@@ -18,7 +18,6 @@ use std::sync::atomic::AtomicBool;
1818use std:: sync:: Arc ;
1919use std:: time:: Duration ;
2020
21- use databend_common_base:: runtime:: block_on;
2221use databend_common_base:: runtime:: spawn_named;
2322use databend_common_base:: runtime:: CaptureLogSettings ;
2423use databend_common_base:: runtime:: ThreadTracker ;
@@ -42,34 +41,6 @@ use serde::Deserialize;
4241use serde:: Serialize ;
4342use tokio:: sync:: oneshot;
4443
45- /// RAII wrapper for Permit that automatically updates timestamp on drop
46- pub struct PermitGuard {
47- _permit : Permit ,
48- meta_handle : Arc < HistoryMetaHandle > ,
49- meta_key : String ,
50- }
51-
52- impl PermitGuard {
53- pub fn new ( permit : Permit , meta_handle : Arc < HistoryMetaHandle > , meta_key : String ) -> Self {
54- Self {
55- _permit : permit,
56- meta_handle,
57- meta_key,
58- }
59- }
60- }
61-
62- impl Drop for PermitGuard {
63- fn drop ( & mut self ) {
64- let meta_handle = self . meta_handle . clone ( ) ;
65- let meta_key = self . meta_key . clone ( ) ;
66-
67- block_on ( async move {
68- let _ = meta_handle. update_last_execution_timestamp ( & meta_key) . await ;
69- } ) ;
70- }
71- }
72-
7344pub struct HeartbeatTaskGuard {
7445 _cancel : oneshot:: Sender < ( ) > ,
7546 exited : Arc < AtomicBool > ,
@@ -276,38 +247,42 @@ impl HistoryMetaHandle {
276247 }
277248 }
278249
279- /// Acquires a permit with automatic timestamp update on drop using RAII pattern.
280- /// Returns a PermitGuard that will automatically update the timestamp when dropped.
281- pub async fn acquire_with_guard (
250+ /// Checks if enough time has passed since the last execution to perform a clean operation.
251+ /// If enough time has passed, it updates the last execution timestamp atomically.
252+ /// Returns `Ok(true)` if the clean operation should be performed, `Ok(false)` otherwise.
253+ ///
254+ /// Note: This function should only be used for clean operations, cleaning tasks is idempotence
255+ /// and not critical if multiple nodes perform cleaning simultaneously.
256+ pub async fn check_should_perform_clean (
282257 & self ,
283258 meta_key : & str ,
284- interval : u64 ,
285- ) -> Result < Option < PermitGuard > > {
286- if let Some ( permit) = self . acquire ( meta_key, interval) . await ? {
287- Ok ( Some ( PermitGuard :: new (
288- permit,
289- Arc :: new ( HistoryMetaHandle {
290- meta_client : self . meta_client . clone ( ) ,
291- node_id : self . node_id . clone ( ) ,
292- } ) ,
293- meta_key. to_string ( ) ,
294- ) ) )
295- } else {
296- Ok ( None )
259+ interval_secs : u64 ,
260+ ) -> Result < bool > {
261+ let now_ms = chrono:: Utc :: now ( ) . timestamp_millis ( ) as u64 ;
262+ let last_ts_key = format ! ( "{}/last_timestamp" , meta_key) ;
263+
264+ let current = self . meta_client . get_kv ( & last_ts_key) . await ?;
265+
266+ if let Some ( v) = & current {
267+ let last_ts: u64 = serde_json:: from_slice ( & v. data ) ?;
268+ let enough_time =
269+ now_ms - Duration :: from_secs ( interval_secs) . as_millis ( ) as u64 > last_ts;
270+ if !enough_time {
271+ // Not enough time has passed since last execution
272+ return Ok ( false ) ;
273+ }
297274 }
298- }
299275
300- /// Updating the last execution timestamp in the metadata.
301- pub async fn update_last_execution_timestamp ( & self , meta_key : & str ) -> Result < ( ) > {
302- self . meta_client
303- . upsert_kv ( UpsertKV :: new (
304- format ! ( "{}/last_timestamp" , meta_key) ,
305- MatchSeq :: Any ,
306- Operation :: Update ( serde_json:: to_vec ( & chrono:: Utc :: now ( ) . timestamp_millis ( ) ) ?) ,
307- None ,
308- ) )
309- . await ?;
310- Ok ( ( ) )
276+ let last_seq = current. map_or ( 0 , |v| v. seq ) ;
277+
278+ let condition = TxnCondition :: eq_seq ( last_ts_key. clone ( ) , last_seq) ;
279+ let operation = TxnOp :: put ( last_ts_key, serde_json:: to_vec ( & now_ms) ?) ;
280+ let txn_req = TxnRequest :: new ( vec ! [ condition] , vec ! [ operation] ) ;
281+ let resp = self . meta_client . transaction ( txn_req) . await ?;
282+
283+ // we don't retry on failure
284+ // some other node has updated the timestamp and do the clean work
285+ Ok ( resp. success )
311286 }
312287
313288 pub async fn get_u64_from_meta ( & self , meta_key : & str ) -> Result < Option < u64 > > {
@@ -381,98 +356,6 @@ mod tests {
381356 MetaStore :: new_local_testing ( & databend_common_version:: BUILD_INFO ) . await
382357 }
383358
384- #[ tokio:: test( flavor = "multi_thread" ) ]
385- pub async fn test_history_table_permit_guard ( ) -> databend_common_exception:: Result < ( ) > {
386- let meta_store = setup_meta_client ( ) . await ;
387- let meta_client = meta_store. deref ( ) . clone ( ) ;
388-
389- let node_id = "test_node_123" . to_string ( ) ;
390- let meta_handle = HistoryMetaHandle :: new ( meta_client, node_id) ;
391-
392- // Test 1: Basic permit acquisition with interval 0 (no rate limiting)
393- let meta_key = "test/history_table/permit_guard" ;
394- let guard_result = meta_handle. acquire_with_guard ( meta_key, 0 ) . await ?;
395- assert ! (
396- guard_result. is_some( ) ,
397- "Should acquire permit when interval is 0"
398- ) ;
399-
400- if let Some ( guard) = guard_result {
401- // Verify that the guard contains the correct meta key
402- assert_eq ! ( guard. meta_key, meta_key) ;
403- }
404-
405- // Same meta key, because we set the interval to 0, it should not block
406- let guard_result2 = meta_handle. acquire_with_guard ( meta_key, 0 ) . await ?;
407-
408- assert ! (
409- guard_result2. is_some( ) ,
410- "Should acquire permit again when interval is 0"
411- ) ;
412-
413- if let Some ( guard) = guard_result2 {
414- // Verify that the guard contains the correct meta key
415- assert_eq ! ( guard. meta_key, meta_key) ;
416- }
417-
418- // Test 2: Permit acquisition with interval > 0 (rate limiting)
419- let meta_key_rate_limited = "test/history_table/permit_guard_rate_limited" ;
420- let interval_seconds = 2 ;
421-
422- // First acquisition should succeed
423- let first_guard_result = meta_handle
424- . acquire_with_guard ( meta_key_rate_limited, interval_seconds)
425- . await ?;
426- assert ! (
427- first_guard_result. is_some( ) ,
428- "First permit acquisition should succeed"
429- ) ;
430-
431- // Drop the first guard to trigger timestamp update
432- drop ( first_guard_result) ;
433-
434- // Immediate second acquisition should fail due to rate limiting
435- let second_guard_result = meta_handle
436- . acquire_with_guard ( meta_key_rate_limited, interval_seconds)
437- . await ?;
438- assert ! (
439- second_guard_result. is_none( ) ,
440- "Second permit acquisition should fail due to rate limiting"
441- ) ;
442-
443- // Test 3: Verify permit guard automatically updates timestamp on drop
444- let meta_key_timestamp = "test/history_table/permit_guard_timestamp" ;
445-
446- // Get initial timestamp (should be None)
447- let initial_timestamp = meta_handle
448- . get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key_timestamp) )
449- . await ?;
450- assert ! (
451- initial_timestamp. is_none( ) ,
452- "Initial timestamp should be None"
453- ) ;
454-
455- // Acquire permit with guard
456- let guard = meta_handle
457- . acquire_with_guard ( meta_key_timestamp, 0 )
458- . await ?;
459- assert ! ( guard. is_some( ) , "Should acquire permit" ) ;
460-
461- // Drop guard to trigger timestamp update
462- drop ( guard) ;
463-
464- // Verify timestamp was updated
465- let updated_timestamp = meta_handle
466- . get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key_timestamp) )
467- . await ?;
468- assert ! (
469- updated_timestamp. is_some( ) ,
470- "Timestamp should be updated after guard drop"
471- ) ;
472-
473- Ok ( ( ) )
474- }
475-
476359 #[ tokio:: test( flavor = "multi_thread" ) ]
477360 pub async fn test_history_table_heartbeat_basic ( ) -> Result < ( ) > {
478361 let meta_store = setup_meta_client ( ) . await ;
@@ -629,4 +512,53 @@ mod tests {
629512
630513 Ok ( ( ) )
631514 }
515+
516+ #[ tokio:: test( flavor = "multi_thread" ) ]
517+ pub async fn test_check_and_update_last_execution_timestamp ( ) -> Result < ( ) > {
518+ let meta_store = setup_meta_client ( ) . await ;
519+ let meta_client = meta_store. deref ( ) . clone ( ) ;
520+
521+ let meta_handle = HistoryMetaHandle :: new ( meta_client, "test_node" . to_string ( ) ) ;
522+ let meta_key = "test/history_table/check_update" ;
523+ let interval_secs = 5 ;
524+
525+ // First time: key missing, should insert.
526+ let first = meta_handle
527+ . check_should_perform_clean ( meta_key, interval_secs)
528+ . await ?;
529+ assert ! ( first) ;
530+ let ts_after_first = meta_handle
531+ . get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key) )
532+ . await ?
533+ . expect ( "timestamp should exist" ) ;
534+
535+ // Immediately again: should not update.
536+ let second = meta_handle
537+ . check_should_perform_clean ( meta_key, interval_secs)
538+ . await ?;
539+ assert ! ( !second) ;
540+ let ts_after_second = meta_handle
541+ . get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key) )
542+ . await ?
543+ . expect ( "timestamp should still exist" ) ;
544+ assert_eq ! ( ts_after_first, ts_after_second) ;
545+
546+ // Make the stored timestamp old enough, then it should update.
547+ let old_ts = ts_after_first. saturating_sub ( ( interval_secs + 1 ) * 1000 ) ;
548+ meta_handle
549+ . set_u64_to_meta ( & format ! ( "{}/last_timestamp" , meta_key) , old_ts)
550+ . await ?;
551+
552+ let third = meta_handle
553+ . check_should_perform_clean ( meta_key, interval_secs)
554+ . await ?;
555+ assert ! ( third) ;
556+ let ts_after_third = meta_handle
557+ . get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key) )
558+ . await ?
559+ . expect ( "timestamp should exist" ) ;
560+ assert ! ( ts_after_third > ts_after_first) ;
561+
562+ Ok ( ( ) )
563+ }
632564}
0 commit comments