Skip to content

Commit 0cf4b48

Browse files
committed
Merge branch 'dev' into staging
2 parents 636e7aa + ad26898 commit 0cf4b48

File tree

3 files changed

+154
-14
lines changed

3 files changed

+154
-14
lines changed

rust-executor/src/neighbourhoods.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,15 @@ pub async fn neighbourhood_publish_from_perspective(
3535
// Add shared perspective to original perspective and then update controller
3636
perspective_handle.shared_url = Some(neighbourhood_url.clone());
3737
perspective_handle.neighbourhood = Some(neighbourhood_exp);
38-
perspective_handle.state = PerspectiveState::Synced;
38+
perspective_handle.state = PerspectiveState::NeighbourhoodCreationInitiated;
3939
update_perspective(&perspective_handle)
4040
.await
4141
.map_err(|e| anyhow!(e))?;
42+
43+
// Ensure any existing shared links are committed to the link language
44+
// This is critical for early links created before neighbourhood sharing
45+
// We need to do this after the neighbourhood is created but before other agents join
46+
perspective.ensure_public_links_are_shared().await;
4247
Ok(neighbourhood_url)
4348
}
4449

rust-executor/src/perspectives/perspective_instance.rs

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ pub struct PerspectiveInstance {
176176
immediate_commits_remaining: Arc<Mutex<usize>>,
177177
subscribed_queries: Arc<Mutex<HashMap<String, SubscribedQuery>>>,
178178
batch_store: Arc<RwLock<HashMap<String, PerspectiveDiff>>>,
179+
// Fallback sync tracking for ensure_public_links_are_shared
180+
last_successful_fallback_sync: Arc<Mutex<Option<tokio::time::Instant>>>,
181+
fallback_sync_interval: Arc<Mutex<Duration>>,
179182
}
180183

181184
impl PerspectiveInstance {
@@ -196,6 +199,9 @@ impl PerspectiveInstance {
196199
immediate_commits_remaining: Arc::new(Mutex::new(IMMEDIATE_COMMITS_COUNT)),
197200
subscribed_queries: Arc::new(Mutex::new(HashMap::new())),
198201
batch_store: Arc::new(RwLock::new(HashMap::new())),
202+
// Initialize fallback sync tracking
203+
last_successful_fallback_sync: Arc::new(Mutex::new(None)),
204+
fallback_sync_interval: Arc::new(Mutex::new(Duration::from_secs(30))),
199205
}
200206
}
201207

@@ -205,7 +211,8 @@ impl PerspectiveInstance {
205211
self.notification_check_loop(),
206212
self.nh_sync_loop(),
207213
self.pending_diffs_loop(),
208-
self.subscribed_queries_loop()
214+
self.subscribed_queries_loop(),
215+
self.fallback_sync_loop()
209216
);
210217
}
211218

