@@ -19,6 +19,8 @@ use std::time::Instant;
19
19
use async_utility:: futures_util:: stream:: AbortHandle ;
20
20
use async_utility:: { futures_util, thread, time} ;
21
21
use nostr:: message:: MessageHandleError ;
22
+ use nostr:: negentropy:: hex;
23
+ use nostr:: negentropy:: { self , Bytes , Negentropy } ;
22
24
#[ cfg( feature = "nip11" ) ]
23
25
use nostr:: nips:: nip11:: RelayInformationDocument ;
24
26
use nostr:: { ClientMessage , Event , EventId , Filter , RelayMessage , SubscriptionId , Timestamp , Url } ;
@@ -41,6 +43,12 @@ type Message = (RelayEvent, Option<oneshot::Sender<bool>>);
41
43
/// [`Relay`] error
42
44
#[ derive( Debug , Error ) ]
43
45
pub enum Error {
46
+ /// Negentropy error
47
+ #[ error( transparent) ]
48
+ Negentropy ( #[ from] negentropy:: Error ) ,
49
+ /// Hex error
50
+ #[ error( transparent) ]
51
+ Hex ( #[ from] hex:: Error ) ,
44
52
/// Channel timeout
45
53
#[ error( "channel timeout" ) ]
46
54
ChannelTimeout ,
@@ -1546,4 +1554,89 @@ impl Relay {
1546
1554
}
1547
1555
} ) ;
1548
1556
}
1557
+
1558
+ /// Negentropy reconciliation
1559
+ pub async fn reconcilie (
1560
+ & self ,
1561
+ filter : Filter ,
1562
+ my_items : Vec < ( EventId , Timestamp ) > ,
1563
+ ) -> Result < ( ) , Error > {
1564
+ if !self . opts . read ( ) {
1565
+ return Err ( Error :: ReadDisabled ) ;
1566
+ }
1567
+
1568
+ let id_size: usize = 16 ;
1569
+
1570
+ let mut negentropy = Negentropy :: new ( id_size, Some ( 5_000 ) ) ?;
1571
+
1572
+ for ( id, timestamp) in my_items. into_iter ( ) {
1573
+ let cutted_id: & [ u8 ] = & id. as_bytes ( ) [ ..id_size] ;
1574
+ let cutted_id = Bytes :: from_slice ( cutted_id) ;
1575
+ negentropy. add_item ( timestamp. as_u64 ( ) , cutted_id) ?;
1576
+ }
1577
+
1578
+ negentropy. seal ( ) ?;
1579
+
1580
+ let id = SubscriptionId :: generate ( ) ;
1581
+ let open_msg = ClientMessage :: neg_open ( & mut negentropy, & id, filter) ?;
1582
+
1583
+ self . send_msg ( open_msg, Some ( Duration :: from_secs ( 10 ) ) )
1584
+ . await ?;
1585
+
1586
+ let mut notifications = self . notification_sender . subscribe ( ) ;
1587
+ while let Ok ( notification) = notifications. recv ( ) . await {
1588
+ if let RelayPoolNotification :: Message ( url, msg) = notification {
1589
+ if url == self . url {
1590
+ match msg {
1591
+ RelayMessage :: NegMsg {
1592
+ subscription_id,
1593
+ message,
1594
+ } => {
1595
+ if subscription_id == id {
1596
+ let query: Bytes = Bytes :: from_hex ( message) ?;
1597
+ let mut need_ids: Vec < Bytes > = Vec :: new ( ) ;
1598
+ let msg: Option < Bytes > = negentropy. reconcile_with_ids (
1599
+ & query,
1600
+ & mut Vec :: new ( ) ,
1601
+ & mut need_ids,
1602
+ ) ?;
1603
+
1604
+ // TODO: request ids to relay
1605
+ println ! ( "IDs: {need_ids:?}" ) ;
1606
+
1607
+ match msg {
1608
+ Some ( query) => {
1609
+ self . send_msg (
1610
+ ClientMessage :: NegMsg {
1611
+ subscription_id : id. clone ( ) ,
1612
+ message : query. to_hex ( ) ,
1613
+ } ,
1614
+ None ,
1615
+ )
1616
+ . await ?;
1617
+ }
1618
+ None => {
1619
+ tracing:: info!( "Reconciliation terminated" ) ;
1620
+ break ;
1621
+ }
1622
+ }
1623
+ }
1624
+ }
1625
+ RelayMessage :: NegErr {
1626
+ subscription_id,
1627
+ code,
1628
+ } => {
1629
+ if subscription_id == id {
1630
+ tracing:: error!( "Negentropy syncing error: {code}" ) ;
1631
+ break ;
1632
+ }
1633
+ }
1634
+ _ => ( ) ,
1635
+ }
1636
+ }
1637
+ }
1638
+ }
1639
+
1640
+ Ok ( ( ) )
1641
+ }
1549
1642
}
0 commit comments