@@ -16,7 +16,6 @@ use async_wsocket::futures_util::{self, SinkExt, StreamExt};
16
16
use async_wsocket:: { ConnectionMode , Message } ;
17
17
use atomic_destructor:: AtomicDestroyer ;
18
18
use negentropy:: { Id , Negentropy , NegentropyStorageVector } ;
19
- use negentropy_deprecated:: { Bytes as BytesDeprecated , Negentropy as NegentropyDeprecated } ;
20
19
use nostr:: secp256k1:: rand:: { self , Rng } ;
21
20
use nostr_database:: prelude:: * ;
22
21
use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
@@ -1780,7 +1779,7 @@ impl InnerRelay {
1780
1779
1781
1780
/// New negentropy protocol
1782
1781
#[ inline( never) ]
1783
- pub ( super ) async fn sync_new (
1782
+ pub ( super ) async fn sync (
1784
1783
& self ,
1785
1784
filter : & Filter ,
1786
1785
items : Vec < ( EventId , Timestamp ) > ,
@@ -1932,163 +1931,6 @@ impl InnerRelay {
1932
1931
1933
1932
Ok ( ( ) )
1934
1933
}
1935
-
1936
- /// Deprecated negentropy protocol
1937
- #[ inline( never) ]
1938
- pub ( super ) async fn sync_deprecated (
1939
- & self ,
1940
- filter : & Filter ,
1941
- items : Vec < ( EventId , Timestamp ) > ,
1942
- opts : & SyncOptions ,
1943
- output : & mut Reconciliation ,
1944
- ) -> Result < ( ) , Error > {
1945
- // Compose negentropy struct, add items and seal
1946
- let mut negentropy = NegentropyDeprecated :: new ( 32 , Some ( NEGENTROPY_FRAME_SIZE_LIMIT ) ) ?;
1947
- for ( id, timestamp) in items. into_iter ( ) {
1948
- let id = BytesDeprecated :: from_slice ( id. as_bytes ( ) ) ;
1949
- negentropy. add_item ( timestamp. as_u64 ( ) , id) ?;
1950
- }
1951
- negentropy. seal ( ) ?;
1952
-
1953
- // Initiate message
1954
- let initial_message = negentropy. initiate ( ) ?;
1955
-
1956
- // Subscribe to notifications
1957
- let mut notifications = self . internal_notification_sender . subscribe ( ) ;
1958
- let mut temp_notifications = self . internal_notification_sender . subscribe ( ) ;
1959
-
1960
- // Send the initial negentropy message
1961
- let sub_id = SubscriptionId :: generate ( ) ;
1962
- let open_msg: ClientMessage = ClientMessage :: NegOpen {
1963
- subscription_id : Cow :: Borrowed ( & sub_id) ,
1964
- filter : Cow :: Borrowed ( filter) ,
1965
- id_size : Some ( 32 ) ,
1966
- initial_message : Cow :: Owned ( hex:: encode ( initial_message) ) ,
1967
- } ;
1968
- self . send_msg ( open_msg) ?;
1969
-
1970
- // Check if negentropy is supported
1971
- check_negentropy_support ( & sub_id, opts, & mut temp_notifications) . await ?;
1972
-
1973
- let mut in_flight_up: HashSet < EventId > = HashSet :: new ( ) ;
1974
- let mut in_flight_down: bool = false ;
1975
- let mut sync_done: bool = false ;
1976
- let mut have_ids: Vec < EventId > = Vec :: new ( ) ;
1977
- let mut need_ids: Vec < EventId > = Vec :: new ( ) ;
1978
- let down_sub_id: SubscriptionId = SubscriptionId :: generate ( ) ;
1979
-
1980
- // Start reconciliation
1981
- while let Ok ( notification) = notifications. recv ( ) . await {
1982
- match notification {
1983
- RelayNotification :: Message { message } => {
1984
- match message {
1985
- RelayMessage :: NegMsg {
1986
- subscription_id,
1987
- message,
1988
- } => {
1989
- if subscription_id. as_ref ( ) == & sub_id {
1990
- let mut curr_have_ids: Vec < BytesDeprecated > = Vec :: new ( ) ;
1991
- let mut curr_need_ids: Vec < BytesDeprecated > = Vec :: new ( ) ;
1992
-
1993
- // Parse message
1994
- let query: BytesDeprecated =
1995
- BytesDeprecated :: from_hex ( message. as_ref ( ) ) ?;
1996
-
1997
- // Reconcile
1998
- let msg: Option < BytesDeprecated > = negentropy. reconcile_with_ids (
1999
- & query,
2000
- & mut curr_have_ids,
2001
- & mut curr_need_ids,
2002
- ) ?;
2003
-
2004
- // Handle the message
2005
- self . handle_neg_msg (
2006
- & subscription_id,
2007
- msg. map ( |m| m. to_bytes ( ) ) ,
2008
- curr_have_ids. into_iter ( ) . filter_map ( neg_depr_to_event_id) ,
2009
- curr_need_ids. into_iter ( ) . filter_map ( neg_depr_to_event_id) ,
2010
- opts,
2011
- output,
2012
- & mut have_ids,
2013
- & mut need_ids,
2014
- & mut sync_done,
2015
- ) ?;
2016
- }
2017
- }
2018
- RelayMessage :: NegErr {
2019
- subscription_id,
2020
- message,
2021
- } => {
2022
- if subscription_id. as_ref ( ) == & sub_id {
2023
- return Err ( Error :: RelayMessage ( message. into_owned ( ) ) ) ;
2024
- }
2025
- }
2026
- RelayMessage :: Ok {
2027
- event_id,
2028
- status,
2029
- message,
2030
- } => {
2031
- self . handle_neg_ok (
2032
- & mut in_flight_up,
2033
- event_id,
2034
- status,
2035
- message,
2036
- output,
2037
- ) ;
2038
- }
2039
- RelayMessage :: Event {
2040
- subscription_id,
2041
- event,
2042
- } => {
2043
- if subscription_id. as_ref ( ) == & down_sub_id {
2044
- output. received . insert ( event. id ) ;
2045
- }
2046
- }
2047
- RelayMessage :: EndOfStoredEvents ( id) => {
2048
- if id. as_ref ( ) == & down_sub_id {
2049
- in_flight_down = false ;
2050
- }
2051
- }
2052
- RelayMessage :: Closed {
2053
- subscription_id, ..
2054
- } => {
2055
- if subscription_id. as_ref ( ) == & down_sub_id {
2056
- in_flight_down = false ;
2057
- }
2058
- }
2059
- _ => ( ) ,
2060
- }
2061
-
2062
- // Send events
2063
- self . upload_neg_events ( & mut have_ids, & mut in_flight_up, opts)
2064
- . await ?;
2065
-
2066
- // Get events
2067
- self . req_neg_events ( & mut need_ids, & mut in_flight_down, & down_sub_id, opts) ?;
2068
- }
2069
- RelayNotification :: RelayStatus { status } => {
2070
- if status. is_disconnected ( ) {
2071
- return Err ( Error :: NotConnected ) ;
2072
- }
2073
- }
2074
- RelayNotification :: Shutdown => return Err ( Error :: ReceivedShutdown ) ,
2075
- _ => ( ) ,
2076
- } ;
2077
-
2078
- if sync_done
2079
- && have_ids. is_empty ( )
2080
- && need_ids. is_empty ( )
2081
- && in_flight_up. is_empty ( )
2082
- && !in_flight_down
2083
- {
2084
- break ;
2085
- }
2086
- }
2087
-
2088
- tracing:: info!( url = %self . url, "Deprecated negentropy reconciliation terminated." ) ;
2089
-
2090
- Ok ( ( ) )
2091
- }
2092
1934
}
2093
1935
2094
1936
/// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT].
@@ -2114,11 +1956,6 @@ fn neg_id_to_event_id(id: Id) -> EventId {
2114
1956
EventId :: from_byte_array ( id. to_bytes ( ) )
2115
1957
}
2116
1958
2117
- #[ inline]
2118
- fn neg_depr_to_event_id ( id : BytesDeprecated ) -> Option < EventId > {
2119
- EventId :: from_slice ( id. as_bytes ( ) ) . ok ( )
2120
- }
2121
-
2122
1959
fn prepare_negentropy_storage (
2123
1960
items : Vec < ( EventId , Timestamp ) > ,
2124
1961
) -> Result < NegentropyStorageVector , Error > {
0 commit comments