@@ -16,10 +16,6 @@ use std::{
1616} ;
1717
1818use anyhow:: Context ;
19- use async_channel:: {
20- Receiver ,
21- Sender ,
22- } ;
2319use async_trait:: async_trait;
2420use common:: {
2521 backoff:: Backoff ,
@@ -108,6 +104,11 @@ use futures_async_stream::try_stream;
108104use governor:: Quota ;
109105use parking_lot:: Mutex ;
110106use rand:: Rng ;
107+ use tokio:: sync:: watch:: {
108+ self ,
109+ Receiver ,
110+ Sender ,
111+ } ;
111112use value:: InternalDocumentId ;
112113
113114use crate :: {
@@ -296,18 +297,17 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
296297 )
297298 . await ?;
298299
299- let ( send_min_snapshot, receive_min_snapshot) = async_channel:: bounded ( 1 ) ;
300- let ( send_min_document_snapshot, receive_min_document_snapshot) = async_channel:: bounded ( 1 ) ;
300+ let ( send_min_snapshot, receive_min_snapshot) = watch:: channel ( min_snapshot_ts) ;
301+ let ( send_min_document_snapshot, receive_min_document_snapshot) =
302+ watch:: channel ( min_document_snapshot_ts) ;
301303 let advance_min_snapshot_handle = rt. spawn (
302304 "retention_advance_min_snapshot" ,
303305 Self :: go_advance_min_snapshot (
304306 bounds_writer,
305307 checkpoint_reader. clone ( ) ,
306308 rt. clone ( ) ,
307309 persistence. clone ( ) ,
308- receive_min_snapshot. clone ( ) ,
309310 send_min_snapshot,
310- receive_min_document_snapshot. clone ( ) ,
311311 send_min_document_snapshot,
312312 snapshot_reader. clone ( ) ,
313313 ) ,
@@ -444,20 +444,15 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
444444 }
445445
446446 async fn emit_timestamp (
447- snapshot_rx : Receiver < Timestamp > ,
448- snapshot_sender : Sender < Timestamp > ,
447+ snapshot_sender : & Sender < Timestamp > ,
449448 ts : anyhow:: Result < Option < Timestamp > > ,
450449 ) {
451450 match ts {
452451 Err ( mut err) => {
453452 report_error ( & mut err) ;
454453 } ,
455454 Ok ( Some ( ts) ) => {
456- // Clear out the old value if one is there.
457- let _ = snapshot_rx. try_recv ( ) ;
458- // Send the new one. This will not block because we're the only
459- // producer.
460- if let Err ( err) = snapshot_sender. send ( ts) . await {
455+ if let Err ( err) = snapshot_sender. send ( ts) {
461456 report_error ( & mut err. into ( ) ) ;
462457 }
463458 } ,
@@ -470,9 +465,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
470465 checkpoint_reader : Reader < Checkpoint > ,
471466 rt : RT ,
472467 persistence : Arc < dyn Persistence > ,
473- min_snapshot_rx : Receiver < Timestamp > ,
474468 min_snapshot_sender : Sender < Timestamp > ,
475- min_document_snapshot_rx : Receiver < Timestamp > ,
476469 min_document_snapshot_sender : Sender < Timestamp > ,
477470 snapshot_reader : Reader < SnapshotManager > ,
478471 ) {
@@ -493,12 +486,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
493486 RetentionType :: Index ,
494487 )
495488 . await ;
496- let _ = Self :: emit_timestamp (
497- min_snapshot_rx. clone ( ) ,
498- min_snapshot_sender. clone ( ) ,
499- index_ts,
500- )
501- . await ;
489+ Self :: emit_timestamp ( & min_snapshot_sender, index_ts) . await ;
502490
503491 let document_ts = Self :: advance_timestamp (
504492 & bounds_writer,
@@ -508,12 +496,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
508496 RetentionType :: Document ,
509497 )
510498 . await ;
511- let _ = Self :: emit_timestamp (
512- min_document_snapshot_rx. clone ( ) ,
513- min_document_snapshot_sender. clone ( ) ,
514- document_ts,
515- )
516- . await ;
499+ Self :: emit_timestamp ( & min_document_snapshot_sender, document_ts) . await ;
517500 }
518501 rt. wait ( ADVANCE_RETENTION_TS_FREQUENCY ) . await ;
519502 }
@@ -1025,7 +1008,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
10251008 index_table_id : TableIdAndTableNumber ,
10261009 mut index_cursor : Timestamp ,
10271010 retention_validator : Arc < dyn RetentionValidator > ,
1028- min_snapshot_rx : Receiver < Timestamp > ,
1011+ mut min_snapshot_rx : Receiver < Timestamp > ,
10291012 checkpoint_writer : Writer < Checkpoint > ,
10301013 snapshot_reader : Reader < SnapshotManager > ,
10311014 ) {
@@ -1036,15 +1019,15 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
10361019 let mut is_working = false ;
10371020 loop {
10381021 if !is_working {
1039- min_snapshot_ts = match min_snapshot_rx. recv ( ) . await {
1022+ min_snapshot_ts = match min_snapshot_rx. changed ( ) . await {
10401023 Err ( err) => {
10411024 report_error ( & mut err. into ( ) ) ;
10421025 // Fall back to polling if the channel is closed or falls over. This should
10431026 // really never happen.
10441027 Self :: wait_with_jitter ( & rt, * MAX_RETENTION_DELAY_SECONDS ) . await ;
10451028 bounds_reader. lock ( ) . min_snapshot_ts
10461029 } ,
1047- Ok ( timestamp ) => timestamp ,
1030+ Ok ( ( ) ) => * min_snapshot_rx . borrow_and_update ( ) ,
10481031 } ;
10491032 is_working = true ;
10501033 }
@@ -1138,7 +1121,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
11381121 rt : RT ,
11391122 persistence : Arc < dyn Persistence > ,
11401123 retention_validator : Arc < dyn RetentionValidator > ,
1141- min_document_snapshot_rx : Receiver < Timestamp > ,
1124+ mut min_document_snapshot_rx : Receiver < Timestamp > ,
11421125 checkpoint_writer : Writer < Checkpoint > ,
11431126 snapshot_reader : Reader < SnapshotManager > ,
11441127 ) {
@@ -1158,15 +1141,15 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
11581141
11591142 loop {
11601143 if !is_working {
1161- min_document_snapshot_ts = match min_document_snapshot_rx. recv ( ) . await {
1144+ min_document_snapshot_ts = match min_document_snapshot_rx. changed ( ) . await {
11621145 Err ( err) => {
11631146 tracing:: warn!( "Failed to receive document snapshot: {}" , err) ;
11641147 // Fall back to polling if the channel is closed or falls over. This should
11651148 // really never happen.
11661149 Self :: wait_with_jitter ( & rt, * MAX_RETENTION_DELAY_SECONDS ) . await ;
1167- bounds_reader. lock ( ) . min_document_snapshot_ts
1150+ bounds_reader. lock ( ) . min_snapshot_ts
11681151 } ,
1169- Ok ( timestamp ) => timestamp ,
1152+ Ok ( ( ) ) => * min_document_snapshot_rx . borrow_and_update ( ) ,
11701153 } ;
11711154 is_working = true ;
11721155 }
0 commit comments