@@ -274,7 +281,12 @@ impl PerspectiveInstance {
274281
let mut link_language_guard = self.link_language.lock().await;
275282
if let Some(link_language) = link_language_guard.as_mut() {
276283
match link_language.sync().await {
277-
Ok(_) => (),
284+
Ok(_) => {
285+
// Transition to Synced state on successful sync
286+
let _ = self
287+
.update_perspective_state(PerspectiveState::Synced)
288+
.await;
289+
}
278290
Err(e) => {
279291
log::error!("Error calling sync on link language: {:?}", e);
280292
let _ = self
@@ -416,7 +428,7 @@ impl PerspectiveInstance {
416428
}
417429
}
418430

419-
async fn ensure_public_links_are_shared(&self) {
431+
pub async fn ensure_public_links_are_shared(&self) -> bool {
420432
let uuid = self.persisted.lock().await.uuid.clone();
421433
let mut link_language_guard = self.link_language.lock().await;
422434
if let Some(link_language) = link_language_guard.as_mut() {
@@ -451,6 +463,7 @@ impl PerspectiveInstance {
451463
}
452464

453465
if !links_to_commit.is_empty() {
466+
let links_count = links_to_commit.len();
454467
let result = link_language
455468
.commit(PerspectiveDiff {
456469
additions: links_to_commit,
@@ -460,11 +473,18 @@ impl PerspectiveInstance {
460473

461474
if let Err(e) = result {
462475
log::error!("Error calling link language's commit in ensure_public_links_are_shared: {:?}", e);
476+
return false;
463477
}
478+
log::debug!(
479+
"Successfully committed {} links to link language in fallback sync",
480+
links_count
481+
);
464482
}
465483

466484
//Ad4mDb::with_global_instance(|db| db.add_many_links(&self.persisted.lock().await.uuid, &remote_links)).unwrap(); // Assuming add_many_links takes a reference to a Vec<LinkExpression> and returns Result<(), AnyError>
485+
return true;
467486
}
487+
false
468488
}
469489

470490
pub async fn update_perspective_state(&self, state: PerspectiveState) -> Result<(), AnyError> {
@@ -909,6 +929,8 @@ impl PerspectiveInstance {
909929

910930
if status == LinkStatus::Shared {
911931
self.spawn_commit_and_handle_error(&diff);
932+
// Reset fallback sync interval when new shared links are added
933+
self.reset_fallback_sync_interval().await;
912934
}
913935
Ok(decorated_diff)
914936
}
@@ -2606,6 +2628,87 @@ impl PerspectiveInstance {
26062628
}
26072629
}
26082630

2631+
async fn fallback_sync_loop(&self) {
2632+
let uuid = self.persisted.lock().await.uuid.clone();
2633+
log::debug!("Starting fallback sync loop for perspective {}", uuid);
2634+
2635+
while !*self.is_teardown.lock().await {
2636+
// Check if we should run the fallback sync (avoid holding multiple locks)
2637+
let should_run = {
2638+
// Check perspective state first
2639+
let is_synced_neighbourhood = {
2640+
let handle = self.persisted.lock().await;
2641+
let result =
2642+
handle.state == PerspectiveState::Synced && handle.neighbourhood.is_some();
2643+
drop(handle); // Release lock immediately
2644+
result
2645+
};
2646+
2647+
if !is_synced_neighbourhood {
2648+
false
2649+
} else {
2650+
// Check link language availability
2651+
let link_lang_available = {
2652+
let link_lang = self.link_language.lock().await;
2653+
let result = link_lang.is_some();
2654+
drop(link_lang); // Release lock immediately
2655+
result
2656+
};
2657+
2658+
if !link_lang_available {
2659+
false
2660+
} else {
2661+
// Check timing conditions
2662+
let last_success = *self.last_successful_fallback_sync.lock().await;
2663+
let current_interval = *self.fallback_sync_interval.lock().await;
2664+
2665+
// Only run if we haven't had a successful sync recently or it's been a while
2666+
last_success.is_none() || last_success.unwrap().elapsed() > current_interval
2667+
}
2668+
}
2669+
};
2670+
2671+
if should_run {
2672+
log::debug!("Running fallback sync for perspective {}", uuid);
2673+
let success = self.ensure_public_links_are_shared().await;
2674+
2675+
if success {
2676+
// Update last successful sync time and increase interval
2677+
{
2678+
*self.last_successful_fallback_sync.lock().await =
2679+
Some(tokio::time::Instant::now());
2680+
*self.fallback_sync_interval.lock().await = Duration::from_secs(300);
2681+
}
2682+
log::debug!("Fallback sync successful for perspective {}, increasing interval to 5 minutes", uuid);
2683+
} else {
2684+
// Reset interval to 30 seconds on failure
2685+
*self.fallback_sync_interval.lock().await = Duration::from_secs(30);
2686+
log::warn!(
2687+
"Fallback sync failed for perspective {}, keeping interval at 30 seconds",
2688+
uuid
2689+
);
2690+
}
2691+
}
2692+
2693+
// Get fresh interval for sleep (after potential updates)
2694+
let sleep_interval = *self.fallback_sync_interval.lock().await;
2695+
sleep(sleep_interval).await;
2696+
}
2697+
2698+
log::debug!("Fallback sync loop ended for perspective {}", uuid);
2699+
}
2700+
2701+
/// Reset the fallback sync interval to 30 seconds when new links are added
2702+
/// This ensures that new links get synced quickly
2703+
async fn reset_fallback_sync_interval(&self) {
2704+
*self.fallback_sync_interval.lock().await = Duration::from_secs(30);
2705+
let uuid = self.persisted.lock().await.uuid.clone();
2706+
log::debug!(
2707+
"Reset fallback sync interval to 30 seconds for perspective {}",
2708+
uuid
2709+
);
2710+
}
2711+
26092712
pub async fn create_batch(&self) -> String {
26102713
let batch_uuid = Uuid::new_v4().to_string();
26112714
self.batch_store.write().await.insert(

tests/js/tests/neighbourhood.ts

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,20 @@ export default function neighbourhoodTests(testContext: TestContext) {
4242
expect(perspective?.neighbourhood).not.to.be.undefined;
4343
expect(perspective?.neighbourhood!.data.linkLanguage).to.be.equal(socialContext.address);
4444
expect(perspective?.neighbourhood!.data.meta.links.length).to.be.equal(1);
45-
expect(perspective?.state).to.be.equal(PerspectiveState.Synced);
45+
46+
// The perspective should start in NeighbourhoodCreationInitiated state
47+
expect(perspective?.state).to.be.equal(PerspectiveState.NeighboudhoodCreationInitiated);
48+
49+
// Wait for the perspective to transition to Synced state
50+
let tries = 0;
51+
const maxTries = 10;
52+
let currentPerspective = perspective;
53+
while (currentPerspective?.state !== PerspectiveState.Synced && tries < maxTries) {
54+
await sleep(1000);
55+
currentPerspective = await ad4mClient.perspective.byUUID(create.uuid);
56+
tries++;
57+
}
58+
expect(currentPerspective?.state).to.be.equal(PerspectiveState.Synced);
4659
})
4760

4861
it('can be created by Alice and joined by Bob', async () => {
@@ -154,12 +167,12 @@ export default function neighbourhoodTests(testContext: TestContext) {
154167
}
155168
//await Promise.all(linkPromises)
156169

157-
console.log("wait 10s")
158-
await sleep(10000)
170+
console.log("wait 15s for initial sync")
171+
await sleep(15000)
159172

160173
let bobLinks = await bob.perspective.queryLinks(bobP1!.uuid, new LinkQuery({source: 'root'}))
161174
let tries = 1
162-
const maxTries = 120 // 2 minutes with 1 second sleep
175+
const maxTries = 180 // 3 minutes with 1 second sleep (increased for fallback sync)
163176

164177
while(bobLinks.length < 1500 && tries < maxTries) {
165178
console.log(`Bob retrying getting links... Got ${bobLinks.length}/1500`);
@@ -186,11 +199,19 @@ export default function neighbourhoodTests(testContext: TestContext) {
186199
await testContext.alice.perspective.addLink(aliceP1.uuid, {source: 'alice', target: 'test://alice/2'})
187200
await testContext.alice.perspective.addLink(aliceP1.uuid, {source: 'alice', target: 'test://alice/3'})
188201

189-
// Wait for sync
190-
await sleep(5000)
202+
// Wait for sync with retry loop
203+
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
204+
let bobTries = 1
205+
const maxTriesBob = 20 // 20 tries with 2 second sleep = 40 seconds max
206+
207+
while(bobLinks.length < 3 && bobTries < maxTriesBob) {
208+
console.log(`Bob retrying getting Alice's links... Got ${bobLinks.length}/3`);
209+
await sleep(2000)
210+
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
211+
bobTries++
212+
}
191213

192214
// Verify Bob received Alice's links
193-
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
194215
expect(bobLinks.length).to.equal(3)
195216
expect(bobLinks.some(link => link.data.target === 'test://alice/1')).to.be.true
196217
expect(bobLinks.some(link => link.data.target === 'test://alice/2')).to.be.true
@@ -321,10 +342,21 @@ export default function neighbourhoodTests(testContext: TestContext) {
321342
})
322343

323344
it('they see each other in `otherAgents`', async () => {
324-
await sleep(10000);
325-
const aliceAgents = await aliceNH!.otherAgents()
345+
// Wait for agents to discover each other with retry loop
346+
let aliceAgents = await aliceNH!.otherAgents()
347+
let bobAgents = await bobNH!.otherAgents()
348+
let tries = 1
349+
const maxTries = 20 // 20 tries with 1 second sleep = 20 seconds max
350+
351+
while ((aliceAgents.length < 1 || bobAgents.length < 1) && tries < maxTries) {
352+
console.log(`Waiting for agents to discover each other... Alice: ${aliceAgents.length}, Bob: ${bobAgents.length}`);
353+
await sleep(1000)
354+
aliceAgents = await aliceNH!.otherAgents()
355+
bobAgents = await bobNH!.otherAgents()
356+
tries++
357+
}
358+
326359
console.log("alice agents", aliceAgents);
327-
const bobAgents = await bobNH!.otherAgents()
328360
console.log("bob agents", bobAgents);
329361
expect(aliceAgents.length).to.be.equal(1)
330362
expect(aliceAgents[0]).to.be.equal(bobDID)

0 commit comments

Comments
 (0)