diff --git a/Cargo.lock b/Cargo.lock
index e5ea094a3..f466a204e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2733,6 +2733,7 @@ dependencies = [
  "egui-winit",
  "ehttp",
  "enostr",
+ "futures",
  "hex",
  "image",
  "jni",
@@ -2750,6 +2751,7 @@ dependencies = [
  "tempfile",
  "thiserror 2.0.12",
  "tokenator",
+ "tokio",
  "tracing",
  "url",
  "uuid",
diff --git a/Cargo.toml b/Cargo.toml
index 2e9b9f724..4906a9efb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,7 @@ egui_virtual_list = { git = "https://github.com/jb55/hello_egui", rev = "a66b679
 ehttp = "0.5.0"
 enostr = { path = "crates/enostr" }
 ewebsock = { version = "0.2.0", features = ["tls"] }
+futures = "0.3.31"
 hex = "0.4.3"
 image = { version = "0.25", features = ["jpeg", "png", "webp"] }
 indexmap = "2.6.0"
diff --git a/crates/enostr/src/relay/pool.rs b/crates/enostr/src/relay/pool.rs
index efbbbcc7f..46796db94 100644
--- a/crates/enostr/src/relay/pool.rs
+++ b/crates/enostr/src/relay/pool.rs
@@ -96,6 +96,15 @@ impl PoolRelay {
     }
 
     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> {
+        debug!(
+            "PoolRelay subscribe: sending sub {} {}: {:?}",
+            subid,
+            self.url(),
+            filter
+                .iter()
+                .map(|f| f.json().unwrap_or_default())
+                .collect::<Vec<String>>(),
+        );
         self.send(&ClientMessage::req(subid, filter))
     }
 
@@ -197,6 +206,7 @@ impl RelayPool {
         if let Some(debug) = &mut self.debug {
             debug.send_cmd(relay.url().to_owned(), &cmd);
         }
+        debug!("RelayPool unsubscribe close {} {}", subid, relay.url());
         if let Err(err) = relay.send(&cmd) {
             error!(
                 "error unsubscribing from {} on {}: {err}",
@@ -215,10 +225,57 @@ impl RelayPool {
                     &ClientMessage::req(subid.clone(), filter.clone()),
                 );
             }
+            debug!(
+                "RelayPool subscribe: sending sub {} {}: {:?}",
+                subid,
+                relay.url(),
+                filter
+                    .iter()
+                    .map(|f| f.json().unwrap_or_default())
+                    .collect::<Vec<String>>(),
+            );
+            if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
+                error!("error subscribing to {}: {err}", relay.url());
+            }
+        }
+    }
+
+    // if the relay is in the pool already send the subscription, otherwise add the
+    // relay to the pool and we'll send it on open.
+    pub fn subscribe_relay(
+        &mut self,
+        subid: String,
+        filter: Vec<Filter>,
+        relaystr: String,
+    ) -> bool {
+        if let Some(&mut ref mut relay) = self.relays.iter_mut().find(|r| r.url() == relaystr) {
+            if let Some(debug) = &mut self.debug {
+                debug.send_cmd(
+                    relay.url().to_owned(),
+                    &ClientMessage::req(subid.clone(), filter.clone()),
+                );
+            }
+            debug!(
+                "RelayPool subscribe_relay: sending sub {} {}: {:?}",
+                subid,
+                relay.url(),
+                filter
+                    .iter()
+                    .map(|f| f.json().unwrap_or_default())
+                    .collect::<Vec<String>>(),
+            );
             if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
                 error!("error subscribing to {}: {err}", relay.url());
             }
+            true
+        } else {
+            let wakeup = move || {
+                // ignore
+            };
+            if let Err(err) = self.add_url(relaystr.clone(), wakeup) {
+                error!("trouble adding url {}: {:?}", relaystr, err);
+            }
+            false
         }
     }
 
@@ -343,7 +400,7 @@ impl RelayPool {
     }
 
     // standardize the format (ie, trailing slashes)
-    fn canonicalize_url(url: String) -> String {
+    pub fn canonicalize_url(url: String) -> String {
         match Url::parse(&url) {
             Ok(parsed_url) => parsed_url.to_string(),
             Err(_) => url, // If parsing fails, return the original URL.
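`canonicalize_url` is made `pub` here because `subscribe_relay` (and the `SubMan` bookkeeping added below) match relays by comparing URL strings. A standalone sketch, not part of this patch, of the trailing-slash normalization the `url` crate performs and why string-keyed relay maps need it:

```rust
use url::Url;

// mirrors RelayPool::canonicalize_url: parseable URLs are normalized,
// unparseable input is passed through unchanged
fn canonicalize(url: &str) -> String {
    match Url::parse(url) {
        Ok(parsed) => parsed.to_string(),
        Err(_) => url.to_string(),
    }
}

fn main() {
    // the url crate serializes an empty path on ws/wss URLs as "/",
    // so both spellings of the same relay canonicalize identically
    assert_eq!(canonicalize("wss://relay.damus.io"), "wss://relay.damus.io/");
    assert_eq!(canonicalize("wss://relay.damus.io/"), "wss://relay.damus.io/");
}
```

Without this, `self.relays.iter_mut().find(|r| r.url() == relaystr)` would miss a relay that was added with (or without) the trailing slash.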
diff --git a/crates/notedeck/Cargo.toml b/crates/notedeck/Cargo.toml
index a948b73bc..f620dfd88 100644
--- a/crates/notedeck/Cargo.toml
+++ b/crates/notedeck/Cargo.toml
@@ -15,6 +15,7 @@ enostr = { workspace = true }
 nostr = { workspace = true }
 egui = { workspace = true }
 eframe = { workspace = true }
+futures = { workspace = true }
 image = { workspace = true }
 base32 = { workspace = true }
 poll-promise = { workspace = true }
@@ -32,9 +33,11 @@ ehttp = {workspace = true }
 mime_guess = { workspace = true }
 egui-winit = { workspace = true }
 tokenator = { workspace = true }
+tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
 
 [dev-dependencies]
 tempfile = { workspace = true }
+nostr = { workspace = true }
 
 [target.'cfg(target_os = "android")'.dependencies]
 jni = { workspace = true }
diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs
index 2b0b256ae..ea6ab435f 100644
--- a/crates/notedeck/src/accounts.rs
+++ b/crates/notedeck/src/accounts.rs
@@ -1,17 +1,18 @@
 use tracing::{debug, error, info};
 
+use crate::subman::{RemoteId, SubSpecBuilder};
 use crate::{
-    AccountStorage, MuteFun, Muted, RelaySpec, SingleUnkIdAction, UnknownIds, UserAccount,
+    AccountStorage, MuteFun, Muted, RelaySpec, SingleUnkIdAction, SubError, SubMan, UnknownIds,
+    UserAccount,
 };
-use enostr::{ClientMessage, FilledKeypair, Keypair, Pubkey, RelayPool};
-use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Subscription, Transaction};
+use enostr::{FilledKeypair, Keypair, Pubkey, RelayPool};
+use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Transaction};
 use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
 use url::Url;
-use uuid::Uuid; // TODO: remove this
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 #[derive(Debug, Clone)]
 pub struct SwitchAccountAction {
@@ -36,10 +37,9 @@ pub enum AccountsAction {
 
 pub struct AccountRelayData {
     filter: Filter,
-    subid: Option<String>,
-    sub: Option<Subscription>,
-    local: BTreeSet<RelaySpec>,      // used locally but not advertised
-    advertised: BTreeSet<RelaySpec>, // advertised via NIP-65
+    sub_remote_id: Option<RemoteId>,
+    _local: BTreeSet<RelaySpec>, // used locally but not advertised
+    advertised: Arc<Mutex<BTreeSet<RelaySpec>>>, // advertised via NIP-65
 }
 
 #[derive(Default)]
@@ -81,47 +81,64 @@ impl AccountRelayData {
 
         AccountRelayData {
             filter,
-            subid: None,
-            sub: None,
-            local: BTreeSet::new(),
-            advertised: relays.into_iter().collect(),
+            sub_remote_id: None,
+            _local: BTreeSet::new(),
+            advertised: Arc::new(Mutex::new(relays.into_iter().collect())),
         }
     }
 
     // make this account the current selected account
-    pub fn activate(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
+    pub fn activate(&mut self, subman: &mut SubMan, default_relays: &[RelaySpec]) {
         debug!("activating relay sub {}", self.filter.json().unwrap());
-        assert_eq!(self.subid, None, "subid already exists");
-        assert_eq!(self.sub, None, "sub already exists");
-
-        // local subscription
-        let sub = ndb
-            .subscribe(&[self.filter.clone()])
-            .expect("ndb relay list subscription");
-
-        // remote subscription
-        let subid = Uuid::new_v4().to_string();
-        pool.subscribe(subid.clone(), vec![self.filter.clone()]);
-
-        self.sub = Some(sub);
-        self.subid = Some(subid);
+        assert!(self.sub_remote_id.is_none(), "subscription already exists");
+        let ndb = subman.ndb();
+        let subspec = SubSpecBuilder::new()
+            .filters(vec![self.filter.clone()])
+            .build();
+        debug!(
+            "activating account relay sub {}: {}",
+            subspec.remote_id,
+            self.filter.json().unwrap()
+        );
+        if let Ok(mut rcvr) = subman.subscribe(subspec, default_relays) {
+            let idstr = rcvr.idstr();
+            self.sub_remote_id = rcvr.remote_id();
+            let advertisedref = self.advertised.clone();
+            tokio::spawn(async move {
+                loop {
+                    match rcvr.next().await {
+                        Err(SubError::StreamEnded) => {
+                            debug!("account relays: sub {} complete", idstr);
+                            break;
+                        }
+                        Err(err) => {
+                            error!("account relays: sub {}: error: {:?}", idstr, err);
+                            break;
+                        }
+                        Ok(nks) => {
+                            debug!("account relays: sub {}: note keys: {:?}", idstr, nks);
+                            let txn = Transaction::new(&ndb).expect("txn");
+                            let relays = Self::harvest_nip65_relays(&ndb, &txn, &nks);
+                            debug!("updated relays {:?}", relays);
+                            *advertisedref.lock().unwrap() = relays.into_iter().collect();
+                        }
+                    }
+                }
+            });
+        }
     }
 
     // this account is no longer the selected account
-    pub fn deactivate(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
-        debug!("deactivating relay sub {}", self.filter.json().unwrap());
-        assert_ne!(self.subid, None, "subid doesn't exist");
-        assert_ne!(self.sub, None, "sub doesn't exist");
-
-        // remote subscription
-        pool.unsubscribe(self.subid.as_ref().unwrap().clone());
-
-        // local subscription
-        ndb.unsubscribe(self.sub.unwrap())
-            .expect("ndb relay list unsubscribe");
-
-        self.sub = None;
-        self.subid = None;
+    pub fn deactivate(&mut self, subman: &mut SubMan) {
+        assert!(self.sub_remote_id.is_some(), "subscription doesn't exist");
+        let remote_id = self.sub_remote_id.as_ref().unwrap();
+        debug!(
+            "deactivating account relays sub {}: {}",
+            remote_id,
+            self.filter.json().unwrap()
+        );
+        subman.unsubscribe_remote_id(remote_id).ok();
+        self.sub_remote_id = None;
     }
 
     // standardize the format (ie, trailing slashes) to avoid dups
@@ -171,7 +188,7 @@ impl AccountRelayData {
     pub fn publish_nip65_relays(&self, seckey: &[u8; 32], pool: &mut RelayPool) {
         let mut builder = NoteBuilder::new().kind(10002).content("");
-        for rs in &self.advertised {
+        for rs in self.advertised.lock().unwrap().iter() {
             builder = builder.start_tag().tag_str("r").tag_str(&rs.url);
             if rs.has_read_marker {
                 builder = builder.tag_str("read");
@@ -186,9 +203,8 @@ impl AccountRelayData {
 
 pub struct AccountMutedData {
     filter: Filter,
-    subid: Option<String>,
-    sub: Option<Subscription>,
-    muted: Arc<Muted>,
+    sub_remote_id: Option<RemoteId>,
+    muted: Arc<Mutex<Muted>>,
 }
 
 impl AccountMutedData {
@@ -214,46 +230,63 @@ impl AccountMutedData {
 
         AccountMutedData {
             filter,
-            subid: None,
-            sub: None,
-            muted: Arc::new(muted),
+            sub_remote_id: None,
+            muted: Arc::new(Mutex::new(muted)),
         }
     }
 
     // make this account the current selected account
-    pub fn activate(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
-        debug!("activating muted sub {}", self.filter.json().unwrap());
-        assert_eq!(self.subid, None, "subid already exists");
-        assert_eq!(self.sub, None, "sub already exists");
-
-        // local subscription
-        let sub = ndb
-            .subscribe(&[self.filter.clone()])
-            .expect("ndb muted subscription");
-
-        // remote subscription
-        let subid = Uuid::new_v4().to_string();
-        pool.subscribe(subid.clone(), vec![self.filter.clone()]);
-
-        self.sub = Some(sub);
-        self.subid = Some(subid);
+    pub fn activate(&mut self, subman: &mut SubMan, default_relays: &[RelaySpec]) {
+        assert!(self.sub_remote_id.is_none(), "subscription already exists");
+        let ndb = subman.ndb();
+        let subspec = SubSpecBuilder::new()
+            .filters(vec![self.filter.clone()])
+            .build();
+        debug!(
+            "activating account muted sub {}: {}",
+            subspec.remote_id,
+            self.filter.json().unwrap()
+        );
+        if let Ok(mut rcvr) = subman.subscribe(subspec, default_relays) {
+            let idstr = rcvr.idstr();
+            self.sub_remote_id = rcvr.remote_id();
+            let mutedref = self.muted.clone();
+            tokio::spawn(async move {
+                loop {
+                    match rcvr.next().await {
+                        Err(SubError::StreamEnded) => {
+                            debug!("account muted: sub {} complete", idstr);
+                            break;
+                        }
+                        Err(err) => {
+                            error!("account muted: sub {}: error: {:?}", idstr, err);
+                            break;
+                        }
+                        Ok(nks) => {
+                            debug!("account muted: sub {}: note keys: {:?}", idstr, nks);
+                            let txn = Transaction::new(&ndb).expect("txn");
+                            let muted = AccountMutedData::harvest_nip51_muted(&ndb, &txn, &nks);
+                            debug!("updated muted {:?}", muted);
+                            *mutedref.lock().unwrap() = muted;
+                        }
+                    }
+                }
+            });
+        }
     }
 
     // this account is no longer the selected account
-    pub fn deactivate(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
-        debug!("deactivating muted sub {}", self.filter.json().unwrap());
-        assert_ne!(self.subid, None, "subid doesn't exist");
-        assert_ne!(self.sub, None, "sub doesn't exist");
-
-        // remote subscription
-        pool.unsubscribe(self.subid.as_ref().unwrap().clone());
-
-        // local subscription
-        ndb.unsubscribe(self.sub.unwrap())
-            .expect("ndb muted unsubscribe");
+    pub fn deactivate(&mut self, subman: &mut SubMan) {
+        assert!(self.sub_remote_id.is_some(), "subscription doesn't exist");
+        let remote_id = self.sub_remote_id.as_ref().unwrap();
+        debug!(
+            "deactivating account muted sub {}: {}",
+            remote_id,
+            self.filter.json().unwrap()
+        );
 
-        self.sub = None;
-        self.subid = None;
+        subman.unsubscribe_remote_id(remote_id).ok();
+        self.sub_remote_id = None;
     }
 
     fn harvest_nip51_muted(ndb: &Ndb, txn: &Transaction, nks: &[NoteKey]) -> Muted {
@@ -504,7 +537,12 @@ impl Accounts {
             .map(|i| self.get_account(i))?
     }
 
-    pub fn get_selected_account_data(&mut self) -> Option<&mut AccountData> {
+    pub fn get_selected_account_data(&self) -> Option<&AccountData> {
+        let account_pubkey = *self.selected_account_pubkey_bytes()?;
+        self.account_data.get(&account_pubkey)
+    }
+
+    pub fn get_selected_account_data_mut(&mut self) -> Option<&mut AccountData> {
         let account_pubkey = *self.selected_account_pubkey_bytes()?;
         self.account_data.get_mut(&account_pubkey)
     }
@@ -534,7 +572,7 @@ impl Accounts {
         if let Some(account) = self.accounts.get(index) {
             let pubkey = account.key.pubkey.bytes();
             if let Some(account_data) = self.account_data.get(pubkey) {
-                let muted = Arc::clone(&account_data.muted.muted);
+                let muted = account_data.muted.muted.lock().unwrap().clone();
                 return Box::new(move |note: &Note, thread: &[u8; 32]| {
                     muted.is_muted(note, thread)
                 });
@@ -544,25 +582,6 @@ impl Accounts {
         Box::new(|_: &Note, _: &[u8; 32]| false)
     }
 
-    pub fn send_initial_filters(&mut self, pool: &mut RelayPool, relay_url: &str) {
-        for data in self.account_data.values() {
-            // send the active account's relay list subscription
-            if let Some(relay_subid) = &data.relay.subid {
-                pool.send_to(
-                    &ClientMessage::req(relay_subid.clone(), vec![data.relay.filter.clone()]),
-                    relay_url,
-                );
-            }
-            // send the active account's muted subscription
-            if let Some(muted_subid) = &data.muted.subid {
-                pool.send_to(
-                    &ClientMessage::req(muted_subid.clone(), vec![data.muted.filter.clone()]),
-                    relay_url,
-                );
-            }
-        }
-    }
-
     // Return accounts which have no account_data yet (added) and accounts
     // which have still data but are no longer in our account list (removed).
     fn delta_accounts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
@@ -598,150 +617,110 @@ impl Accounts {
         self.account_data.remove(pubkey);
     }
 
-    fn poll_for_updates(&mut self, ndb: &Ndb) -> bool {
-        let mut changed = false;
-        for (pubkey, data) in &mut self.account_data {
-            if let Some(sub) = data.relay.sub {
-                let nks = ndb.poll_for_notes(sub, 1);
-                if !nks.is_empty() {
-                    let txn = Transaction::new(ndb).expect("txn");
-                    let relays = AccountRelayData::harvest_nip65_relays(ndb, &txn, &nks);
-                    debug!(
-                        "pubkey {}: updated relays {:?}",
-                        hex::encode(pubkey),
-                        relays
-                    );
-                    data.relay.advertised = relays.into_iter().collect();
-                    changed = true;
-                }
-            }
-            if let Some(sub) = data.muted.sub {
-                let nks = ndb.poll_for_notes(sub, 1);
-                if !nks.is_empty() {
-                    let txn = Transaction::new(ndb).expect("txn");
-                    let muted = AccountMutedData::harvest_nip51_muted(ndb, &txn, &nks);
-                    debug!("pubkey {}: updated muted {:?}", hex::encode(pubkey), muted);
-                    data.muted.muted = Arc::new(muted);
-                    changed = true;
-                }
-            }
-        }
-        changed
+    // true if get_combined_relays is returning forced relays
+    pub fn is_forced_relays(&self) -> bool {
+        !self.forced_relays.is_empty()
     }
 
-    fn update_relay_configuration(
-        &mut self,
-        pool: &mut RelayPool,
-        wakeup: impl Fn() + Send + Sync + Clone + 'static,
-    ) {
-        debug!(
-            "updating relay configuration for currently selected {:?}",
-            self.currently_selected_account
-                .map(|i| hex::encode(self.accounts.get(i).unwrap().key.pubkey.bytes()))
-        );
+    fn get_combined_relays(
+        &self,
+        data_option: Option<&AccountData>,
+        filter: impl Fn(&RelaySpec) -> bool,
+    ) -> Vec<RelaySpec> {
+        // relays specified on the command line override everything
+        if !self.forced_relays.is_empty() {
+            return self.forced_relays.iter().cloned().collect();
+        }
 
-        // If forced relays are set use them only
-        let mut desired_relays = self.forced_relays.clone();
+        // is there a currently selected account?
+        let mut relays = if let Some(data) = data_option {
+            data.relay
+                .advertised
+                .lock()
+                .unwrap()
+                .iter()
+                .filter(|&x| filter(x))
+                .cloned()
+                .collect()
+        } else {
+            // no selected account
+            Vec::new()
+        };
 
-        // Compose the desired relay lists from the selected account
-        if desired_relays.is_empty() {
-            if let Some(data) = self.get_selected_account_data() {
-                desired_relays.extend(data.relay.local.iter().cloned());
-                desired_relays.extend(data.relay.advertised.iter().cloned());
-            }
+        if relays.is_empty() {
+            relays.extend(self.bootstrap_relays.iter().filter(|&x| filter(x)).cloned());
         }
 
-        // If no relays are specified at this point use the bootstrap list
-        if desired_relays.is_empty() {
-            desired_relays = self.bootstrap_relays.clone();
-        }
+        relays
+    }
 
-        debug!("current relays: {:?}", pool.urls());
-        debug!("desired relays: {:?}", desired_relays);
+    pub fn get_selected_account_readable_relays(&self) -> Vec<RelaySpec> {
+        self.get_combined_relays(self.get_selected_account_data(), |relay| {
+            relay.is_readable()
+        })
+    }
 
-        let pool_specs = pool
-            .urls()
-            .iter()
-            .map(|url| RelaySpec::new(url.clone(), false, false))
-            .collect();
-        let add: BTreeSet<RelaySpec> = desired_relays.difference(&pool_specs).cloned().collect();
-        let mut sub: BTreeSet<RelaySpec> =
-            pool_specs.difference(&desired_relays).cloned().collect();
-        if !add.is_empty() {
-            debug!("configuring added relays: {:?}", add);
-            let _ = pool.add_urls(add.iter().map(|r| r.url.clone()).collect(), wakeup);
-        }
-        if !sub.is_empty() {
-            // certain relays are persistent like the multicast relay,
-            // although we should probably have a way to explicitly
-            // disable it
-            sub.remove(&RelaySpec::new("multicast", false, false));
-
-            debug!("removing unwanted relays: {:?}", sub);
-            pool.remove_urls(&sub.iter().map(|r| r.url.clone()).collect());
-        }
+    pub fn get_selected_account_writable_relays(&self) -> Vec<RelaySpec> {
+        self.get_combined_relays(self.get_selected_account_data(), |relay| {
+            relay.is_writable()
+        })
+    }
 
-        debug!("current relays: {:?}", pool.urls());
+    pub fn get_all_selected_account_relays(&self) -> Vec<RelaySpec> {
+        self.get_combined_relays(self.get_selected_account_data(), |_| true)
+    }
+
+    // returns the active account's relays or empty set if there are none (no bootstrap)
+    pub fn get_advertised_relays(&self) -> BTreeSet<RelaySpec> {
+        if let Some(data) = self.get_selected_account_data() {
+            data.relay.advertised.lock().unwrap().clone()
+        } else {
+            BTreeSet::new()
+        }
     }
 
-    pub fn update(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, ctx: &egui::Context) {
+    pub fn update(&mut self, subman: &mut SubMan, _ctx: &egui::Context) {
         // IMPORTANT - This function is called in the UI update loop,
         // make sure it is fast when idle
 
-        // On the initial update the relays need config even if nothing changes below
-        let mut need_reconfig = self.needs_relay_config;
-
-        let ctx2 = ctx.clone();
-        let wakeup = move || {
-            ctx2.request_repaint();
-        };
-
         // Do we need to deactivate any existing account subs?
         for (ndx, account) in self.accounts.iter().enumerate() {
             if Some(ndx) != self.currently_selected_account {
                 // this account is not currently selected
                 if let Some(data) = self.account_data.get_mut(account.key.pubkey.bytes()) {
-                    if data.relay.sub.is_some() {
+                    if data.relay.sub_remote_id.is_some() {
                         // this account has relay subs, deactivate them
-                        data.relay.deactivate(ndb, pool);
+                        data.relay.deactivate(subman);
                     }
-                    if data.muted.sub.is_some() {
+                    if data.muted.sub_remote_id.is_some() {
                         // this account has muted subs, deactivate them
-                        data.muted.deactivate(ndb, pool);
+                        data.muted.deactivate(subman);
                     }
                 }
             }
         }
 
+        let ndb = subman.ndb().clone();
+
         // Were any accounts added or removed?
         let (added, removed) = self.delta_accounts();
         for pk in added {
-            self.handle_added_account(ndb, &pk);
-            need_reconfig = true;
+            self.handle_added_account(&ndb, &pk);
         }
         for pk in removed {
             self.handle_removed_account(&pk);
-            need_reconfig = true;
-        }
-
-        // Did any accounts receive updates (ie NIP-65 relay lists)
-        need_reconfig = self.poll_for_updates(ndb) || need_reconfig;
-
-        // If needed, update the relay configuration
-        if need_reconfig {
-            self.update_relay_configuration(pool, wakeup);
-            self.needs_relay_config = false;
         }
 
         // Do we need to activate account subs?
-        if let Some(data) = self.get_selected_account_data() {
-            if data.relay.sub.is_none() {
+        let default_relays = self.get_all_selected_account_relays();
+        if let Some(data) = self.get_selected_account_data_mut() {
+            if data.relay.sub_remote_id.is_none() {
                 // the currently selected account doesn't have relay subs, activate them
-                data.relay.activate(ndb, pool);
+                data.relay.activate(subman, &default_relays);
             }
-            if data.muted.sub.is_none() {
+            if data.muted.sub_remote_id.is_none() {
                 // the currently selected account doesn't have muted subs, activate them
-                data.muted.activate(ndb, pool);
+                data.muted.activate(subman, &default_relays);
             }
         }
     }
@@ -778,7 +757,7 @@ impl Accounts {
         match self.account_data.get_mut(&key_bytes) {
             None => error!("no account data found for the provided key."),
             Some(account_data) => {
                let advertised = &mut account_data.relay.advertised.lock().unwrap();
                 if advertised.is_empty() {
                     // If the selected account has no advertised relays,
                     // initialize with the bootstrapping set.
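Since `activate` now hands the NIP-65 and mute-list harvesting loops to spawned tokio tasks, `advertised` and `muted` must stay readable from the UI thread while a task writes them, hence the move to `Arc<Mutex<...>>`. A minimal sketch of that hand-off, not part of this patch, using a `BTreeSet<String>` stand-in for the `RelaySpec` set:

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    // shared relay list: one handle on the UI side, one clone in the task
    let advertised: Arc<Mutex<BTreeSet<String>>> = Arc::new(Mutex::new(BTreeSet::new()));

    let advertisedref = advertised.clone();
    let task = tokio::spawn(async move {
        // the real task replaces the whole set whenever a kind-10002
        // note arrives on the subscription stream
        *advertisedref.lock().unwrap() = ["wss://relay.example/".to_string()].into();
    });
    task.await.unwrap();

    // readers (e.g. get_advertised_relays) lock briefly and clone out
    let snapshot = advertised.lock().unwrap().clone();
    assert_eq!(snapshot.len(), 1);
}
```

This is also why `mute_fun` above clones the `Muted` value out of the lock instead of sharing the `Arc` directly: the closure it returns must not hold the mutex while the UI iterates notes.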
diff --git a/crates/notedeck/src/app.rs b/crates/notedeck/src/app.rs
index b107218f2..323c813ef 100644
--- a/crates/notedeck/src/app.rs
+++ b/crates/notedeck/src/app.rs
@@ -1,7 +1,7 @@
 use crate::persist::{AppSizeHandler, ZoomHandler};
 use crate::{
     AccountStorage, Accounts, AppContext, Args, DataPath, DataPathType, Directory, Images,
-    NoteCache, RelayDebugView, ThemeHandler, UnknownIds,
+    NoteCache, RelayDebugView, SubMan, ThemeHandler, UnknownIds,
 };
 use egui::ThemePreference;
 use egui_winit::clipboard::Clipboard;
@@ -22,7 +22,6 @@ pub struct Notedeck {
     ndb: Ndb,
     img_cache: Images,
     unknown_ids: UnknownIds,
-    pool: RelayPool,
     note_cache: NoteCache,
     accounts: Accounts,
     path: DataPath,
@@ -33,6 +32,7 @@ pub struct Notedeck {
     app_size: AppSizeHandler,
     unrecognized_args: BTreeSet<String>,
     clipboard: Clipboard,
+    subman: SubMan,
 }
 
 /// Our chrome, which is basically nothing
@@ -80,7 +80,7 @@ impl eframe::App for Notedeck {
         puffin::GlobalProfiler::lock().new_frame();
 
         // handle account updates
-        self.accounts.update(&mut self.ndb, &mut self.pool, ctx);
+        self.accounts.update(&mut self.subman, ctx);
 
         render_notedeck(self, ctx);
 
@@ -88,11 +88,11 @@ impl eframe::App for Notedeck {
         self.app_size.try_save_app_size(ctx);
 
         if self.args.relay_debug {
-            if self.pool.debug.is_none() {
-                self.pool.use_debug();
+            if self.subman.pool().debug.is_none() {
+                self.subman.pool().use_debug();
             }
 
-            if let Some(debug) = &mut self.pool.debug {
+            if let Some(debug) = &mut self.subman.pool().debug {
                 RelayDebugView::window(ctx, debug);
             }
         }
@@ -202,11 +202,12 @@ impl Notedeck {
             error!("error migrating image cache: {e}");
         }
 
+        let subman = SubMan::new(ndb.clone(), pool);
+
         Self {
             ndb,
             img_cache,
             unknown_ids,
-            pool,
             note_cache,
             accounts,
             path: path.clone(),
@@ -217,6 +218,7 @@ impl Notedeck {
             app_size,
             unrecognized_args,
             clipboard: Clipboard::new(None),
+            subman,
         }
     }
 
@@ -230,13 +232,13 @@ impl Notedeck {
             ndb: &mut self.ndb,
             img_cache: &mut self.img_cache,
             unknown_ids: &mut self.unknown_ids,
-            pool: &mut self.pool,
             note_cache: &mut self.note_cache,
             accounts: &mut self.accounts,
             path: &self.path,
             args: &self.args,
             theme: &mut self.theme,
             clipboard: &mut self.clipboard,
+            subman: &mut self.subman,
         }
     }
 
diff --git a/crates/notedeck/src/context.rs b/crates/notedeck/src/context.rs
index 67648738e..58d111ea0 100644
--- a/crates/notedeck/src/context.rs
+++ b/crates/notedeck/src/context.rs
@@ -1,7 +1,6 @@
-use crate::{Accounts, Args, DataPath, Images, NoteCache, ThemeHandler, UnknownIds};
+use crate::{Accounts, Args, DataPath, Images, NoteCache, SubMan, ThemeHandler, UnknownIds};
 
 use egui_winit::clipboard::Clipboard;
-use enostr::RelayPool;
 use nostrdb::Ndb;
 
 // TODO: make this interface more sandboxed
@@ -10,11 +9,11 @@ pub struct AppContext<'a> {
     pub ndb: &'a mut Ndb,
     pub img_cache: &'a mut Images,
     pub unknown_ids: &'a mut UnknownIds,
-    pub pool: &'a mut RelayPool,
     pub note_cache: &'a mut NoteCache,
     pub accounts: &'a mut Accounts,
     pub path: &'a DataPath,
     pub args: &'a Args,
     pub theme: &'a mut ThemeHandler,
     pub clipboard: &'a mut Clipboard,
+    pub subman: &'a mut SubMan,
 }
diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs
index 162395b5b..8ed8e3aea 100644
--- a/crates/notedeck/src/lib.rs
+++ b/crates/notedeck/src/lib.rs
@@ -17,6 +17,7 @@ pub mod relayspec;
 mod result;
 pub mod storage;
 mod style;
+pub mod subman;
 pub mod theme;
 mod time;
 mod timecache;
@@ -26,6 +27,10 @@ mod unknowns;
 mod urls;
 mod user_account;
 
+/// Various utilities
+#[macro_use]
+pub mod util;
+
 pub use accounts::{AccountData, Accounts, AccountsAction, AddAccountAction, SwitchAccountAction};
 pub use app::{App, Notedeck};
 pub use args::Args;
@@ -46,6 +51,7 @@ pub use relayspec::RelaySpec;
 pub use result::Result;
 pub use storage::{AccountStorage, DataPath, DataPathType, Directory};
 pub use style::NotedeckTextStyle;
+pub use subman::{SubError, SubMan};
 pub use theme::ColorTheme;
 pub use time::time_ago_since;
 pub use timecache::TimeCached;
diff --git a/crates/notedeck/src/muted.rs b/crates/notedeck/src/muted.rs
index 3d0380332..dd4638531 100644
--- a/crates/notedeck/src/muted.rs
+++ b/crates/notedeck/src/muted.rs
@@ -6,7 +6,7 @@ use std::collections::BTreeSet;
 // If the note is muted return a reason string, otherwise None
 pub type MuteFun = dyn Fn(&Note, &[u8; 32]) -> bool;
 
-#[derive(Default)]
+#[derive(Clone, Default)]
 pub struct Muted {
     // TODO - implement private mutes
     pub pubkeys: BTreeSet<[u8; 32]>,
diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs
new file mode 100644
index 000000000..90eeae363
--- /dev/null
+++ b/crates/notedeck/src/subman.rs
@@ -0,0 +1,1023 @@
+use futures::{channel::mpsc, FutureExt, StreamExt};
+use std::collections::{BTreeMap, BTreeSet};
+use std::fmt;
+use std::time::{Duration, Instant};
+use std::{cell::RefCell, cmp::Ordering, rc::Rc};
+use thiserror::Error;
+use tracing::{debug, error, info, trace, warn};
+use uuid::Uuid;
+
+use enostr::{ClientMessage, Filter, PoolRelay, RelayEvent, RelayMessage, RelayPool};
+use nostrdb::{self, Ndb, Subscription, SubscriptionStream};
+
+use crate::RelaySpec;
+
+/// The Subscription Manager
+///
+/// ```no_run
+/// use std::error::Error;
+///
+/// use nostrdb::{Config, Ndb};
+/// use enostr::{Filter, RelayPool};
+/// use notedeck::subman::{SubConstraint, SubMan, SubSpecBuilder, SubError};
+///
+/// #[tokio::main]
+/// async fn main() -> Result<(), Box<dyn Error>> {
+///     let mut ndb = Ndb::new("the/db/path/", &Config::new())?;
+///     let mut subman = SubMan::new(ndb.clone(), RelayPool::new());
+///     let default_relays = vec![];
+///
+///     // Define a filter and build the subscription specification
+///     let filter = Filter::new().kinds(vec![1, 2, 3]).build();
+///     let spec = SubSpecBuilder::new()
+///         .filters(vec![filter])
+///         .constraint(SubConstraint::OnlyLocal)
+///         .build();
+///
+///     // Subscribe and obtain a SubReceiver
+///     let mut receiver = subman.subscribe(spec, &default_relays)?;
+///
+///     // Process incoming note keys
+///     loop {
+///         match receiver.next().await {
+///             Ok(note_keys) => {
+///                 // Process the note keys
+///                 println!("Received note keys: {:?}", note_keys);
+///             },
+///             Err(SubError::StreamEnded) => {
+///                 // Not really an error; we should clean up
+///                 break;
+///             },
+///             Err(err) => {
+///                 // Handle other errors
+///                 eprintln!("Error: {:?}", err);
+///                 break;
+///             },
+///         }
+///     }
+///
+///     // Unsubscribe when the subscription is no longer needed
+///     subman.unsubscribe_local_id(&receiver.local_id().unwrap())?;
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// Supported Operational Modes:
+///
+/// | mode            | Constraints        | lcl | rmt | end mechanism       |
+/// |-----------------+--------------------+-----+-----+---------------------|
+/// | normal          |                    | sub | sub | client-closes       |
+/// | local           | OnlyLocal          | sub |     | client-closes       |
+/// | normal one-shot | OneShot            | sub | sub | EOSE -> StreamEnded |
+/// | local one-shot  | OneShot+OnlyLocal  | qry |     | query, StreamEnded  |
+/// | "prefetch"      | OneShot+OnlyRemote |     | sub | EOSE -> StreamEnded |
+
+#[derive(Debug, Error)]
+pub enum SubError {
+    #[error("Stream ended")]
+    StreamEnded,
+
+    #[error("Internal error: {0}")]
{0}")] + InternalError(String), + + #[error("nostrdb error: {0}")] + NdbError(#[from] nostrdb::Error), +} + +pub type SubResult = Result; + +#[derive(Debug, Clone, Copy)] +pub struct LocalId(nostrdb::Subscription); + +impl fmt::Display for LocalId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0.id()) + } +} + +impl From for LocalId { + fn from(subscription: Subscription) -> Self { + LocalId(subscription) + } +} + +impl Ord for LocalId { + fn cmp(&self, other: &Self) -> Ordering { + self.0.id().cmp(&other.0.id()) + } +} + +impl PartialOrd for LocalId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for LocalId { + fn eq(&self, other: &Self) -> bool { + self.0.id() == other.0.id() + } +} + +impl Eq for LocalId {} + +// nostr remote sub id +pub type RemoteId = String; + +#[derive(Debug, Clone)] +pub enum SubConstraint { + OneShot, // terminate subscription after initial query and EOSE + OnlyLocal, // only query the local db, no remote subs + OnlyRemote, // prefetch from remote, nothing returned + OutboxRelays(Vec), // ensure one of these is in the active relay set + AllowedRelays(Vec), // if not empty, only use these relays + BlockedRelays(Vec), // if not empty, don't use these relays +} + +#[derive(Debug, Default, Clone)] +pub struct SubSpecBuilder { + remote_id: Option, + filters: Vec, + constraints: Vec, +} + +impl SubSpecBuilder { + pub fn new() -> Self { + SubSpecBuilder::default() + } + pub fn remote_id(mut self, id: String) -> Self { + self.remote_id = Some(id); + self + } + pub fn filters(mut self, filters: Vec) -> Self { + self.filters.extend(filters); + self + } + pub fn constraint(mut self, constraint: SubConstraint) -> Self { + self.constraints.push(constraint); + self + } + pub fn build(self) -> SubSpec { + let mut outbox_relays = Vec::new(); + let mut allowed_relays = Vec::new(); + let mut blocked_relays = Vec::new(); + let mut is_oneshot = false; + let mut is_onlylocal = false; + let mut is_onlyremote = false; + + for constraint in self.constraints { + match constraint { + SubConstraint::OneShot => is_oneshot = true, + SubConstraint::OnlyLocal => is_onlylocal = true, + SubConstraint::OnlyRemote => is_onlyremote = true, + SubConstraint::OutboxRelays(relays) => outbox_relays.extend(relays), + SubConstraint::AllowedRelays(relays) => allowed_relays.extend(relays), + SubConstraint::BlockedRelays(relays) => blocked_relays.extend(relays), + } + } + + let remote_id = self.remote_id.unwrap_or_else(|| Uuid::new_v4().to_string()); + + SubSpec { + remote_id, + filters: self.filters, + outbox_relays, + allowed_relays, + blocked_relays, + is_oneshot, + is_onlylocal, + is_onlyremote, + } + } +} + +#[derive(Clone)] +pub struct SubSpec { + pub remote_id: String, // unused if is_onlylocal + pub filters: Vec, + pub outbox_relays: Vec, + pub allowed_relays: Vec, + pub blocked_relays: Vec, + pub is_oneshot: bool, + pub is_onlylocal: bool, + pub is_onlyremote: bool, +} + +impl fmt::Debug for SubSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Convert each Filter to its JSON representation. 
+        let filters_json: Vec<_> = self
+            .filters
+            .iter()
+            .map(|filter| filter.json().unwrap())
+            .collect();
+        f.debug_struct("SubSpec")
+            .field("remote_id", &self.remote_id)
+            .field("filters", &filters_json)
+            .field("outbox_relays", &self.outbox_relays)
+            .field("allowed_relays", &self.allowed_relays)
+            .field("blocked_relays", &self.blocked_relays)
+            .field("is_oneshot", &self.is_oneshot)
+            .field("is_onlylocal", &self.is_onlylocal)
+            .field("is_onlyremote", &self.is_onlyremote)
+            .finish()
+    }
+}
+
+// State for a local subscription
+#[derive(Debug)]
+struct LocalSubState {
+    local_id: LocalId,
+}
+
+// State of a remote subscription on a specific relay
+#[allow(unused)]
+#[derive(Default, Debug, Clone, Eq, PartialEq)]
+enum RelaySubState {
+    #[default]
+    Pending,       // before relay open or subscribed
+    Syncing,       // before EOSE
+    Current,       // after EOSE
+    Error(String), // went wrong
+    Closed,        // closed
+}
+
+// State for a remote subscription
+#[derive(Debug)]
+struct RemoteSubState {
+    remote_id: RemoteId,
+    relays: BTreeMap<String, RelaySubState>,
+    tx_ended: mpsc::Sender<()>, // send StreamEnded to receiver
+}
+
+impl RemoteSubState {
+    pub fn update_rss(&mut self, relay: &str, newstate: RelaySubState) {
+        let rss = self.relays.get_mut(relay).expect("couldn't find relay");
+        debug!(
+            "RemoteSubState update_rss {} {}: {:?} -> {:?}",
+            self.remote_id, relay, *rss, newstate
+        );
+        *rss = newstate;
+    }
+
+    // if this is a one-shot and there are no relays left syncing we are done
+    pub fn consider_finished(&mut self, is_oneshot: bool) -> bool {
+        let still_syncing: Vec<String> = self
+            .relays
+            .iter()
+            .filter(|(_k, v)| **v == RelaySubState::Syncing)
+            .map(|(k, _v)| k.clone())
+            .collect();
+
+        if still_syncing.is_empty() {
+            if is_oneshot {
+                debug!(
+                    "handle_eose {}: all relays done syncing, sending one-shot ending",
+                    self.remote_id
+                );
+                self.tx_ended.try_send(()).ok();
+                true
+            } else {
+                debug!("handle_eose {}: all relays done syncing", self.remote_id);
+                false
+            }
+        } else {
+            debug!(
+                "handle_eose {}: still_syncing {:?}",
+                self.remote_id, still_syncing
+            );
+            false
+        }
+    }
+}
+
+// State of a subscription
+#[allow(unused)]
+#[derive(Debug)]
+pub struct SubState {
+    spec: SubSpec,
+    local: Option<LocalSubState>,
+    remote: Option<RemoteSubState>,
+}
+pub type SubStateRef = Rc<RefCell<SubState>>;
+
+impl Drop for SubState {
+    fn drop(&mut self) {
+        debug!("dropping SubState for {}", self.spec.remote_id);
+    }
+}
+
+pub struct SubMan {
+    ndb: Ndb,
+    pool: RelayPool,
+    local: BTreeMap<LocalId, SubStateRef>,
+    remote: BTreeMap<RemoteId, SubStateRef>,
+    idle: BTreeMap<String, Instant>,
+}
+
+impl SubMan {
+    pub fn new(ndb: Ndb, pool: RelayPool) -> Self {
+        SubMan {
+            ndb,
+            pool,
+            local: BTreeMap::new(),
+            remote: BTreeMap::new(),
+            idle: BTreeMap::new(),
+        }
+    }
+
+    pub fn ndb(&self) -> Ndb {
+        self.ndb.clone()
+    }
+
+    // deprecated, use SubMan directly instead
+    pub fn pool(&mut self) -> &mut RelayPool {
+        &mut self.pool
+    }
+
+    pub fn subscribe(
+        &mut self,
+        spec: SubSpec,
+        default_relays: &[RelaySpec],
+    ) -> SubResult<SubReceiver> {
+        let (substate, subrcvr) = self.make_subscription(&spec, default_relays)?;
+        let state = Rc::new(RefCell::new(substate));
+        if let Some(local_id) = subrcvr.local_id() {
+            self.local.insert(local_id, Rc::clone(&state));
+        }
+        if let Some(remote_id) = subrcvr.remote_id() {
+            self.remote.insert(remote_id, Rc::clone(&state));
+        }
+        Ok(subrcvr)
+    }
+
+    pub fn unsubscribe_local_id(&mut self, local_id: &LocalId) -> SubResult<()> {
+        // find the substate and delegate to unsubscribe_substate
+        let ssref = match self.local.get(local_id) {
+            None => {
+                return Err(SubError::InternalError(format!(
"substate for {} not found", + local_id + ))) + } + Some(ssref) => ssref.clone(), // clone to drop the borrow on the map + }; + self.unsubscribe_substate(&ssref) + } + + pub fn unsubscribe_remote_id(&mut self, remote_id: &RemoteId) -> SubResult<()> { + // find the substate and delegate to unsubscribe_substate + let ssref = match self.remote.get(remote_id) { + None => { + return Err(SubError::InternalError(format!( + "substate for {} not found", + remote_id + ))) + } + Some(ssref) => ssref.clone(), // clone to drop the borrow on the map + }; + self.unsubscribe_substate(&ssref) + } + + fn unsubscribe_substate(&mut self, ssref: &SubStateRef) -> SubResult<()> { + let mut substate = ssref.borrow_mut(); + if let Some(&mut ref mut remotesubstate) = substate.remote.as_mut() { + let remote_id = remotesubstate.remote_id.clone(); + // unsubscribe from all remote relays + for (url, rss) in remotesubstate.relays.iter() { + match rss { + RelaySubState::Syncing | RelaySubState::Current => { + SubMan::close_relay_sub(&mut self.pool, &remote_id, url); + // not worth marking as closed because we drop below + } + _ => {} + } + } + // send StreamEnded to the receiver + remotesubstate.tx_ended.try_send(()).ok(); + // remove from the SubMan index + self.remote + .remove(&remote_id) + .expect("removed from remote index"); + } + if let Some(localsubstate) = &substate.local { + let local_id = &localsubstate.local_id; + // remove from the SubMan index + self.local + .remove(local_id) + .expect("removed from local index"); + } + Ok(()) + } + + pub fn remove_substate_remote_id(&mut self, remote_id: &RemoteId) -> SubResult<()> { + // remove from the local sub index if needed + if let Some(ssref) = self.remote.get(remote_id) { + let substate = ssref.borrow(); + if let Some(localsubstate) = &substate.local { + self.local.remove(&localsubstate.local_id); + } + } + // remove from the remote sub index + match self.remote.remove(remote_id) { + Some(_) => Ok(()), + None => Err(SubError::InternalError(format!( + "substate for {} not found", + remote_id + ))), + } + } + + fn make_subscription( + &mut self, + spec: &SubSpec, + default_relays: &[RelaySpec], + ) -> SubResult<(SubState, SubReceiver)> { + // Setup local ndb subscription state + let (maybe_localstate, localsub) = if spec.is_onlyremote { + (None, None) + } else { + let subscription = self.ndb.subscribe(&spec.filters)?; + let localstrm = subscription.stream(&self.ndb).notes_per_await(1); + let local_id = subscription.into(); + ( + Some(LocalSubState { local_id }), + Some(LocalSub { + ndb: self.ndb.clone(), + local_id, + localstrm, + }), + ) + }; + + // Setup remote nostr relay subscription state + let (maybe_remotestate, remotesub) = if spec.is_onlylocal { + (None, None) + } else { + let (tx_ended, rx_ended) = mpsc::channel(1); + + // Determine which relays to use + let relays = if !spec.allowed_relays.is_empty() { + spec.allowed_relays.clone() + } else { + default_relays + .iter() + .filter(|rs| rs.is_readable()) + .map(|rs| rs.url.clone()) + .collect() + }; + + // create the state map, special case multicast and blocked + let states: BTreeMap = relays + .iter() + .map(|relay| { + let rss = if spec.blocked_relays.contains(relay) { + RelaySubState::Error("blocked".into()) + } else if self.pool.subscribe_relay( + spec.remote_id.clone(), + spec.filters.clone(), + relay.clone(), + ) { + RelaySubState::Syncing + } else { + RelaySubState::Pending + }; + + (relay.clone(), rss) + }) + .collect(); + + let remote_id = spec.remote_id.clone(); + + ( + Some(RemoteSubState { + 
+                    remote_id: remote_id.clone(),
+                    relays: states,
+                    tx_ended,
+                }),
+                Some(RemoteSub {
+                    remote_id,
+                    rx_ended,
+                }),
+            )
+        };
+
+        Ok((
+            SubState {
+                spec: spec.clone(),
+                local: maybe_localstate,
+                remote: maybe_remotestate,
+            },
+            SubReceiver {
+                localsub,
+                remotesub,
+            },
+        ))
+    }
+
+    fn close_relay_sub(pool: &mut RelayPool, sid: &str, url: &str) {
+        if let Some(relay) = pool.relays.iter_mut().find(|r| r.url() == url) {
+            let cmd = ClientMessage::close(sid.to_string());
+            debug!("SubMan close_relay_sub close {} {}", sid, url);
+            if let Err(err) = relay.send(&cmd) {
+                error!("trouble closing relay sub: {} {}: {:?}", sid, url, err);
+            }
+        }
+    }
+
+    pub fn process_relays<H: LegacyRelayHandler>(
+        &mut self,
+        legacy_relay_handler: &mut H,
+        default_relays: &[RelaySpec],
+    ) -> SubResult<()> {
+        let wakeup = move || {
+            // ignore
+        };
+        self.pool.keepalive_ping(wakeup);
+
+        // NOTE: we don't use the while let loop due to borrow issues
+        #[allow(clippy::while_let_loop)]
+        loop {
+            let ev = if let Some(ev) = self.pool.try_recv() {
+                ev.into_owned()
+            } else {
+                break;
+            };
+
+            let relay = RelayPool::canonicalize_url(ev.relay.clone());
+
+            match (&ev.event).into() {
+                RelayEvent::Opened => {
+                    debug!("handle_opened {}", relay);
+
+                    // handle legacy subscriptions
+                    legacy_relay_handler.handle_opened(&mut self.ndb, &mut self.pool, &relay);
+
+                    // send our remote subscriptions for this relay
+                    for ssr in self.remote.values_mut() {
+                        let mut substate = ssr.borrow_mut();
+                        let remote_id = substate.spec.remote_id.clone();
+                        let filters = substate.spec.filters.clone();
+                        if let Some(remotesubstate) = &mut substate.remote {
+                            if let Some(rss) = &remotesubstate.relays.get(&relay) {
+                                match rss {
+                                    RelaySubState::Pending => {
+                                        debug!(
+                                            "SubMan handle_opened: sending sub {} {}: {:?}",
+                                            remote_id,
+                                            relay,
+                                            filters
+                                                .iter()
+                                                .map(|f| f.json().unwrap_or_default())
+                                                .collect::<Vec<String>>(),
+                                        );
+                                        self.pool.send_to(
+                                            &ClientMessage::req(remote_id, filters),
+                                            &relay,
+                                        );
+                                        remotesubstate.update_rss(&relay, RelaySubState::Syncing);
+                                    }
+                                    _ => {
+                                        debug!(
+                                            "SubMan handle_opened: {} {} ignored in state {:?}",
+                                            remote_id, relay, rss
+                                        );
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+                // TODO: handle reconnects
+                RelayEvent::Closed => warn!("process_relays {}: connection closed", &relay),
+                RelayEvent::Error(e) => {
+                    error!("process_relays {} error: {}", &relay, e);
+                    for ssr in self.remote.values_mut() {
+                        let mut substate = ssr.borrow_mut();
+                        let remote_id = substate.spec.remote_id.clone();
+                        let filters = substate.spec.filters.clone();
+                        if let Some(remotesubstate) = &mut substate.remote {
+                            if let Some(ref mut rss) = remotesubstate.relays.get_mut(&relay) {
+                                debug!(
+                                    "SubMan handle Error {} {} {:?}: {:?}",
+                                    remote_id,
+                                    relay,
+                                    filters
+                                        .iter()
+                                        .map(|f| f.json().unwrap_or_default())
+                                        .collect::<Vec<String>>(),
+                                    e
+                                );
+                                **rss = RelaySubState::Error(format!("{:?}", e));
+                            }
+                        }
+                    }
+                }
+
+                RelayEvent::Other(msg) => trace!("process_relays other event {:?}", &msg),
+                RelayEvent::Message(msg) => {
+                    self.process_message(legacy_relay_handler, &relay, &msg);
+                }
+            }
+        }
+
+        self.close_unneeded_relays(default_relays);
+
+        Ok(())
+    }
+
+    pub fn process_message<H: LegacyRelayHandler>(
+        &mut self,
+        legacy_relay_handler: &mut H,
+        relay: &str,
+        msg: &RelayMessage,
+    ) {
+        match msg {
+            RelayMessage::Event(_subid, ev) => {
+                let relay = if let Some(relay) = self.pool.relays.iter().find(|r| r.url() == relay)
+                {
+                    relay
+                } else {
+                    error!("couldn't find relay {} for note processing!?", relay);
+                    return;
+                };
+
+                match relay {
+                    PoolRelay::Websocket(_) => {
{}", event); + if let Err(err) = self.ndb.process_event_with( + ev, + nostrdb::IngestMetadata::new() + .client(false) + .relay(relay.url()), + ) { + error!("error processing event {ev}: {err}"); + } + } + PoolRelay::Multicast(_) => { + // multicast events are client events + if let Err(err) = self.ndb.process_event_with( + ev, + nostrdb::IngestMetadata::new() + .client(true) + .relay(relay.url()), + ) { + error!("error processing multicast event {ev}: {err}"); + } + } + } + } + RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), + RelayMessage::OK(cr) => info!("OK {:?}", cr), + RelayMessage::Eose(sid) => { + debug!("SubMan process_message Eose {} {}", sid, relay); + let mut substate_finished = false; + // do we have this sub in the subman remote subscriptions? + if let Some(ss) = self.remote.get_mut(*sid) { + let is_oneshot = ss.borrow().spec.is_oneshot; + let mut substate = ss.borrow_mut(); + if let Some(remotesubstate) = &mut substate.remote { + remotesubstate.update_rss(relay, RelaySubState::Current); + + if is_oneshot { + SubMan::close_relay_sub(&mut self.pool, sid, relay); + remotesubstate.update_rss(relay, RelaySubState::Closed); + } + + // any relays left syncing? + substate_finished = remotesubstate.consider_finished(is_oneshot); + } + } else { + // we didn't find it in the subman, delegate to the legacy code + legacy_relay_handler.handle_eose(&mut self.ndb, &mut self.pool, sid, relay); + } + if substate_finished { + if let Err(err) = self.remove_substate_remote_id(&sid.to_string()) { + error!("trouble removing substate for {}: {:?}", sid, err); + } + } + } + } + } + + const IDLE_EXPIRATION_SECS: Duration = Duration::from_secs(20); + + fn close_unneeded_relays(&mut self, default_relays: &[RelaySpec]) { + let current_relays: BTreeSet = self.pool.urls(); + let needed_relays: BTreeSet = self.needed_relays(default_relays); + let unneeded_relays: BTreeSet<_> = + current_relays.difference(&needed_relays).cloned().collect(); + + // remove all needed relays from the idle collection + for r in needed_relays { + self.idle.remove(&r); + } + + // manage idle relays + let mut expired = BTreeSet::new(); + let now = Instant::now(); + for r in unneeded_relays { + // could be a new entry, an entry that has only been idle + // a short time, or an expired entry + let entry = self.idle.entry(r.clone()).or_insert(now); + if now.duration_since(*entry) > Self::IDLE_EXPIRATION_SECS { + expired.insert(r.clone()); + } + } + + // close the expired relays + if !expired.is_empty() { + debug!("closing expired relays: {:?}", expired); + self.pool.remove_urls(&expired); + } + + // remove the expired relays from the idle collection + for r in expired { + self.idle.remove(&r); + } + } + + fn needed_relays(&self, default_relays: &[RelaySpec]) -> BTreeSet { + let mut needed: BTreeSet = default_relays.iter().map(|rs| rs.url.clone()).collect(); + // for every remote subscription + for ssr in self.remote.values() { + // that has remote substate (all will) + if let Some(ref remotesubstate) = ssr.borrow().remote { + // for each subscription remote relay + for (relay, state) in &remotesubstate.relays { + // include any that are in-play + match state { + RelaySubState::Error(_) | RelaySubState::Closed => { + // these are terminal and we don't need this relay + } + _ => { + // relays in all other states are needed + _ = needed.insert(relay.clone()); + } + } + } + } + } + needed + } +} + +pub trait LegacyRelayHandler { + fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str); + fn 
+    fn handle_eose(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, id: &str, relay: &str);
+}
+
+struct LocalSub {
+    ndb: Ndb,
+    local_id: LocalId, // ndb id
+    localstrm: SubscriptionStream,
+}
+
+#[allow(unused)]
+struct RemoteSub {
+    remote_id: RemoteId,          // remote nostr sub id
+    rx_ended: mpsc::Receiver<()>, // end-of-stream
+}
+
+pub struct SubReceiver {
+    localsub: Option<LocalSub>,
+    remotesub: Option<RemoteSub>,
+}
+
+impl Drop for SubReceiver {
+    fn drop(&mut self) {
+        debug!("dropping Receiver for {}", self.idstr());
+    }
+}
+
+impl SubReceiver {
+    pub fn idstr(&self) -> String {
+        let mut idstr = "".to_string();
+        if let Some(lsub) = &self.localsub {
+            idstr.push_str(&format!("local:{}", lsub.local_id));
+        }
+        if let Some(rsub) = &self.remotesub {
+            if !idstr.is_empty() {
+                idstr.push_str(", ");
+            }
+            idstr.push_str(&format!("remote:{}", rsub.remote_id));
+        }
+        if idstr.is_empty() {
+            "query".to_string()
+        } else {
+            idstr
+        }
+    }
+
+    pub fn local_id(&self) -> Option<LocalId> {
+        self.localsub.as_ref().map(|lsub| lsub.local_id)
+    }
+
+    pub fn remote_id(&self) -> Option<RemoteId> {
+        self.remotesub.as_ref().map(|rsub| rsub.remote_id.clone())
+    }
+
+    pub async fn next(&mut self) -> SubResult<Vec<nostrdb::NoteKey>> {
+        if let (Some(lsub), Some(rsub)) = (&mut self.localsub, &mut self.remotesub) {
+            // local and remote subs
+            futures::select! {
+                notes = lsub.localstrm.next().fuse() => {
+                    match notes {
+                        Some(notes) => Ok(notes),
+                        None => Err(SubError::StreamEnded),
+                    }
+                },
+                _ = rsub.rx_ended.next().fuse() => {
+                    Err(SubError::StreamEnded)
+                }
+            }
+        } else if let Some(lsub) = &mut self.localsub {
+            // only local sub
+            lsub.localstrm.next().await.ok_or(SubError::StreamEnded)
+        } else if let Some(rsub) = &mut self.remotesub {
+            // only remote sub (prefetch only, values not returned)
+            match rsub.rx_ended.next().await {
+                // in both cases the stream has ended
+                Some(_) => Err(SubError::StreamEnded), // an EOSE was observed
+                None => Err(SubError::StreamEnded),    // the subscription was closed
+            }
+        } else {
+            // query case
+            Err(SubError::InternalError("unimplemented".to_string()))
+        }
+    }
+
+    pub fn poll(&mut self, max_notes: u32) -> Vec<nostrdb::NoteKey> {
+        assert!(self.localsub.is_some()); // FIXME - local only
+        let localsub = self.localsub.as_mut().unwrap();
+        localsub.ndb.poll_for_notes(localsub.local_id.0, max_notes)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::testdbs_path_async;
+    use crate::util::test_util::{raw_msg, test_keypair, ManagedNdb};
+    use nostrdb::{NoteKey, Transaction};
+
+    const RELAY_URL: &str = "test_url";
+
+    // test basic subscription functionality
+    #[tokio::test]
+    async fn test_subman_sub() -> Result<(), Box<dyn std::error::Error>> {
+        // setup an ndb and subman to test
+        let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!());
+        let mut subman = SubMan::new(ndb.clone(), RelayPool::new());
+        let default_relays = vec![];
+
+        // subscribe to some stuff
+        let mut receiver = subman.subscribe(
+            SubSpecBuilder::new()
+                .filters(vec![Filter::new().kinds(vec![1]).build()])
+                .constraint(SubConstraint::OnlyLocal)
+                .build(),
+            &default_relays,
+        )?;
+        let local_id = receiver.local_id().unwrap();
+
+        // nothing should be available yet
+        assert_eq!(receiver.poll(1), vec![]);
+
+        // process a test event that matches the subscription
+        let keys1 = test_keypair(1);
+        let kind = 1;
+        let content = "abc";
+        ndb.process_event_with(
+            &raw_msg("subid", &keys1, kind, content),
+            nostrdb::IngestMetadata::new()
+                .client(false)
+                .relay(RELAY_URL),
+        )?;
+
+        // receiver should now see the msg
+        let nks = receiver.next().await?;
+        assert_eq!(nks.len(), 1);
+        let txn = Transaction::new(&ndb)?;
+        let note = ndb.get_note_by_key(&txn, nks[0])?;
+        assert_eq!(note.pubkey(), keys1.pubkey.bytes());
+        assert_eq!(note.kind(), kind);
+        assert_eq!(note.content(), content);
+
+        // now nothing should be available again
+        assert_eq!(receiver.poll(1), vec![]);
+
+        subman.unsubscribe_local_id(&local_id)?;
+        Ok(())
+    }
+
+    // ensure that the subscription works when it is waiting before the event
+    #[tokio::test]
+    async fn test_subman_sub_with_waiting_thread() -> Result<(), Box<dyn std::error::Error>> {
+        // setup an ndb and subman to test
+        let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!());
+        let mut subman = SubMan::new(ndb.clone(), RelayPool::new());
+        let default_relays = vec![];
+
+        // subscribe to some stuff
+        let mut receiver = subman.subscribe(
+            SubSpecBuilder::new()
+                .filters(vec![Filter::new().kinds(vec![1]).build()])
+                .constraint(SubConstraint::OnlyLocal)
+                .build(),
+            &default_relays,
+        )?;
+        let local_id = receiver.local_id().unwrap();
+
+        // spawn a task to wait for the next message
+        let handle = tokio::spawn(async move {
+            let nks = receiver.next().await.unwrap();
+            assert_eq!(nks.len(), 1); // Ensure one message is received
+            (receiver, nks) // return the receiver as well
+        });
+
+        // process a test event that matches the subscription
+        let keys1 = test_keypair(1);
+        let kind = 1;
+        let content = "abc";
+        ndb.process_event_with(
+            &raw_msg("subid", &keys1, kind, content),
+            nostrdb::IngestMetadata::new()
+                .client(false)
+                .relay(RELAY_URL),
+        )?;
+
+        // await the spawned task to ensure it completes
+        let (mut receiver, nks) = handle.await?;
+
+        // validate the received message
+        let txn = Transaction::new(&ndb)?;
+        let note = ndb.get_note_by_key(&txn, nks[0])?;
+        assert_eq!(note.pubkey(), keys1.pubkey.bytes());
+        assert_eq!(note.kind(), kind);
+        assert_eq!(note.content(), content);
+
+        // ensure no additional messages are available
+        assert_eq!(receiver.poll(1), vec![]);
+
+        subman.unsubscribe_local_id(&local_id)?;
+        Ok(())
+    }
+
+    // test subscription poll and next interaction
+    #[tokio::test]
+    async fn test_subman_poll_and_next() -> Result<(), Box<dyn std::error::Error>> {
+        // setup an ndb and subman to test
+        let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!());
+        let mut subman = SubMan::new(ndb.clone(), RelayPool::new());
+        let default_relays = vec![];
+
+        // subscribe to some stuff
+        let mut receiver = subman.subscribe(
+            SubSpecBuilder::new()
+                .filters(vec![Filter::new().kinds(vec![1]).build()])
+                .constraint(SubConstraint::OnlyLocal)
+                .build(),
+            &default_relays,
+        )?;
+        let local_id = receiver.local_id().unwrap();
+
+        // nothing should be available yet
+        assert_eq!(receiver.poll(1), vec![]);
+
+        // process a test event that matches the subscription
+        let keys1 = test_keypair(1);
+        let kind = 1;
+        let content = "abc";
+        ndb.process_event_with(
+            &raw_msg("subid", &keys1, kind, content),
+            nostrdb::IngestMetadata::new()
+                .client(false)
+                .relay(RELAY_URL),
+        )?;
+        std::thread::sleep(std::time::Duration::from_millis(150));
+
+        // now poll should consume the note
+        assert_eq!(receiver.poll(1), vec![NoteKey::new(1)]);
+
+        // nothing more available
+        assert_eq!(receiver.poll(1), vec![]);
+
+        // process a second event
+        let content = "def";
+        ndb.process_event_with(
+            &raw_msg("subid", &keys1, kind, content),
+            nostrdb::IngestMetadata::new()
+                .client(false)
+                .relay(RELAY_URL),
+        )?;
+
+        // the receiver should now see the second note
+        assert_eq!(receiver.next().await?, vec![NoteKey::new(2)]);
+
+        subman.unsubscribe_local_id(&local_id)?;
+        Ok(())
+    }
+}
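To connect the operational-modes table at the top of `subman.rs` with the unknown-ids changes in the next file: a hedged sketch, not part of this patch, of the "prefetch" mode (`OneShot` + `OnlyRemote`) that `generate_resolution_requests` builds below. The receiver never yields note keys; it only reports `StreamEnded` once every relay involved reaches EOSE:

```rust
use enostr::Filter;
use notedeck::subman::{SubConstraint, SubSpecBuilder};
use notedeck::{RelaySpec, SubError, SubMan};

// a sketch against the API added in this patch; `default_relays` would
// come from Accounts::get_selected_account_readable_relays()
async fn prefetch_profiles(
    subman: &mut SubMan,
    default_relays: &[RelaySpec],
) -> Result<(), SubError> {
    let spec = SubSpecBuilder::new()
        .filters(vec![Filter::new().kinds(vec![0]).build()])
        .constraint(SubConstraint::OneShot)
        .constraint(SubConstraint::OnlyRemote)
        .build();

    let mut rcvr = subman.subscribe(spec, default_relays)?;
    match rcvr.next().await {
        // EOSE on all relays ends the one-shot; the notes are now in ndb
        Err(SubError::StreamEnded) => Ok(()),
        Err(err) => Err(err),
        // OnlyRemote receivers have no local stream to yield keys from
        Ok(_) => Ok(()),
    }
}
```

The fetched profiles land in nostrdb as a side effect, which is exactly what the unknown-ids resolver wants: it only needs the events ingested, not streamed back.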
diff --git a/crates/notedeck/src/unknowns.rs b/crates/notedeck/src/unknowns.rs
index 7587726fe..9d0716def 100644
--- a/crates/notedeck/src/unknowns.rs
+++ b/crates/notedeck/src/unknowns.rs
@@ -1,13 +1,14 @@
 use crate::{
     note::NoteRef,
     notecache::{CachedNote, NoteCache},
+    subman::{SubConstraint, SubSpec, SubSpecBuilder},
     Result,
 };
 use enostr::{Filter, NoteId, Pubkey};
 use nostr::RelayUrl;
 use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction};
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::time::{Duration, Instant};
 use tracing::error;
 
@@ -117,13 +118,86 @@ impl UnknownIds {
         &mut self.ids
     }
 
+    pub fn numids(&self) -> usize {
+        self.ids.len()
+    }
+
     pub fn clear(&mut self) {
         self.ids = HashMap::default();
     }
 
-    pub fn filter(&self) -> Option<Vec<Filter>> {
-        let ids: Vec<&UnknownId> = self.ids.keys().collect();
-        get_unknown_ids_filter(&ids)
+    pub fn generate_resolution_requests(&self, is_forced_relays: bool) -> Vec<SubSpec> {
+        // 1. resolve as many ids per request as possible
+        // 2. each request only has one filter (https://github.com/nostr-protocol/nips/pull/1645)
+        // 3. each request is limited to MAX_CHUNK_IDS
+        // 4. use relay hints when available
+
+        // Collect the unknown ids by relay
+        let mut ids_by_relay: BTreeMap<RelayUrl, (Vec<Pubkey>, Vec<NoteId>)> = BTreeMap::new();
+        for (unknown_id, relay_hints) in self.ids.iter() {
+            // 1. use default relays (signified by an empty RelayUrl) if no hints are available
+            // 2. query the default relays even when hints are available
+            // 3. if overriding relays from the CLI (is_forced_relays) ignore relay_hints
+            let use_hints = if is_forced_relays {
+                &HashSet::new()
+            } else {
+                relay_hints
+            };
+            for relay in std::iter::once("".to_string())
+                .chain(use_hints.iter().map(|relay| relay.to_string()))
+            {
+                match unknown_id {
+                    UnknownId::Pubkey(pk) => {
+                        if let Ok(parsed_relay) = RelayUrl::parse(relay.clone().as_str()) {
+                            ids_by_relay
+                                .entry(parsed_relay)
+                                .or_insert_with(|| (Vec::new(), Vec::new()))
+                                .0
+                                .push(*pk);
+                        }
+                    }
+                    UnknownId::Id(nid) => {
+                        if let Ok(parsed_relay) = RelayUrl::parse(relay.as_str()) {
+                            ids_by_relay
+                                .entry(parsed_relay)
+                                .or_insert_with(|| (Vec::new(), Vec::new()))
+                                .1
+                                .push(*nid);
+                        }
+                    }
+                }
+            }
+        }
+
+        const MAX_CHUNK_IDS: usize = 500;
+
+        let mut subspecs = vec![];
+        for (relay, (pubkeys, noteids)) in ids_by_relay {
+            // make a template SubSpecBuilder w/ the common parts
+            let mut ssb = SubSpecBuilder::new()
+                .constraint(SubConstraint::OneShot)
+                .constraint(SubConstraint::OnlyRemote);
+            if !relay.to_string().is_empty() {
+                ssb = ssb.constraint(SubConstraint::AllowedRelays(vec![relay.to_string()]));
+            }
+            for chunk in pubkeys.chunks(MAX_CHUNK_IDS) {
+                let pks: Vec<&[u8; 32]> = chunk.iter().map(|pk| pk.bytes()).collect();
+                subspecs.push(
+                    ssb.clone()
+                        .filters(vec![Filter::new().authors(pks).kinds([0]).build()])
+                        .build(),
+                );
+            }
+            for chunk in noteids.chunks(MAX_CHUNK_IDS) {
+                let nids: Vec<&[u8; 32]> = chunk.iter().map(|nid| nid.bytes()).collect();
+                subspecs.push(
+                    ssb.clone()
+                        .filters(vec![Filter::new().ids(nids).build()])
+                        .build(),
+                );
+            }
+        }
+        subspecs
     }
 
     /// We've updated some unknown ids, update the last_updated time to now
@@ -349,31 +423,3 @@ pub fn get_unknown_note_ids<'a>(
 
     Ok(())
 }
-
-fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> {
-    if ids.is_empty() {
-        return None;
-    }
-
-    let ids = &ids[0..500.min(ids.len())];
-    let mut filters: Vec<Filter> = vec![];
-
-    let pks: Vec<&[u8; 32]> = ids
-        .iter()
-        .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes()))
-        .collect();
-    if !pks.is_empty() {
-        let pk_filter = Filter::new().authors(pks).kinds([0]).build();
-        filters.push(pk_filter);
-    }
-
-    let note_ids: Vec<&[u8; 32]> = ids
-        .iter()
-        .flat_map(|id| id.is_id().map(|id| id.bytes()))
-        .collect();
-    if !note_ids.is_empty() {
-        filters.push(Filter::new().ids(note_ids).build());
-    }
-
-    Some(filters)
-}
diff --git a/crates/notedeck/src/util/mod.rs b/crates/notedeck/src/util/mod.rs
new file mode 100644
index 000000000..0964f7ae3
--- /dev/null
+++ b/crates/notedeck/src/util/mod.rs
@@ -0,0 +1,4 @@
+#[allow(missing_docs)]
+#[cfg(test)]
+#[macro_use]
+pub mod test_util;
diff --git a/crates/notedeck/src/util/test_util.rs b/crates/notedeck/src/util/test_util.rs
new file mode 100644
index 000000000..aaaa6a78b
--- /dev/null
+++ b/crates/notedeck/src/util/test_util.rs
@@ -0,0 +1,86 @@
+use enostr::{FullKeypair, Pubkey};
+use nostrdb::{Config, Ndb, NoteBuilder};
+use std::fs;
+use std::path::Path;
+
+// FIXME - make nostrdb::test_util::cleanup_db accessible instead
+#[allow(dead_code)]
+fn cleanup_db(path: &str) {
+    let p = Path::new(path);
+    let _ = fs::remove_file(p.join("data.mdb"));
+    let _ = fs::remove_file(p.join("lock.mdb"));
+}
+
+// managed ndb handle that cleans up test data when dropped
+pub struct ManagedNdb {
+    pub path: String,
+    pub ndb: Ndb,
+}
+impl ManagedNdb {
+    pub fn setup(path: &str) -> (Self, Ndb) {
+        cleanup_db(path); // ensure a clean slate before starting
+        let ndb = Ndb::new(path, &Config::new())
+            .unwrap_or_else(|err| panic!("Failed to create Ndb at {}: {}", path, err));
+        (
+            Self {
+                path: path.to_string(),
+                ndb: ndb.clone(),
+            },
+            ndb,
+        )
+    }
+}
+impl Drop for ManagedNdb {
+    fn drop(&mut self) {
+        cleanup_db(&self.path); // comment this out to leave the db for inspection
+    }
+}
+
+// generate a testdbs_path for an async test automatically
+#[macro_export]
testdbs_path_async { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + core::any::type_name::() + } + let name = type_name_of(f); + + // Find and cut the rest of the path + let test_name = match &name[..name.len() - 3].strip_suffix("::{{closure}}") { + Some(stripped) => match &stripped.rfind(':') { + Some(pos) => &stripped[pos + 1..stripped.len()], + None => &stripped, + }, + None => &name[..name.len() - 3], + }; + + format!("target/testdbs/{}", test_name) + }}; +} + +// generate a deterministic keypair for testing +pub fn test_keypair(input: u64) -> FullKeypair { + use sha2::{Digest, Sha256}; + + let mut hasher = Sha256::new(); + hasher.update(input.to_le_bytes()); + let hash = hasher.finalize(); + + let secret_key = nostr::SecretKey::from_slice(&hash).expect("valid secret key"); + let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); + let pubkey = Pubkey::new(xopk.serialize()); + + FullKeypair::new(pubkey, secret_key) +} + +// generate a basic raw message from scratch +pub fn raw_msg(subid: &str, keys: &FullKeypair, kind: u32, content: &str) -> String { + let note = NoteBuilder::new() + .kind(kind) + .content(content) + .sign(&keys.secret_key.to_secret_bytes()) + .build() + .expect("note"); + format!(r#"["EVENT", "{}", {}]"#, subid, note.json().expect("json")) +} diff --git a/crates/notedeck_chrome/src/notedeck.rs b/crates/notedeck_chrome/src/notedeck.rs index 0d049340b..55764984b 100644 --- a/crates/notedeck_chrome/src/notedeck.rs +++ b/crates/notedeck_chrome/src/notedeck.rs @@ -151,9 +151,9 @@ mod tests { let args: Vec = [ "--testrunner", "--datapath", - &datapath.to_str().unwrap(), + datapath.to_str().unwrap(), "--dbpath", - &dbpath.to_str().unwrap(), + dbpath.to_str().unwrap(), ] .iter() .map(|s| s.to_string()) @@ -224,8 +224,8 @@ mod tests { .unwrap(); assert_eq!(app.timeline_cache.timelines.len(), 2); - assert!(app.timeline_cache.timelines.get(&tl1).is_some()); - assert!(app.timeline_cache.timelines.get(&tl2).is_some()); + assert!(app.timeline_cache.timelines.contains_key(tl1)); + assert!(app.timeline_cache.timelines.contains_key(tl2)); rmrf(tmpdir); } diff --git a/crates/notedeck_columns/src/accounts/route.rs b/crates/notedeck_columns/src/accounts/route.rs index befcfc873..18e6dbb05 100644 --- a/crates/notedeck_columns/src/accounts/route.rs +++ b/crates/notedeck_columns/src/accounts/route.rs @@ -65,7 +65,7 @@ mod tests { let data_str = "accounts:show"; let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = AccountsRoute::parse_from_tokens(&mut parser).unwrap(); let expected = AccountsRoute::Accounts; parsed.serialize_tokens(&mut token_writer); diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index 1ab956023..6fea08615 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -12,9 +12,12 @@ use crate::{ Result, }; -use notedeck::{Accounts, AppContext, DataPath, DataPathType, FilterState, UnknownIds}; +use notedeck::{ + subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState, + NoteCache, RelaySpec, SubError, SubMan, UnknownIds, +}; -use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage, RelayPool}; +use enostr::{ClientMessage, Keypair, Pubkey, RelayPool}; use uuid::Uuid; use egui_extras::{Size, StripBuilder}; @@ -24,7 +27,7 @@ use nostrdb::{Ndb, Transaction}; use std::collections::{BTreeSet, 
diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs
index 1ab956023..6fea08615 100644
--- a/crates/notedeck_columns/src/app.rs
+++ b/crates/notedeck_columns/src/app.rs
@@ -12,9 +12,12 @@ use crate::{
     Result,
 };
 
-use notedeck::{Accounts, AppContext, DataPath, DataPathType, FilterState, UnknownIds};
+use notedeck::{
+    subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState,
+    NoteCache, RelaySpec, SubError, SubMan, UnknownIds,
+};
 
-use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage, RelayPool};
+use enostr::{ClientMessage, Keypair, Pubkey, RelayPool};
 use uuid::Uuid;
 
 use egui_extras::{Size, StripBuilder};
@@ -24,7 +27,7 @@ use nostrdb::{Ndb, Transaction};
 use std::collections::{BTreeSet, HashMap};
 use std::path::Path;
 use std::time::Duration;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, warn};
 
 #[derive(Debug, Eq, PartialEq, Clone)]
 pub enum DamusState {
@@ -79,59 +82,95 @@ fn handle_key_events(input: &egui::InputState, columns: &mut Columns) {
     }
 }
 
-fn try_process_event(
-    damus: &mut Damus,
-    app_ctx: &mut AppContext<'_>,
-    ctx: &egui::Context,
-) -> Result<()> {
-    let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache);
-    ctx.input(|i| handle_key_events(i, current_columns));
-
-    let ctx2 = ctx.clone();
-    let wakeup = move || {
-        ctx2.request_repaint();
-    };
+struct RelayHandler<'a> {
+    // From AppContext
+    unknown_ids: &'a mut UnknownIds,
+    note_cache: &'a mut NoteCache,
 
-    app_ctx.pool.keepalive_ping(wakeup);
+    // From Damus
+    subscriptions: &'a mut Subscriptions,
+    timeline_cache: &'a mut TimelineCache,
 
-    // NOTE: we don't use the while let loop due to borrow issues
-    #[allow(clippy::while_let_loop)]
-    loop {
-        let ev = if let Some(ev) = app_ctx.pool.try_recv() {
-            ev.into_owned()
-        } else {
-            break;
-        };
+    since_optimize: bool,
+}
 
-        match (&ev.event).into() {
-            RelayEvent::Opened => {
-                app_ctx
-                    .accounts
-                    .send_initial_filters(app_ctx.pool, &ev.relay);
-
-                timeline::send_initial_timeline_filters(
-                    app_ctx.ndb,
-                    damus.since_optimize,
-                    &mut damus.timeline_cache,
-                    &mut damus.subscriptions,
-                    app_ctx.pool,
-                    &ev.relay,
-                );
-            }
-            // TODO: handle reconnects
-            RelayEvent::Closed => warn!("{} connection closed", &ev.relay),
-            RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e),
-            RelayEvent::Other(msg) => trace!("other event {:?}", &msg),
-            RelayEvent::Message(msg) => {
-                process_message(damus, app_ctx, &ev.relay, &msg);
-            }
+impl<'a> RelayHandler<'a> {
+    fn new(
+        unknown_ids: &'a mut UnknownIds,
+        note_cache: &'a mut NoteCache,
+        subscriptions: &'a mut Subscriptions,
+        timeline_cache: &'a mut TimelineCache,
+        since_optimize: bool,
+    ) -> Self {
+        RelayHandler {
+            unknown_ids,
+            note_cache,
+            subscriptions,
+            timeline_cache,
+            since_optimize,
         }
     }
+}
+
+impl LegacyRelayHandler for RelayHandler<'_> {
+    /// Handle relay opened
+    fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str) {
+        timeline::send_initial_timeline_filters(
+            ndb,
+            self.since_optimize,
+            self.timeline_cache,
+            self.subscriptions,
+            pool,
+            relay,
+        );
+    }
 
-    for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() {
-        let is_ready =
-            timeline::is_timeline_ready(app_ctx.ndb, app_ctx.pool, app_ctx.note_cache, timeline);
+    /// Handle end-of-stored-events
+    fn handle_eose(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, sid: &str, relay: &str) {
+        do_handle_eose(
+            ndb,
+            pool,
+            &*self.subscriptions,
+            self.timeline_cache,
+            sid,
+            relay,
+            self.unknown_ids,
+            self.note_cache,
+        )
+        .ok(); // we've already logged the error and intend to keep going
+    }
+}
+
+fn try_process_event<'a>(
+    damus: &'a mut Damus,
+    app_ctx: &'a mut AppContext<'_>,
+    ctx: &egui::Context,
+) {
+    let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache);
+    ctx.input(|i| handle_key_events(i, current_columns));
+
+    {
+        let default_relays = app_ctx.accounts.get_all_selected_account_relays();
+        let mut relay_handler = RelayHandler::new(
+            app_ctx.unknown_ids,
+            app_ctx.note_cache,
+            &mut damus.subscriptions,
+            &mut damus.timeline_cache,
+            damus.since_optimize,
+        );
+        app_ctx
+            .subman
+            .process_relays(&mut relay_handler, &default_relays)
+            .ok();
+    }
 
+    for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() {
+        let is_ready = timeline::is_timeline_ready(
+            app_ctx.ndb,
+            app_ctx.subman.pool(),
+            app_ctx.note_cache,
+            timeline,
+        );
         if is_ready {
             let txn = Transaction::new(app_ctx.ndb).expect("txn");
             // only thread timelines are reversed
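With this hunk, anything implementing LegacyRelayHandler can be driven by SubMan::process_relays. A minimal sketch of another implementor (useful e.g. in tests), assuming the two callbacks shown above are the trait's only required methods:

    use enostr::RelayPool;
    use nostrdb::Ndb;
    use notedeck::subman::LegacyRelayHandler;
    use tracing::debug;

    struct LoggingHandler;

    impl LegacyRelayHandler for LoggingHandler {
        fn handle_opened(&mut self, _ndb: &mut Ndb, _pool: &mut RelayPool, relay: &str) {
            debug!("relay opened: {relay}"); // no filters to send
        }
        fn handle_eose(&mut self, _ndb: &mut Ndb, _pool: &mut RelayPool, sid: &str, relay: &str) {
            debug!("eose for sub {sid} on {relay}"); // nothing to tear down
        }
    }
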
@@ -152,22 +191,49 @@ fn try_process_event(
     }
 
     if app_ctx.unknown_ids.ready_to_send() {
-        unknown_id_send(app_ctx.unknown_ids, app_ctx.pool);
+        unknown_id_send(
+            app_ctx.unknown_ids,
+            app_ctx.subman,
+            &app_ctx.accounts.get_all_selected_account_relays(),
+            app_ctx.accounts.is_forced_relays(),
+        );
     }
-
-    Ok(())
 }
 
-fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut RelayPool) {
-    debug!("unknown_id_send called on: {:?}", &unknown_ids);
-    let filter = unknown_ids.filter().expect("filter");
-    info!(
-        "Getting {} unknown ids from relays",
-        unknown_ids.ids_iter().len()
-    );
-    let msg = ClientMessage::req("unknownids".to_string(), filter);
+fn unknown_id_send(
+    unknown_ids: &mut UnknownIds,
+    subman: &mut SubMan,
+    default_relays: &[RelaySpec],
+    is_forced_relays: bool, // the default_relays are forced
+) {
+    info!("Getting {} unknown ids from relays", &unknown_ids.numids());
+    for subspec in unknown_ids.generate_resolution_requests(is_forced_relays) {
+        debug!("unknown_ids subscribe: {:?}", subspec);
+        match subman.subscribe(subspec, default_relays) {
+            Err(err) => error!("unknown_id_send subscribe failed: {:?}", err),
+            Ok(mut rcvr) => {
+                tokio::spawn(async move {
+                    loop {
+                        match rcvr.next().await {
+                            Err(SubError::StreamEnded) => {
+                                debug!("unknown_id_send: {} complete", rcvr.idstr());
+                                break;
+                            }
+                            Err(err) => {
+                                error!("unknown_id_send: {}: error: {:?}", rcvr.idstr(), err);
+                                break;
+                            }
+                            Ok(note_keys) => {
+                                debug!("{}: received note keys: {:?}", rcvr.idstr(), note_keys);
+                                // only need the prefetch into ndb, all done
+                            }
+                        }
+                    }
+                });
+            }
+        }
+    }
     unknown_ids.clear();
-    pool.send(&msg);
 }
 
 fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Context) {
@@ -192,25 +258,28 @@ fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Con
         DamusState::Initialized => (),
     };
 
-    if let Err(err) = try_process_event(damus, app_ctx, ctx) {
-        error!("error processing event: {}", err);
-    }
+    try_process_event(damus, app_ctx, ctx);
 }
 
-fn handle_eose(
+#[allow(clippy::too_many_arguments)]
+fn do_handle_eose(
+    ndb: &mut Ndb,
+    pool: &mut RelayPool,
     subscriptions: &Subscriptions,
     timeline_cache: &mut TimelineCache,
-    ctx: &mut AppContext<'_>,
     subid: &str,
     relay_url: &str,
+    unknown_ids: &mut UnknownIds,
+    note_cache: &mut NoteCache,
 ) -> Result<()> {
+    debug!("app do_handle_eose {} {}", subid, relay_url);
     let sub_kind = if let Some(sub_kind) = subscriptions.subs.get(subid) {
         sub_kind
     } else {
         let n_subids = subscriptions.subs.len();
         warn!(
-            "got unknown eose subid {}, {} tracked subscriptions",
-            subid, n_subids
+            "got unknown eose subid {} relay {}, {} tracked subscriptions",
+            subid, relay_url, n_subids
        );
        return Ok(());
    };
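The spawn-and-drain pattern in unknown_id_send above (read the receiver until StreamEnded, then let it drop) can be shown self-contained with stand-in types; only next()/idstr() and SubError::StreamEnded are taken from the patch, the rest is assumption:

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum SubError {
        StreamEnded,
    }

    struct Receiver {
        id: String,
        rx: mpsc::Receiver<Vec<u64>>, // Vec<u64> standing in for Vec<NoteKey>
    }

    impl Receiver {
        fn idstr(&self) -> &str {
            &self.id
        }
        async fn next(&mut self) -> Result<Vec<u64>, SubError> {
            // a closed channel plays the role of EOSE ending a one-shot stream
            self.rx.recv().await.ok_or(SubError::StreamEnded)
        }
    }

    async fn drain_oneshot(mut rcvr: Receiver) {
        loop {
            match rcvr.next().await {
                Ok(note_keys) => println!("{}: {} note keys", rcvr.idstr(), note_keys.len()),
                Err(SubError::StreamEnded) => break, // prefetch done, receiver dropped
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(8);
        let task = tokio::spawn(drain_oneshot(Receiver { id: "sub-1".into(), rx }));
        tx.send(vec![1, 2, 3]).await.unwrap();
        drop(tx); // "stream ended"
        task.await.unwrap();
    }
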
@@ -220,24 +289,15 @@ fn handle_eose(
             // eose on timeline? whatevs
         }
         SubKind::Initial => {
-            //let txn = Transaction::new(ctx.ndb)?;
-            //unknowns::update_from_columns(
-            //    &txn,
-            //    ctx.unknown_ids,
-            //    timeline_cache,
-            //    ctx.ndb,
-            //    ctx.note_cache,
-            //);
-            //// this is possible if this is the first time
-            //if ctx.unknown_ids.ready_to_send() {
-            //    unknown_id_send(ctx.unknown_ids, ctx.pool);
-            //}
+            // let txn = Transaction::new(ndb)?;
+            // unknowns::update_from_columns(&txn, unknown_ids, timeline_cache, ndb, note_cache);
         }
 
         // oneshot subs just close when they're done
         SubKind::OneShot => {
             let msg = ClientMessage::close(subid.to_string());
-            ctx.pool.send_to(&msg, relay_url);
+            debug!("app do_handle_eose close {} {}", subid, relay_url);
+            pool.send_to(&msg, relay_url);
         }
 
         SubKind::FetchingContactList(timeline_uid) => {
@@ -285,57 +345,6 @@ fn handle_eose(
     Ok(())
 }
 
-fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) {
-    match msg {
-        RelayMessage::Event(_subid, ev) => {
-            let relay = if let Some(relay) = ctx.pool.relays.iter().find(|r| r.url() == relay) {
-                relay
-            } else {
-                error!("couldn't find relay {} for note processing!?", relay);
-                return;
-            };
-
-            match relay {
-                PoolRelay::Websocket(_) => {
-                    //info!("processing event {}", event);
-                    if let Err(err) = ctx.ndb.process_event_with(
-                        ev,
-                        nostrdb::IngestMetadata::new()
-                            .client(false)
-                            .relay(relay.url()),
-                    ) {
-                        error!("error processing event {ev}: {err}");
-                    }
-                }
-                PoolRelay::Multicast(_) => {
-                    // multicast events are client events
-                    if let Err(err) = ctx.ndb.process_event_with(
-                        ev,
-                        nostrdb::IngestMetadata::new()
-                            .client(true)
-                            .relay(relay.url()),
-                    ) {
-                        error!("error processing multicast event {ev}: {err}");
-                    }
-                }
-            }
-        }
-        RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg),
-        RelayMessage::OK(cr) => info!("OK {:?}", cr),
-        RelayMessage::Eose(sid) => {
-            if let Err(err) = handle_eose(
-                &damus.subscriptions,
-                &mut damus.timeline_cache,
-                ctx,
-                sid,
-                relay,
-            ) {
-                error!("error handling eose: {}", err);
-            }
-        }
-    }
-}
-
 fn render_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ui: &mut egui::Ui) {
     if notedeck::ui::is_narrow(ui.ctx()) {
         render_damus_mobile(damus, app_ctx, ui);
@@ -389,7 +398,7 @@ impl Damus {
                     &txn,
                     ctx.ndb,
                     ctx.note_cache,
-                    ctx.pool,
+                    ctx.subman.pool(),
                     &timeline_kind,
                 ) {
                     add_result.process(
diff --git a/crates/notedeck_columns/src/decks.rs b/crates/notedeck_columns/src/decks.rs
index 903b8f3d4..61c4dc3e4 100644
--- a/crates/notedeck_columns/src/decks.rs
+++ b/crates/notedeck_columns/src/decks.rs
@@ -323,7 +323,7 @@ pub fn demo_decks(
                 &txn,
                 ctx.ndb,
                 ctx.note_cache,
-                ctx.pool,
+                ctx.subman.pool(),
                 &kind,
             ) {
                 results.process(
diff --git a/crates/notedeck_columns/src/nav.rs b/crates/notedeck_columns/src/nav.rs
index 9af13920c..5300a70b1 100644
--- a/crates/notedeck_columns/src/nav.rs
+++ b/crates/notedeck_columns/src/nav.rs
@@ -73,7 +73,7 @@ impl SwitchingAction {
                     let kinds_to_pop =
                         get_active_columns_mut(ctx.accounts, decks_cache).delete_column(index);
                     for kind in &kinds_to_pop {
-                        if let Err(err) = timeline_cache.pop(kind, ctx.ndb, ctx.pool) {
+                        if let Err(err) = timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) {
                             error!("error popping timeline: {err}");
                         }
                     }
@@ -145,7 +145,7 @@ impl RenderNavResponse {
                     let kinds_to_pop = app.columns_mut(ctx.accounts).delete_column(col);
                     for kind in &kinds_to_pop {
-                        if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.pool) {
+                        if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) {
                            error!("error popping timeline: {err}");
                        }
                    }
timeline: {err}"); } } @@ -155,7 +155,7 @@ impl RenderNavResponse { RenderNavAction::PostAction(post_action) => { let txn = Transaction::new(ctx.ndb).expect("txn"); - let _ = post_action.execute(ctx.ndb, &txn, ctx.pool, &mut app.drafts); + let _ = post_action.execute(ctx.ndb, &txn, ctx.subman.pool(), &mut app.drafts); get_active_columns_mut(ctx.accounts, &mut app.decks_cache) .column_mut(col) .router_mut() @@ -171,7 +171,7 @@ impl RenderNavResponse { col, &mut app.timeline_cache, ctx.note_cache, - ctx.pool, + ctx.subman.pool(), &txn, ctx.unknown_ids, ); @@ -188,7 +188,7 @@ impl RenderNavResponse { profile_action.process( &mut app.view_state.pubkey_to_profile_state, ctx.ndb, - ctx.pool, + ctx.subman.pool(), get_active_columns_mut(ctx.accounts, &mut app.decks_cache) .column_mut(col) .router_mut(), @@ -207,7 +207,7 @@ impl RenderNavResponse { .pop(); if let Some(Route::Timeline(kind)) = &r { - if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.pool) { + if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) { error!("popping timeline had an error: {err} for {:?}", kind); } }; @@ -281,7 +281,7 @@ fn render_nav_body( } Route::Relays => { - let manager = RelayPoolManager::new(ctx.pool); + let manager = RelayPoolManager::new(ctx.subman.pool()); RelayView::new(ctx.accounts, manager, &mut app.view_state.id_string_map).ui(ui); None } diff --git a/crates/notedeck_columns/src/timeline/route.rs b/crates/notedeck_columns/src/timeline/route.rs index 1d28bf867..89d5f2c19 100644 --- a/crates/notedeck_columns/src/timeline/route.rs +++ b/crates/notedeck_columns/src/timeline/route.rs @@ -144,7 +144,7 @@ mod tests { let data_str = format!("thread:{}", note_id_hex); let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = TimelineKind::parse(&mut parser, &Pubkey::new(*note_id.bytes())).unwrap(); let expected = TimelineKind::Thread(ThreadSelection::from_root_id( RootNoteIdBuf::new_unsafe(*note_id.bytes()), diff --git a/crates/notedeck_columns/src/ui/add_column.rs b/crates/notedeck_columns/src/ui/add_column.rs index 3fe937772..c2e8b2fc6 100644 --- a/crates/notedeck_columns/src/ui/add_column.rs +++ b/crates/notedeck_columns/src/ui/add_column.rs @@ -635,7 +635,7 @@ pub fn render_add_column_routes( &mut timeline, ctx.ndb, &mut app.subscriptions, - ctx.pool, + ctx.subman.pool(), ctx.note_cache, app.since_optimize, ); @@ -676,7 +676,7 @@ pub fn render_add_column_routes( &mut timeline, ctx.ndb, &mut app.subscriptions, - ctx.pool, + ctx.subman.pool(), ctx.note_cache, app.since_optimize, ); @@ -791,7 +791,7 @@ mod tests { let data_str = "column:algo_selection:last_per_pubkey"; let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = AddColumnRoute::parse_from_tokens(&mut parser).unwrap(); let expected = AddColumnRoute::Algo(AddAlgoRoute::LastPerPubkey); parsed.serialize_tokens(&mut token_writer); diff --git a/crates/notedeck_columns/src/ui/relay.rs b/crates/notedeck_columns/src/ui/relay.rs index e9d8092d2..13d2fc8f6 100644 --- a/crates/notedeck_columns/src/ui/relay.rs +++ b/crates/notedeck_columns/src/ui/relay.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use crate::colors::PINK; use crate::relay_pool_manager::{RelayPoolManager, RelayStatus}; @@ -76,6 +76,12 @@ impl<'a> 
diff --git a/crates/notedeck_columns/src/ui/relay.rs b/crates/notedeck_columns/src/ui/relay.rs
index e9d8092d2..13d2fc8f6 100644
--- a/crates/notedeck_columns/src/ui/relay.rs
+++ b/crates/notedeck_columns/src/ui/relay.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};
 
 use crate::colors::PINK;
 use crate::relay_pool_manager::{RelayPoolManager, RelayStatus};
@@ -76,6 +76,12 @@ impl<'a> RelayView<'a> {
     /// Show the current relays and return a relay the user selected to delete
     fn show_relays(&'a self, ui: &mut Ui) -> Option<String> {
+        let advertised_relays: BTreeSet<String> = self
+            .accounts
+            .get_advertised_relays()
+            .into_iter()
+            .map(|rs| rs.url)
+            .collect();
         let mut relay_to_remove = None;
         for (index, relay_info) in self.manager.get_relay_infos().iter().enumerate() {
             ui.add_space(8.0);
@@ -113,10 +119,13 @@ impl<'a> RelayView<'a> {
                     });
 
                     ui.with_layout(Layout::right_to_left(Align::Center), |ui| {
-                        if ui.add(delete_button(ui.visuals().dark_mode)).clicked() {
-                            relay_to_remove = Some(relay_info.relay_url.to_string());
-                        };
-
+                        if advertised_relays.contains(relay_info.relay_url) {
+                            if ui.add(delete_button(ui.visuals().dark_mode)).clicked() {
+                                relay_to_remove = Some(relay_info.relay_url.to_string());
+                            };
+                        } else {
+                            ui.add_space(16.0);
+                        }
                         show_connection_status(ui, relay_info.status);
                     });
                 });
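One caveat on the membership test above: it compares raw URL strings, so wss://relay.example.com and wss://relay.example.com/ would not match. A sketch of normalizing both sides first; normalize() is a stand-in for a shared canonicalizer such as enostr's RelayPool::canonicalize_url:

    use std::collections::BTreeSet;
    use url::Url;

    // Url::parse round-trips to a canonical form (e.g. adding the trailing
    // slash after a bare host); fall back to the input on parse failure.
    fn normalize(url: &str) -> String {
        Url::parse(url)
            .map(|u| u.to_string())
            .unwrap_or_else(|_| url.to_string())
    }

    fn can_delete(advertised: &BTreeSet<String>, relay_url: &str) -> bool {
        advertised.contains(&normalize(relay_url))
    }

    fn main() {
        let advertised: BTreeSet<String> = ["wss://relay.example.com"]
            .iter()
            .map(|u| normalize(u))
            .collect();
        assert!(can_delete(&advertised, "wss://relay.example.com/"));
    }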