@@ -176,6 +176,9 @@ pub struct PerspectiveInstance {
176176 immediate_commits_remaining : Arc < Mutex < usize > > ,
177177 subscribed_queries : Arc < Mutex < HashMap < String , SubscribedQuery > > > ,
178178 batch_store : Arc < RwLock < HashMap < String , PerspectiveDiff > > > ,
179+ // Fallback sync tracking for ensure_public_links_are_shared
180+ last_successful_fallback_sync : Arc < Mutex < Option < tokio:: time:: Instant > > > ,
181+ fallback_sync_interval : Arc < Mutex < Duration > > ,
179182}
180183
181184impl PerspectiveInstance {
@@ -196,6 +199,9 @@ impl PerspectiveInstance {
196199 immediate_commits_remaining : Arc :: new ( Mutex :: new ( IMMEDIATE_COMMITS_COUNT ) ) ,
197200 subscribed_queries : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
198201 batch_store : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
202+ // Initialize fallback sync tracking
203+ last_successful_fallback_sync : Arc :: new ( Mutex :: new ( None ) ) ,
204+ fallback_sync_interval : Arc :: new ( Mutex :: new ( Duration :: from_secs ( 30 ) ) ) ,
199205 }
200206 }
201207
@@ -205,7 +211,8 @@ impl PerspectiveInstance {
205211 self . notification_check_loop( ) ,
206212 self . nh_sync_loop( ) ,
207213 self . pending_diffs_loop( ) ,
208- self . subscribed_queries_loop( )
214+ self . subscribed_queries_loop( ) ,
215+ self . fallback_sync_loop( )
209216 ) ;
210217 }
211218
@@ -274,7 +281,12 @@ impl PerspectiveInstance {
274281 let mut link_language_guard = self . link_language . lock ( ) . await ;
275282 if let Some ( link_language) = link_language_guard. as_mut ( ) {
276283 match link_language. sync ( ) . await {
277- Ok ( _) => ( ) ,
284+ Ok ( _) => {
285+ // Transition to Synced state on successful sync
286+ let _ = self
287+ . update_perspective_state ( PerspectiveState :: Synced )
288+ . await ;
289+ }
278290 Err ( e) => {
279291 log:: error!( "Error calling sync on link language: {:?}" , e) ;
280292 let _ = self
@@ -416,7 +428,7 @@ impl PerspectiveInstance {
416428 }
417429 }
418430
419- async fn ensure_public_links_are_shared ( & self ) {
431+ pub async fn ensure_public_links_are_shared ( & self ) -> bool {
420432 let uuid = self . persisted . lock ( ) . await . uuid . clone ( ) ;
421433 let mut link_language_guard = self . link_language . lock ( ) . await ;
422434 if let Some ( link_language) = link_language_guard. as_mut ( ) {
@@ -451,6 +463,7 @@ impl PerspectiveInstance {
451463 }
452464
453465 if !links_to_commit. is_empty ( ) {
466+ let links_count = links_to_commit. len ( ) ;
454467 let result = link_language
455468 . commit ( PerspectiveDiff {
456469 additions : links_to_commit,
@@ -460,11 +473,18 @@ impl PerspectiveInstance {
460473
461474 if let Err ( e) = result {
462475 log:: error!( "Error calling link language's commit in ensure_public_links_are_shared: {:?}" , e) ;
476+ return false ;
463477 }
478+ log:: debug!(
479+ "Successfully committed {} links to link language in fallback sync" ,
480+ links_count
481+ ) ;
464482 }
465483
466484 //Ad4mDb::with_global_instance(|db| db.add_many_links(&self.persisted.lock().await.uuid, &remote_links)).unwrap(); // Assuming add_many_links takes a reference to a Vec<LinkExpression> and returns Result<(), AnyError>
485+ return true ;
467486 }
487+ false
468488 }
469489
470490 pub async fn update_perspective_state ( & self , state : PerspectiveState ) -> Result < ( ) , AnyError > {
@@ -909,6 +929,8 @@ impl PerspectiveInstance {
909929
910930 if status == LinkStatus :: Shared {
911931 self . spawn_commit_and_handle_error ( & diff) ;
932+ // Reset fallback sync interval when new shared links are added
933+ self . reset_fallback_sync_interval ( ) . await ;
912934 }
913935 Ok ( decorated_diff)
914936 }
@@ -2606,6 +2628,87 @@ impl PerspectiveInstance {
26062628 }
26072629 }
26082630
2631+ async fn fallback_sync_loop ( & self ) {
2632+ let uuid = self . persisted . lock ( ) . await . uuid . clone ( ) ;
2633+ log:: debug!( "Starting fallback sync loop for perspective {}" , uuid) ;
2634+
2635+ while !* self . is_teardown . lock ( ) . await {
2636+ // Check if we should run the fallback sync (avoid holding multiple locks)
2637+ let should_run = {
2638+ // Check perspective state first
2639+ let is_synced_neighbourhood = {
2640+ let handle = self . persisted . lock ( ) . await ;
2641+ let result =
2642+ handle. state == PerspectiveState :: Synced && handle. neighbourhood . is_some ( ) ;
2643+ drop ( handle) ; // Release lock immediately
2644+ result
2645+ } ;
2646+
2647+ if !is_synced_neighbourhood {
2648+ false
2649+ } else {
2650+ // Check link language availability
2651+ let link_lang_available = {
2652+ let link_lang = self . link_language . lock ( ) . await ;
2653+ let result = link_lang. is_some ( ) ;
2654+ drop ( link_lang) ; // Release lock immediately
2655+ result
2656+ } ;
2657+
2658+ if !link_lang_available {
2659+ false
2660+ } else {
2661+ // Check timing conditions
2662+ let last_success = * self . last_successful_fallback_sync . lock ( ) . await ;
2663+ let current_interval = * self . fallback_sync_interval . lock ( ) . await ;
2664+
2665+ // Only run if we haven't had a successful sync recently or it's been a while
2666+ last_success. is_none ( ) || last_success. unwrap ( ) . elapsed ( ) > current_interval
2667+ }
2668+ }
2669+ } ;
2670+
2671+ if should_run {
2672+ log:: debug!( "Running fallback sync for perspective {}" , uuid) ;
2673+ let success = self . ensure_public_links_are_shared ( ) . await ;
2674+
2675+ if success {
2676+ // Update last successful sync time and increase interval
2677+ {
2678+ * self . last_successful_fallback_sync . lock ( ) . await =
2679+ Some ( tokio:: time:: Instant :: now ( ) ) ;
2680+ * self . fallback_sync_interval . lock ( ) . await = Duration :: from_secs ( 300 ) ;
2681+ }
2682+ log:: debug!( "Fallback sync successful for perspective {}, increasing interval to 5 minutes" , uuid) ;
2683+ } else {
2684+ // Reset interval to 30 seconds on failure
2685+ * self . fallback_sync_interval . lock ( ) . await = Duration :: from_secs ( 30 ) ;
2686+ log:: warn!(
2687+ "Fallback sync failed for perspective {}, keeping interval at 30 seconds" ,
2688+ uuid
2689+ ) ;
2690+ }
2691+ }
2692+
2693+ // Get fresh interval for sleep (after potential updates)
2694+ let sleep_interval = * self . fallback_sync_interval . lock ( ) . await ;
2695+ sleep ( sleep_interval) . await ;
2696+ }
2697+
2698+ log:: debug!( "Fallback sync loop ended for perspective {}" , uuid) ;
2699+ }
2700+
2701+ /// Reset the fallback sync interval to 30 seconds when new links are added
2702+ /// This ensures that new links get synced quickly
2703+ async fn reset_fallback_sync_interval ( & self ) {
2704+ * self . fallback_sync_interval . lock ( ) . await = Duration :: from_secs ( 30 ) ;
2705+ let uuid = self . persisted . lock ( ) . await . uuid . clone ( ) ;
2706+ log:: debug!(
2707+ "Reset fallback sync interval to 30 seconds for perspective {}" ,
2708+ uuid
2709+ ) ;
2710+ }
2711+
26092712 pub async fn create_batch ( & self ) -> String {
26102713 let batch_uuid = Uuid :: new_v4 ( ) . to_string ( ) ;
26112714 self . batch_store . write ( ) . await . insert (
0 commit comments