@@ -10,11 +10,11 @@ use std::iter;
1010use std:: sync:: Arc ;
1111use std:: time:: Duration ;
1212
13- use async_utility:: futures_util:: stream:: BoxStream ;
13+ use async_utility:: futures_util:: stream:: { BoxStream , FuturesUnordered } ;
1414use nostr:: prelude:: * ;
1515use nostr_database:: prelude:: * ;
1616use nostr_relay_pool:: prelude:: * ;
17- use tokio:: sync:: broadcast;
17+ use tokio:: sync:: { broadcast, Semaphore } ;
1818
1919pub mod builder;
2020mod error;
@@ -1342,13 +1342,13 @@ impl Client {
13421342 // Get kind
13431343 let kind: Kind = kind. to_event_kind ( ) ;
13441344
1345- // Compose filters
1346- let filter : Filter = Filter :: default ( )
1345+ // Compose database filter
1346+ let db_filter : Filter = Filter :: default ( )
13471347 . authors ( outdated_public_keys. clone ( ) )
13481348 . kind ( kind) ;
13491349
1350- // Query from database
1351- let stored_events: Events = self . database ( ) . query ( filter . clone ( ) ) . await ?;
1350+ // Get events from database
1351+ let stored_events: Events = self . database ( ) . query ( db_filter ) . await ?;
13521352
13531353 // Get DISCOVERY and READ relays
13541354 let urls: Vec < RelayUrl > = self
@@ -1359,16 +1359,69 @@ impl Client {
13591359 )
13601360 . await ;
13611361
1362- // Get events from discovery and read relays
1363- let events: Events = self
1364- . fetch_events_from ( urls, filter, Duration :: from_secs ( 10 ) )
1362+ let semaphore = Arc :: new ( Semaphore :: new ( 10 ) ) ; // Allow at max 10 concurrent requests
1363+ let mut futures = FuturesUnordered :: new ( ) ;
1364+
1365+ // Try to fetch from relays only the newer events (last created_at + 1)
1366+ for event in stored_events. iter ( ) {
1367+ let author = event. pubkey ;
1368+ let created_at = event. created_at ;
1369+ let urls = urls. clone ( ) ;
1370+ let semaphore = semaphore. clone ( ) ;
1371+
1372+ futures. push ( async move {
1373+ // Acquire permit
1374+ let _permit = semaphore. acquire ( ) . await ;
1375+
1376+ // Construct filter
1377+ let filter: Filter = Filter :: new ( )
1378+ . author ( author)
1379+ . kind ( kind)
1380+ . since ( created_at + Duration :: from_secs ( 1 ) )
1381+ . limit ( 1 ) ;
1382+
1383+ // Fetch the event
1384+ let events: Events = self
1385+ . fetch_events_from ( urls, filter, Duration :: from_secs ( 10 ) )
1386+ . await ?;
1387+
1388+ Ok :: < _ , Error > ( events)
1389+ } ) ;
1390+ }
1391+
1392+ // Keep track of the missing public keys
1393+ let mut missing_public_keys: HashSet < PublicKey > = outdated_public_keys;
1394+
1395+ // Keep track of the updated events
1396+ let mut updated_events = Events :: default ( ) ;
1397+
1398+ while let Some ( result) = futures. next ( ) . await {
1399+ if let Ok ( events) = result {
1400+ if let Some ( event) = events. first ( ) {
1401+ // Remove from missing set
1402+ missing_public_keys. remove ( & event. pubkey ) ;
1403+
1404+ // Update the last check for this public key
1405+ self . gossip . update_last_check ( [ event. pubkey ] ) . await ;
1406+ }
1407+
1408+ updated_events = updated_events. merge ( events) ;
1409+ }
1410+ }
1411+
1412+ // Get the missing events
1413+ let missing_filter: Filter = Filter :: default ( )
1414+ . authors ( missing_public_keys. clone ( ) )
1415+ . kind ( kind) ;
1416+ let missing_events: Events = self
1417+ . fetch_events_from ( urls, missing_filter, Duration :: from_secs ( 10 ) )
13651418 . await ?;
13661419
1367- // Update last check for these public keys
1368- self . gossip . update_last_check ( outdated_public_keys ) . await ;
1420+ // Update the last check for the missing public keys
1421+ self . gossip . update_last_check ( missing_public_keys ) . await ;
13691422
1370- // Merge database and relays events
1371- let merged: Events = events . merge ( stored_events ) ;
1423+ // Merge all the events
1424+ let merged: Events = stored_events . merge ( updated_events ) . merge ( missing_events ) ;
13721425
13731426 // Update gossip graph
13741427 self . gossip . update ( merged) . await ;
0 commit comments