diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index 86ebfe9ba3..d4d28d7b1b 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -12,7 +12,7 @@ use crate::{ port_utils::{try_with_port, INTERFACE_CONNECT_ATTEMPTS_MAX}, Holochain, }; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use holochain_common::paths::DNA_EXTENSION; use holochain_core::{logger::Logger, signal::Signal}; use holochain_core_types::{ @@ -61,6 +61,8 @@ use holochain_net::p2p_config::{BackendConfig, P2pBackendKind, P2pConfig}; pub const MAX_DYNAMIC_PORT: u16 = std::u16::MAX; +use crate::CHANNEL_SIZE; + /// Special string to be printed on stdout, which clients must parse /// in order to discover which port the interface bound to. /// DO NOT CHANGE! @@ -168,8 +170,6 @@ pub fn notify(msg: String) { println!("{}", msg); } -#[autotrace] -#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)] impl Conductor { pub fn from_config(config: Configuration) -> Self { lib3h_sodium::check_init(); @@ -243,8 +243,8 @@ impl Conductor { pub fn spawn_stats_thread(&mut self) { self.stop_stats_thread(); let instances = self.instances.clone(); - let (kill_switch_tx, kill_switch_rx) = unbounded(); - let (stats_tx, stats_rx) = unbounded(); + let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE); + let (stats_tx, stats_rx) = bounded(CHANNEL_SIZE); self.stats_thread_kill_switch = Some(kill_switch_tx); self.stats_signal_receiver = Some(stats_rx); thread::Builder::new() @@ -314,7 +314,7 @@ impl Conductor { let instance_signal_receivers = self.instance_signal_receivers.clone(); let signal_tx = self.signal_tx.clone(); let config = self.config.clone(); - let (kill_switch_tx, kill_switch_rx) = unbounded(); + let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE); self.signal_multiplexer_kill_switch = Some(kill_switch_tx); self.spawn_stats_thread(); let stats_signal_receiver = self.stats_signal_receiver.clone().expect( @@ -784,7 +784,7 @@ impl Conductor { )> { match self.config.tracing.clone().unwrap_or_default() { TracingConfiguration::Jaeger(jaeger_config) => { - let (span_tx, span_rx) = crossbeam_channel::unbounded(); + let (span_tx, span_rx) = crossbeam_channel::bounded(CHANNEL_SIZE); let service_name = format!("{}-{}", jaeger_config.service_name, id); let mut reporter = ht::reporter::JaegerCompactReporter::new(&service_name).unwrap(); if let Some(s) = jaeger_config.socket_address { @@ -834,7 +834,7 @@ impl Conductor { context_builder = context_builder.with_p2p_config(self.get_p2p_config()); // Signal config: - let (sender, receiver) = unbounded(); + let (sender, receiver) = bounded(CHANNEL_SIZE); self.instance_signal_receivers .write() .unwrap() @@ -1341,7 +1341,7 @@ impl Conductor { fn spawn_interface_thread(&self, interface_config: InterfaceConfiguration) -> Sender<()> { let dispatcher = self.make_interface_handler(&interface_config); // The "kill switch" is the channel which allows the interface to be stopped from outside its thread - let (kill_switch_tx, kill_switch_rx) = unbounded(); + let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE); let (broadcaster, _handle) = run_interface(&interface_config, dispatcher, kill_switch_rx) .map_err(|error| { diff --git a/crates/conductor_lib/src/conductor/passphrase_manager.rs b/crates/conductor_lib/src/conductor/passphrase_manager.rs index bfdbc1361e..ee5c8a9bce 100644 --- a/crates/conductor_lib/src/conductor/passphrase_manager.rs +++ b/crates/conductor_lib/src/conductor/passphrase_manager.rs @@ -1,4 +1,4 @@ -use crossbeam_channel::{unbounded, Sender}; +use crossbeam_channel::{bounded, Sender}; use holochain_core_types::error::HolochainError; use holochain_locksmith::Mutex; use lib3h_sodium::secbuf::SecBuf; @@ -11,6 +11,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::CHANNEL_SIZE; #[cfg(unix)] use std::io::{BufRead, BufReader}; #[cfg(unix)] @@ -34,7 +35,7 @@ pub struct PassphraseManager { #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)] impl PassphraseManager { pub fn new(passphrase_service: Arc>) -> Self { - let (kill_switch_tx, kill_switch_rx) = unbounded::<()>(); + let (kill_switch_tx, kill_switch_rx) = bounded::<()>(CHANNEL_SIZE); let pm = PassphraseManager { passphrase_cache: Arc::new(Mutex::new(None)), passphrase_service, diff --git a/crates/conductor_lib/src/lib.rs b/crates/conductor_lib/src/lib.rs index e484bc2ef6..ac72186d36 100644 --- a/crates/conductor_lib/src/lib.rs +++ b/crates/conductor_lib/src/lib.rs @@ -131,4 +131,6 @@ pub mod static_server_impls; pub use crate::holochain::Holochain; +const CHANNEL_SIZE: usize = 1000; + new_relic_setup!("NEW_RELIC_LICENSE_KEY"); diff --git a/crates/conductor_lib/src/static_server_impls/nickel_static_server.rs b/crates/conductor_lib/src/static_server_impls/nickel_static_server.rs index 69caaf0b08..f77932eea3 100644 --- a/crates/conductor_lib/src/static_server_impls/nickel_static_server.rs +++ b/crates/conductor_lib/src/static_server_impls/nickel_static_server.rs @@ -13,6 +13,8 @@ use nickel::{ Response, StaticFilesHandler, }; +use crate::CHANNEL_SIZE; + pub struct NickelStaticServer { shutdown_signal: Option>, config: UiInterfaceConfiguration, @@ -38,7 +40,7 @@ impl ConductorStaticFileServer for NickelStaticServer { } fn start(&mut self) -> HolochainResult<()> { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = crossbeam_channel::bounded(CHANNEL_SIZE); self.shutdown_signal = Some(tx); self.running = true; diff --git a/crates/core/src/agent/actions/commit.rs b/crates/core/src/agent/actions/commit.rs index 5560a9e636..2ca6e43325 100644 --- a/crates/core/src/agent/actions/commit.rs +++ b/crates/core/src/agent/actions/commit.rs @@ -53,10 +53,16 @@ impl Future for CommitFuture { // See: https://github.com/holochain/holochain-rust/issues/314 // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { match state.agent().actions().get(&self.action) { Some(r) => match r.response() { AgentActionResponse::Commit(result) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearActionResponse( diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 81d00c4f28..7d43e18dba 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -6,8 +6,9 @@ use crate::{ persister::Persister, signal::{Signal, SignalSender}, state::StateWrapper, + CHANNEL_SIZE, }; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use futures::{ executor::ThreadPool, task::{noop_waker_ref, Poll}, @@ -50,8 +51,8 @@ use std::{ #[cfg(test)] use test_utils::mock_signing::mock_conductor_api; -pub type ActionSender = ht::channel::SpanSender; -pub type ActionReceiver = ht::channel::SpanReceiver; +pub type ActionSender = crossbeam_channel::Sender; +pub type ActionReceiver = crossbeam_channel::Receiver; pub struct P2pNetworkWrapper(Arc>>); @@ -306,13 +307,13 @@ impl Context { pub fn is_action_channel_open(&self) -> bool { self.action_channel .clone() - .map(|tx| tx.send_wrapped(ActionWrapper::new(Action::Ping)).is_ok()) + .map(|tx| tx.send(ActionWrapper::new(Action::Ping)).is_ok()) .unwrap_or(false) } pub fn action_channel_error(&self, msg: &str) -> Option { match &self.action_channel { - Some(tx) => match tx.send_wrapped(ActionWrapper::new(Action::Ping)) { + Some(tx) => match tx.send(ActionWrapper::new(Action::Ping)) { Ok(()) => None, Err(_) => Some(HolochainError::LifecycleError(msg.into())), }, @@ -343,7 +344,7 @@ impl Context { /// got mutated. /// This enables blocking/parking the calling thread until the application state got changed. pub fn create_observer(&self) -> Receiver<()> { - let (tick_tx, tick_rx) = unbounded(); + let (tick_tx, tick_rx) = bounded(CHANNEL_SIZE); self.observer_channel() .send(Observer { ticker: tick_tx }) .expect("Observer channel not initialized"); diff --git a/crates/core/src/instance.rs b/crates/core/src/instance.rs index c1c0e41aeb..1301d41fc8 100644 --- a/crates/core/src/instance.rs +++ b/crates/core/src/instance.rs @@ -12,6 +12,7 @@ use crate::{ signal::Signal, state::{State, StateWrapper}, workflows::{application, run_holding_workflow}, + CHANNEL_SIZE, }; #[cfg(test)] use crate::{ @@ -19,7 +20,7 @@ use crate::{ nucleus::actions::initialize::initialize_chain, }; use clokwerk::{ScheduleHandle, Scheduler, TimeUnits}; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use holochain_core_types::{ dna::Dna, error::{HcResult, HolochainError}, @@ -27,7 +28,6 @@ use holochain_core_types::{ use holochain_locksmith::RwLock; #[cfg(test)] use holochain_persistence_api::cas::content::Address; -use holochain_tracing::{self as ht, channel::lax_send_wrapped}; use snowflake::ProcessUniqueId; use std::{ sync::{ @@ -67,7 +67,6 @@ pub struct Observer { pub static DISPATCH_WITHOUT_CHANNELS: &str = "dispatch called without channels open"; #[autotrace] -#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] impl Instance { /// This is initializing and starting the redux action loop and adding channels to dispatch /// actions and observers to the context @@ -162,9 +161,9 @@ impl Instance { /// Returns recievers for actions and observers that get added to this instance fn initialize_channels(&mut self) -> (ActionReceiver, Receiver) { - let (tx_action, rx_action) = unbounded::>(); - let (tx_observer, rx_observer) = unbounded::(); - self.action_channel = Some(tx_action.into()); + let (tx_action, rx_action) = bounded::(CHANNEL_SIZE); + let (tx_observer, rx_observer) = bounded::(CHANNEL_SIZE); + self.action_channel = Some(tx_action); self.observer_channel = Some(tx_observer); (rx_action, rx_observer) @@ -190,7 +189,7 @@ impl Instance { let mut sync_self = self.clone(); let sub_context = self.initialize_context(context); - let (kill_sender, kill_receiver) = crossbeam_channel::unbounded(); + let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE); self.kill_switch = Some(kill_sender); let instance_is_alive = sub_context.instance_is_alive.clone(); instance_is_alive.store(true, Ordering::Relaxed); @@ -202,7 +201,7 @@ impl Instance { )) .spawn(move || { let mut state_observers: Vec = Vec::new(); - let mut unprocessed_action: Option> = None; + let mut unprocessed_action: Option = None; while kill_receiver.try_recv().is_err() { if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) { // Add new observers @@ -213,11 +212,6 @@ impl Instance { if should_process { match sync_self.process_action(&action_wrapper, &sub_context) { Ok(()) => { - let tag = ht::Tag::new("action", format!("{:?}", action)); - let _guard = action_wrapper.follower_(&sub_context.tracer, "action_loop thread", |s| s.tag(tag).start()).map(|span| { - - ht::push_span(span) - }); sync_self.emit_signals(&sub_context, &action_wrapper); // Tick all observers and remove those that have lost their receiving part state_observers= state_observers @@ -255,20 +249,9 @@ impl Instance { /// returns the new vector of observers pub(crate) fn process_action( &self, - action_wrapper: &ht::SpanWrap, + action_wrapper: &ActionWrapper, context: &Arc, ) -> Result<(), HolochainError> { - let span = action_wrapper - .follower(&context.tracer, "begin process_action") - .unwrap_or_else(|| { - context - .tracer - .span("ROOT: process_action") - .tag(ht::debug_tag("action_wrapper", action_wrapper)) - .start() - .into() - }); - let _trace_guard = ht::push_span(span); context.redux_wants_write.store(true, Relaxed); // Mutate state { @@ -282,7 +265,7 @@ impl Instance { HolochainError::Timeout(format!("timeout src: {}:{}", file!(), line!())) })?; - new_state = state.reduce(action_wrapper.data.clone()); + new_state = state.reduce(action_wrapper.clone()); // Change the state *state = new_state; @@ -308,7 +291,7 @@ impl Instance { } fn start_holding_loop(&mut self, context: Arc) { - let (kill_sender, kill_receiver) = crossbeam_channel::unbounded(); + let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE); self.kill_switch_holding = Some(kill_sender); thread::Builder::new() .name(format!( @@ -500,7 +483,7 @@ impl Drop for Instance { /// Panics if the channels passed are disconnected. #[autotrace] pub fn dispatch_action(action_channel: &ActionSender, action_wrapper: ActionWrapper) { - lax_send_wrapped(action_channel.clone(), action_wrapper, "dispatch_action"); + action_channel.send(action_wrapper).ok(); } #[cfg(test)] diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index ad89a6113c..eedddf3c4e 100755 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -67,4 +67,6 @@ pub mod state_dump; pub mod wasm_engine; pub mod workflows; +const CHANNEL_SIZE: usize = 1000; + new_relic_setup!("NEW_RELIC_LICENSE_KEY"); diff --git a/crates/core/src/logger.rs b/crates/core/src/logger.rs index e6b145c025..08474c5227 100644 --- a/crates/core/src/logger.rs +++ b/crates/core/src/logger.rs @@ -1,6 +1,7 @@ //! This logger is the logger that's attached to each Holochain application //! which is separate from standard logging via the log crate warn! info! debug! logging that //! gets emitted globaly from the conductor. +use crate::CHANNEL_SIZE; use chrono::Local; use crossbeam_channel; use holochain_locksmith::Mutex; @@ -56,7 +57,7 @@ impl ChannelLogger { ChannelLogger { id, sender } } pub fn setup() -> (Sender, Receiver) { - crossbeam_channel::unbounded() + crossbeam_channel::bounded(CHANNEL_SIZE) } } pub fn default_handler(msg: String) { diff --git a/crates/core/src/network/actions/custom_send.rs b/crates/core/src/network/actions/custom_send.rs index 8e8733e584..ba052e01fa 100644 --- a/crates/core/src/network/actions/custom_send.rs +++ b/crates/core/src/network/actions/custom_send.rs @@ -62,6 +62,9 @@ impl Future for SendResponseFuture { // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { let state = state.network(); if let Err(error) = state.initialized() { @@ -69,6 +72,9 @@ impl Future for SendResponseFuture { } match state.custom_direct_message_replys.get(&self.id) { Some(result) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearCustomSendResponse(self.id.clone())), diff --git a/crates/core/src/network/actions/get_validation_package.rs b/crates/core/src/network/actions/get_validation_package.rs index 76f064c7c3..0f9f48736a 100644 --- a/crates/core/src/network/actions/get_validation_package.rs +++ b/crates/core/src/network/actions/get_validation_package.rs @@ -62,6 +62,9 @@ impl Future for GetValidationPackageFuture { // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { let state = state.network(); if let Err(error) = state.initialized() { @@ -70,6 +73,9 @@ impl Future for GetValidationPackageFuture { match state.get_validation_package_results.get(&self.key) { Some(Some(result)) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearValidationPackageResult(self.key.clone())), diff --git a/crates/core/src/network/actions/publish.rs b/crates/core/src/network/actions/publish.rs index 4f7236c1fa..bf8a09ee3f 100644 --- a/crates/core/src/network/actions/publish.rs +++ b/crates/core/src/network/actions/publish.rs @@ -47,6 +47,9 @@ impl Future for PublishFuture { // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { let state = state.network(); if let Err(error) = state.initialized() { @@ -56,6 +59,9 @@ impl Future for PublishFuture { match state.actions().get(&self.action) { Some(r) => match r.response() { NetworkActionResponse::Publish(result) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearActionResponse( diff --git a/crates/core/src/network/actions/publish_header_entry.rs b/crates/core/src/network/actions/publish_header_entry.rs index 2493d760fb..f460c4370a 100644 --- a/crates/core/src/network/actions/publish_header_entry.rs +++ b/crates/core/src/network/actions/publish_header_entry.rs @@ -47,6 +47,9 @@ impl Future for PublishHeaderEntryFuture { // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { let state = state.network(); if let Err(error) = state.initialized() { @@ -55,6 +58,9 @@ impl Future for PublishHeaderEntryFuture { match state.actions().get(&self.action) { Some(r) => match r.response() { NetworkActionResponse::PublishHeaderEntry(result) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearActionResponse( diff --git a/crates/core/src/network/actions/query.rs b/crates/core/src/network/actions/query.rs index e7ecdce264..19d75b66c4 100644 --- a/crates/core/src/network/actions/query.rs +++ b/crates/core/src/network/actions/query.rs @@ -96,12 +96,18 @@ impl Future for QueryFuture { // cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.try_state() { if let Err(error) = state.network().initialized() { return Poll::Ready(Err(error)); } match state.network().get_query_results.get(&self.key) { Some(Some(result)) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearQueryResult(self.key.clone())), diff --git a/crates/core/src/nucleus/actions/call_zome_function.rs b/crates/core/src/nucleus/actions/call_zome_function.rs index 187b4e5a72..6283449309 100644 --- a/crates/core/src/nucleus/actions/call_zome_function.rs +++ b/crates/core/src/nucleus/actions/call_zome_function.rs @@ -16,7 +16,6 @@ use holochain_core_types::{ use holochain_json_api::json::JsonString; use holochain_persistence_api::cas::content::{Address, AddressableContent}; -use holochain_tracing::channel::lax_send_wrapped; use holochain_dpki::utils::Verify; @@ -86,7 +85,7 @@ pub async fn call_zome_function( // Signal (currently mainly to the nodejs_waiter) that we are about to start a zome function: context .action_channel() - .send_wrapped(ActionWrapper::new(Action::QueueZomeFunctionCall( + .send(ActionWrapper::new(Action::QueueZomeFunctionCall( zome_call.clone(), ))) .expect("action channel to be open"); @@ -305,11 +304,12 @@ pub fn spawn_zome_function(context: Arc, zome_call: ZomeFnCall) { context, "actions/call_zome_fn: sending ReturnZomeFunctionResult action." ); - lax_send_wrapped( - context.action_channel().clone(), - ActionWrapper::new(Action::ReturnZomeFunctionResult(response)), - "call_zome_function", - ); + context + .action_channel() + .send(ActionWrapper::new(Action::ReturnZomeFunctionResult( + response, + ))) + .ok(); log_debug!( context, "actions/call_zome_fn: sent ReturnZomeFunctionResult action." @@ -341,10 +341,16 @@ impl Future for CallResultFuture { // Leaving this in to be safe against running this future in another executor. cx.waker().clone().wake(); + if self.context.action_channel().is_full() { + return Poll::Pending; + } if let Some(state) = self.context.clone().try_state() { if self.call_spawned { match state.nucleus().zome_call_result(&self.zome_call) { Some(result) => { + if self.context.action_channel().is_full() { + return Poll::Pending; + } dispatch_action( self.context.action_channel(), ActionWrapper::new(Action::ClearZomeFunctionCall( diff --git a/crates/core/src/nucleus/actions/initialize.rs b/crates/core/src/nucleus/actions/initialize.rs index 3b9a0c50af..ca49279d70 100644 --- a/crates/core/src/nucleus/actions/initialize.rs +++ b/crates/core/src/nucleus/actions/initialize.rs @@ -75,7 +75,7 @@ pub async fn initialize_chain( fn dispatch_error_result(context: &Arc, err: HolochainError) { context .action_channel() - .send_wrapped(ActionWrapper::new(Action::ReturnInitializationResult(Err( + .send(ActionWrapper::new(Action::ReturnInitializationResult(Err( err.to_string(), )))) .expect("Action channel not usable in initialize_chain()"); @@ -174,7 +174,7 @@ pub async fn initialize_chain( context_clone .action_channel() - .send_wrapped(ActionWrapper::new(Action::ReturnInitializationResult( + .send(ActionWrapper::new(Action::ReturnInitializationResult( initialization_result, ))) .expect("Action channel not usable in initialize_chain()"); diff --git a/crates/core/src/nucleus/reducers/init_application.rs b/crates/core/src/nucleus/reducers/init_application.rs index 2f6499ca52..d9bfd4f593 100644 --- a/crates/core/src/nucleus/reducers/init_application.rs +++ b/crates/core/src/nucleus/reducers/init_application.rs @@ -46,10 +46,10 @@ pub mod tests { state::{NucleusState, NucleusStatus}, }, state::test_store, + CHANNEL_SIZE, }; - use crossbeam_channel::unbounded; + use crossbeam_channel::bounded; use holochain_core_types::dna::Dna; - use holochain_tracing as ht; use std::sync::Arc; #[test] @@ -58,8 +58,8 @@ pub mod tests { let dna = Dna::new(); let action_wrapper = ActionWrapper::new(Action::InitializeChain(dna.clone())); let nucleus = Arc::new(NucleusState::new()); // initialize to bogus value - let (sender, _receiver) = unbounded::>(); - let (tx_observer, _observer) = unbounded::(); + let (sender, _receiver) = bounded::(CHANNEL_SIZE); + let (tx_observer, _observer) = bounded::(CHANNEL_SIZE); let context = test_context_with_channels("jimmy", &sender.into(), &tx_observer, None); let root_state = test_store(context); diff --git a/crates/core/src/nucleus/reducers/return_initialization_result.rs b/crates/core/src/nucleus/reducers/return_initialization_result.rs index 9942f4ba80..39ddda7574 100644 --- a/crates/core/src/nucleus/reducers/return_initialization_result.rs +++ b/crates/core/src/nucleus/reducers/return_initialization_result.rs @@ -41,10 +41,10 @@ pub mod tests { state::{NucleusState, NucleusStatus}, }, state::test_store, + CHANNEL_SIZE, }; - use crossbeam_channel::unbounded; + use crossbeam_channel::bounded; use holochain_core_types::dna::Dna; - use holochain_tracing as ht; use std::sync::Arc; #[test] @@ -53,10 +53,9 @@ pub mod tests { let dna = Dna::new(); let action_wrapper = ActionWrapper::new(Action::InitializeChain(dna)); let nucleus = Arc::new(NucleusState::new()); // initialize to bogus value - let (sender, _receiver) = unbounded::>(); - let (tx_observer, _observer) = unbounded::(); - let context = - test_context_with_channels("jimmy", &sender.into(), &tx_observer, None).clone(); + let (sender, _receiver) = bounded::(CHANNEL_SIZE); + let (tx_observer, _observer) = bounded::(CHANNEL_SIZE); + let context = test_context_with_channels("jimmy", &sender, &tx_observer, None).clone(); let root_state = test_store(context); // Reduce Init action diff --git a/crates/core/src/signal.rs b/crates/core/src/signal.rs index cf9114e88c..6f80a73cc1 100644 --- a/crates/core/src/signal.rs +++ b/crates/core/src/signal.rs @@ -1,5 +1,5 @@ -use crate::{action::ActionWrapper, consistency::ConsistencySignal}; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crate::{action::ActionWrapper, consistency::ConsistencySignal, CHANNEL_SIZE}; +use crossbeam_channel::{bounded, Receiver, Sender}; use holochain_json_api::{error::JsonError, json::JsonString}; use holochain_wasm_utils::api_serialization::emit_signal::EmitSignalArgs; use serde::{Deserialize, Deserializer}; @@ -43,7 +43,7 @@ pub type SignalSender = Sender; pub type SignalReceiver = Receiver; pub fn signal_channel() -> (SignalSender, SignalReceiver) { - unbounded() + bounded(CHANNEL_SIZE) } /// Pass on messages from multiple receivers into a single receiver @@ -52,7 +52,7 @@ pub fn _combine_receivers(rxs: Vec>) -> Receiver where T: 'static + Send, { - let (master_tx, master_rx) = unbounded::(); + let (master_tx, master_rx) = bounded::(CHANNEL_SIZE); for rx in rxs { let tx = master_tx.clone(); let _ = thread::Builder::new() diff --git a/crates/core/src/wasm_engine/api/emit_signal.rs b/crates/core/src/wasm_engine/api/emit_signal.rs index 3e6986d874..4195abc69c 100644 --- a/crates/core/src/wasm_engine/api/emit_signal.rs +++ b/crates/core/src/wasm_engine/api/emit_signal.rs @@ -61,8 +61,9 @@ pub mod tests { }, Defn, }, + CHANNEL_SIZE, }; - use crossbeam_channel::unbounded; + use crossbeam_channel::bounded; use holochain_json_api::json::JsonString; use holochain_wasm_utils::api_serialization::emit_signal::EmitSignalArgs; use std::sync::Arc; @@ -92,7 +93,7 @@ pub mod tests { let (_instance, context) = test_instance_and_context(dna, None).expect("Could not create test instance"); - let (tx, rx) = unbounded::(); + let (tx, rx) = bounded::(CHANNEL_SIZE); let mut context = (*context).clone(); context.signal_tx = Some(tx); let context = Arc::new(context); diff --git a/crates/core_types/src/bits_n_pieces.rs b/crates/core_types/src/bits_n_pieces.rs index 4d142a5c1e..b3ae1608b0 100644 --- a/crates/core_types/src/bits_n_pieces.rs +++ b/crates/core_types/src/bits_n_pieces.rs @@ -8,7 +8,7 @@ pub fn u32_high_bits(i: u32) -> u16 { /// returns the u16 low bits from a u32 by doing a lossy cast pub fn u32_low_bits(i: u32) -> u16 { - (i as u16) + i as u16 } /// splits the high and low bits of u32 into a tuple of u16, for destructuring convenience @@ -26,7 +26,7 @@ pub fn u64_high_bits(i: u64) -> u32 { } pub fn u64_low_bits(i: u64) -> u32 { - (i as u32) + i as u32 } pub fn u64_split_bits(i: u64) -> (u32, u32) { diff --git a/crates/in_stream/src/lib.rs b/crates/in_stream/src/lib.rs index 4aadc65480..eec418a77b 100644 --- a/crates/in_stream/src/lib.rs +++ b/crates/in_stream/src/lib.rs @@ -6,7 +6,7 @@ //! use url2::prelude::*; //! use in_stream::*; //! -//! let (send_binding, recv_binding) = crossbeam_channel::unbounded(); +//! let (send_binding, recv_binding) = crossbeam_channel::bounded(CHANNEL_SIZE); //! //! let server_thread = std::thread::spawn(move || { //! let config = TcpBindConfig::default(); @@ -94,3 +94,5 @@ pub use ws::*; pub mod json_rpc; pub use json_rpc::*; + +pub const CHANNEL_SIZE: usize = 1000; diff --git a/crates/in_stream/src/mem.rs b/crates/in_stream/src/mem.rs index 0f70949cd9..9fd56116cc 100644 --- a/crates/in_stream/src/mem.rs +++ b/crates/in_stream/src/mem.rs @@ -257,8 +257,8 @@ use in_stream_mem::random_url; /// private stream pair constructor, these streams can message each other fn create_mem_stream_pair(url_a: Url2, url_b: Url2) -> (InStreamMem, InStreamMem) { - let (send1, recv1) = crossbeam_channel::unbounded(); - let (send2, recv2) = crossbeam_channel::unbounded(); + let (send1, recv1) = crossbeam_channel::bounded(CHANNEL_SIZE); + let (send2, recv2) = crossbeam_channel::bounded(CHANNEL_SIZE); ( InStreamMem::priv_new(url_a, send1, recv2), InStreamMem::priv_new(url_b, send2, recv1), @@ -312,7 +312,7 @@ impl MemManager { Entry::Occupied(_) => Err(ErrorKind::AddrInUse.into()), Entry::Vacant(e) => { // the url is not in use, let's create a new listener - let (send, recv) = crossbeam_channel::unbounded(); + let (send, recv) = crossbeam_channel::bounded(CHANNEL_SIZE); e.insert(send); Ok(InStreamListenerMem::priv_new(new_url, recv)) } @@ -373,7 +373,7 @@ mod tests { fn mem_works() { use std::io::{Read, Write}; - let (send_binding, recv_binding) = crossbeam_channel::unbounded(); + let (send_binding, recv_binding) = crossbeam_channel::bounded(CHANNEL_SIZE); let server_thread = std::thread::spawn(move || { let mut listener = diff --git a/crates/in_stream/src/tcp.rs b/crates/in_stream/src/tcp.rs index 29eb367a4f..0e6b909c22 100644 --- a/crates/in_stream/src/tcp.rs +++ b/crates/in_stream/src/tcp.rs @@ -328,7 +328,7 @@ mod tests { let bind = bind.to_string(); let con = con.map(|c| c.to_string()); - let (send_binding, recv_binding) = crossbeam_channel::unbounded(); + let (send_binding, recv_binding) = crossbeam_channel::bounded(CHANNEL_SIZE); let server_thread = std::thread::spawn(move || { let mut listener = diff --git a/crates/in_stream/src/tls.rs b/crates/in_stream/src/tls.rs index 42cbd28830..3c7527ef65 100644 --- a/crates/in_stream/src/tls.rs +++ b/crates/in_stream/src/tls.rs @@ -373,7 +373,7 @@ mod tests { mut listener: InStreamListenerTls, c: C, ) { - let (send_binding, recv_binding) = crossbeam_channel::unbounded(); + let (send_binding, recv_binding) = crossbeam_channel::bounded(CHANNEL_SIZE); let server_thread = std::thread::spawn(move || { println!("bound to: {}", listener.binding()); diff --git a/crates/in_stream/src/ws.rs b/crates/in_stream/src/ws.rs index ba926e66f4..997f292192 100644 --- a/crates/in_stream/src/ws.rs +++ b/crates/in_stream/src/ws.rs @@ -451,7 +451,7 @@ mod tests { mut listener: InStreamListenerWss, c: C, ) { - let (send_binding, recv_binding) = crossbeam_channel::unbounded(); + let (send_binding, recv_binding) = crossbeam_channel::bounded(CHANNEL_SIZE); let server_thread = std::thread::spawn(move || { println!("bound to: {}", listener.binding()); diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 1ee0fc3c1c..a549bc1294 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -27,3 +27,5 @@ pub use cloudwatch::*; pub use config::*; pub use metrics::*; new_relic_setup!("NEW_RELIC_LICENSE_KEY"); + +pub const CHANNEL_SIZE: usize = 1000; diff --git a/crates/metrics/src/metrics.rs b/crates/metrics/src/metrics.rs index 3b638328dc..6b115021a8 100644 --- a/crates/metrics/src/metrics.rs +++ b/crates/metrics/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::CHANNEL_SIZE; use chrono::prelude::*; use crossbeam_channel::*; use holochain_locksmith::RwLock; @@ -68,7 +69,7 @@ pub struct ChannelPublisher { impl ChannelPublisher { pub fn new(mut metric_publisher: Box) -> Self { - let (sender, receiver) = unbounded(); + let (sender, receiver) = bounded(CHANNEL_SIZE); let _join_handle: std::thread::JoinHandle<()> = std::thread::spawn(move || loop { match receiver.try_recv() { Ok(metric) => metric_publisher.publish(&metric), diff --git a/crates/net/src/connection/net_connection_thread.rs b/crates/net/src/connection/net_connection_thread.rs index d32eddcf44..45db1d03ee 100644 --- a/crates/net/src/connection/net_connection_thread.rs +++ b/crates/net/src/connection/net_connection_thread.rs @@ -2,7 +2,7 @@ use super::{ net_connection::{NetHandler, NetSend, NetWorkerFactory}, NetResult, }; -use crate::p2p_network::Lib3hClientProtocolWrapped; +use crate::{p2p_network::Lib3hClientProtocolWrapped, CHANNEL_SIZE}; use failure::err_msg; use holochain_locksmith::Mutex; use holochain_logging::prelude::*; @@ -48,8 +48,8 @@ impl NetConnectionThread { let can_keep_running = Arc::new(AtomicBool::new(true)); let can_keep_running_child = can_keep_running.clone(); // Create channels between self and spawned thread - let (send_channel, recv_channel) = crossbeam_channel::unbounded(); - let (send_endpoint, recv_endpoint) = crossbeam_channel::unbounded(); + let (send_channel, recv_channel) = crossbeam_channel::bounded(CHANNEL_SIZE); + let (send_endpoint, recv_endpoint) = crossbeam_channel::bounded(CHANNEL_SIZE); // Spawn worker thread let thread = thread::Builder::new() @@ -180,7 +180,7 @@ impl NetConnectionThread { mod tests { use super::{super::net_connection::NetWorker, *}; use crate::p2p_network::Lib3hServerProtocolWrapped; - use crossbeam_channel::unbounded; + use crossbeam_channel::bounded; use holochain_persistence_api::hash::HashString; use lib3h_protocol::{ data_types::GenericResultData, @@ -254,7 +254,7 @@ mod tests { #[test] fn it_invokes_connection_thread() { - let (sender, receiver) = unbounded(); + let (sender, receiver) = bounded(CHANNEL_SIZE); let mut con = NetConnectionThread::new( NetHandler::new(Box::new(move |r| { @@ -293,7 +293,7 @@ mod tests { #[test] fn it_can_tick() { - let (sender, receiver) = unbounded(); + let (sender, receiver) = bounded(CHANNEL_SIZE); let mut con = NetConnectionThread::new( NetHandler::new(Box::new(move |r| { diff --git a/crates/net/src/in_memory/memory_worker.rs b/crates/net/src/in_memory/memory_worker.rs index ad6e3acf93..b16921c199 100644 --- a/crates/net/src/in_memory/memory_worker.rs +++ b/crates/net/src/in_memory/memory_worker.rs @@ -14,6 +14,7 @@ use holochain_persistence_api::{cas::content::Address, hash::HashString}; use lib3h_protocol::{protocol_client::Lib3hClientProtocol, protocol_server::Lib3hServerProtocol}; use std::collections::{hash_map::Entry, HashMap}; +use crate::CHANNEL_SIZE; /// a p2p worker for mocking in-memory scenario tests #[allow(non_snake_case)] pub struct InMemoryWorker { @@ -47,7 +48,7 @@ impl NetWorker for InMemoryWorker { match self.receiver_per_dna.entry(dna_address.clone()) { Entry::Occupied(_) => (), Entry::Vacant(e) => { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = crossbeam_channel::bounded(CHANNEL_SIZE); println!("register_chain: {}::{}", dna_address, track_msg.agent_id); server.register_chain(&dna_address, &track_msg.agent_id, tx)?; e.insert(rx); @@ -166,7 +167,7 @@ impl Drop for InMemoryWorker { mod tests { use super::*; use crate::p2p_config::P2pConfig; - use crossbeam_channel::unbounded; + use crossbeam_channel::bounded; use holochain_persistence_api::cas::content::Address; use lib3h_protocol::{data_types::SpaceData, types::AgentPubKey}; @@ -184,7 +185,7 @@ mod tests { fn can_memory_worker_double_track() { // setup client 1 let memory_config = &JsonString::from(P2pConfig::unique_memory_backend_json()); - let (handler_send_1, handler_recv_1) = unbounded::(); + let (handler_send_1, handler_recv_1) = bounded::(CHANNEL_SIZE); let mut memory_worker_1 = Box::new( InMemoryWorker::new( diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index e4f4f1588b..b16f86e337 100755 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -33,4 +33,6 @@ pub mod p2p_network; pub mod sim2h_worker; pub mod tweetlog; +const CHANNEL_SIZE: usize = 1000; + new_relic_setup!("NEW_RELIC_LICENSE_KEY"); diff --git a/crates/net/src/p2p_network.rs b/crates/net/src/p2p_network.rs index f400aa9f88..6e271d95c0 100644 --- a/crates/net/src/p2p_network.rs +++ b/crates/net/src/p2p_network.rs @@ -17,7 +17,7 @@ use lib3h_protocol::{ protocol_client::Lib3hClientProtocol, protocol_server::Lib3hServerProtocol, Address, }; -use crate::sim2h_worker::Sim2hWorker; +use crate::{sim2h_worker::Sim2hWorker, CHANNEL_SIZE}; use crossbeam_channel; use holochain_conductor_lib_api::conductor_api::ConductorApi; use holochain_json_api::json::JsonString; @@ -111,7 +111,7 @@ impl P2pNetwork { }), }; - let (t, rx) = crossbeam_channel::unbounded(); + let (t, rx) = crossbeam_channel::bounded(CHANNEL_SIZE); let tx = t.clone(); let wrapped_handler = if Self::should_wait_for_p2p_ready(&p2p_config2.clone()) { NetHandler::new(Box::new(move |message| { diff --git a/crates/net/tests/integration.rs b/crates/net/tests/integration.rs index dd3bdfc7a9..3f787b0627 100644 --- a/crates/net/tests/integration.rs +++ b/crates/net/tests/integration.rs @@ -18,6 +18,8 @@ use lib3h_sodium::SodiumCryptoSystem; use sim2h::{run_sim2h, DhtAlgorithm}; use std::sync::Arc; +const CHANNEL_SIZE: usize = 1000; + struct Server { pub bound_uri: Lib3hUri, pub thread: Option>, @@ -35,7 +37,7 @@ impl Server { pub fn new(url: &str) -> Self { let url = url2!("{}", url); - let (snd, rcv) = crossbeam_channel::unbounded(); + let (snd, rcv) = crossbeam_channel::bounded(CHANNEL_SIZE); let cont = Arc::new(Mutex::new(true)); diff --git a/crates/sim2h/src/connection_mgr.rs b/crates/sim2h/src/connection_mgr.rs index 8ea6806fa0..a7b3ef3b39 100644 --- a/crates/sim2h/src/connection_mgr.rs +++ b/crates/sim2h/src/connection_mgr.rs @@ -31,10 +31,10 @@ enum ConMgrCommand { ListConnections(tokio::sync::oneshot::Sender>), } -type EvtSend = tokio::sync::mpsc::UnboundedSender; -type EvtRecv = tokio::sync::mpsc::UnboundedReceiver; -type CmdSend = tokio::sync::mpsc::UnboundedSender; -type CmdRecv = tokio::sync::mpsc::UnboundedReceiver; +type EvtSend = tokio::sync::mpsc::Sender; +type EvtRecv = tokio::sync::mpsc::Receiver; +type CmdSend = tokio::sync::mpsc::Sender; +type CmdRecv = tokio::sync::mpsc::Receiver; pub type ConnectionMgrEventRecv = EvtRecv; @@ -59,7 +59,7 @@ fn process_control_cmds(cmd_info: &mut CmdInfo) -> Loop { let CmdInfo { ref mut did_work, ref mut cmd_count, - ref evt_send, + ref mut evt_send, ref mut cmd_recv, ref uri, ref mut wss, @@ -105,10 +105,10 @@ fn process_control_cmds(cmd_info: &mut CmdInfo) -> Loop { } // process a batch of incoming websocket frames -fn process_websocket_frames(cmd_info: &mut CmdInfo) -> Loop { +async fn process_websocket_frames(cmd_info: &mut CmdInfo) -> Loop { let CmdInfo { ref mut did_work, - ref evt_send, + ref mut evt_send, ref uri, ref mut wss, ref mut read_count, @@ -125,7 +125,10 @@ fn process_websocket_frames(cmd_info: &mut CmdInfo) -> Loop { *did_work = true; let data = frame.take().unwrap(); debug!("socket {} read {} bytes", uri, len); - if let Err(_) = evt_send.send(ConMgrEvent::ReceiveData(uri.clone(), data)) { + if let Err(_) = evt_send + .send(ConMgrEvent::ReceiveData(uri.clone(), data)) + .await + { debug!("socket evt channel closed {}", uri); // end task return Loop::Break; @@ -174,7 +177,7 @@ async fn wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend, cmd_recv: CmdRe } // next process a batch of incoming websocket frames - if let Loop::Break = process_websocket_frames(&mut cmd_info) { + if let Loop::Break = process_websocket_frames(&mut cmd_info).await { break 'wss_task_loop; } @@ -204,7 +207,7 @@ async fn wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend, cmd_recv: CmdRe /// internal actually spawn the above wss_task into the tokio runtime fn spawn_wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend) -> CmdSend { debug!(?uri); - let (cmd_send, cmd_recv) = tokio::sync::mpsc::unbounded_channel(); + let (cmd_send, cmd_recv) = tokio::sync::mpsc::channel(CHANNEL_SIZE); tokio::task::spawn(wss_task(uri, wss, evt_send, cmd_recv).instrument(debug_span!("wss_task"))); cmd_send } @@ -227,7 +230,7 @@ async fn con_mgr_task(mut con_mgr: ConnectionMgr, weak_ref_dummy: Weak<()>) { break 'con_mgr_task; } - match con_mgr.process() { + match con_mgr.process().await { DidWork => tokio::task::yield_now().await, NoWork => tokio::time::delay_for(std::time::Duration::from_millis(5)).await, EndTask => break 'con_mgr_task, @@ -251,9 +254,9 @@ impl ConnectionMgr { /// spawn a new connection manager task, returning a handle for controlling it /// and a receiving channel for any incoming data pub fn new() -> (ConnectionMgrHandle, ConnectionMgrEventRecv, ConnectionCount) { - let (evt_p_send, evt_p_recv) = tokio::sync::mpsc::unbounded_channel(); - let (evt_c_send, evt_c_recv) = tokio::sync::mpsc::unbounded_channel(); - let (cmd_send, cmd_recv) = tokio::sync::mpsc::unbounded_channel(); + let (evt_p_send, evt_p_recv) = tokio::sync::mpsc::channel(CHANNEL_SIZE); + let (evt_c_send, evt_c_recv) = tokio::sync::mpsc::channel(CHANNEL_SIZE); + let (cmd_send, cmd_recv) = tokio::sync::mpsc::channel(CHANNEL_SIZE); let ref_dummy = Arc::new(()); @@ -281,20 +284,23 @@ impl ConnectionMgr { ) } - fn handle_connect_data(&mut self, uri: Lib3hUri, wss: InStreamWss) { + async fn handle_connect_data(&mut self, uri: Lib3hUri, wss: InStreamWss) { debug!(?uri); let cmd_send = spawn_wss_task(uri.clone(), wss, self.evt_send_from_children.clone()); - if let Some(old) = self.wss_map.insert(uri.clone(), cmd_send) { + if let Some(mut old) = self.wss_map.insert(uri.clone(), cmd_send) { error!("REPLACING ACTIVE CONNECTION: {}", uri); - let _ = old.send(ConMgrCommand::Disconnect(uri)); + let _ = old.send(ConMgrCommand::Disconnect(uri)).await; } } - fn handle_send_data(&mut self, uri: Lib3hUri, frame: WsFrame) { + async fn handle_send_data(&mut self, uri: Lib3hUri, frame: WsFrame) { debug!(?uri); let mut remove = false; - if let Some(cmd_send) = self.wss_map.get(&uri) { - if let Err(_) = cmd_send.send(ConMgrCommand::SendData(uri.clone(), frame)) { + if let Some(cmd_send) = self.wss_map.get_mut(&uri) { + if let Err(_) = cmd_send + .send(ConMgrCommand::SendData(uri.clone(), frame)) + .await + { tracing::error!(?uri); remove = true; } @@ -304,7 +310,7 @@ impl ConnectionMgr { } } - fn process_sent_cmds( + async fn process_sent_cmds( &mut self, cmd_count: &mut u64, did_work: &mut bool, @@ -315,15 +321,17 @@ impl ConnectionMgr { *cmd_count += 1; *did_work = true; match cmd { - ConMgrCommand::SendData(uri, frame) => self.handle_send_data(uri, frame), + ConMgrCommand::SendData(uri, frame) => { + self.handle_send_data(uri, frame).await + } ConMgrCommand::Disconnect(uri) => { - if let Some(cmd_send) = self.wss_map.remove(&uri) { + if let Some(mut cmd_send) = self.wss_map.remove(&uri) { tracing::error!(?uri); - let _ = cmd_send.send(ConMgrCommand::Disconnect(uri)); + let _ = cmd_send.send(ConMgrCommand::Disconnect(uri)).await; } } ConMgrCommand::Connect(uri, wss) => { - self.handle_connect_data(uri, wss); + self.handle_connect_data(uri, wss).await; } ConMgrCommand::ListConnections(respond) => { let _ = respond.send(self.wss_map.keys().cloned().collect()); @@ -341,7 +349,7 @@ impl ConnectionMgr { None } - fn process_child_cmds( + async fn process_child_cmds( &mut self, recv_count: &mut u64, did_work: &mut bool, @@ -353,12 +361,13 @@ impl ConnectionMgr { *did_work = true; match evt { ConMgrEvent::Disconnect(uri, maybe_err) => { - if let Some(cmd_send) = self.wss_map.remove(&uri) { + if let Some(mut cmd_send) = self.wss_map.remove(&uri) { let _ = cmd_send.send(ConMgrCommand::Disconnect(uri.clone())); } if let Err(_) = self .evt_send_to_parent .send(ConMgrEvent::Disconnect(uri, maybe_err)) + .await { // channel broken, end task return Some(EndTask); @@ -366,7 +375,7 @@ impl ConnectionMgr { } evt @ _ => { // just forward - if let Err(e) = self.evt_send_to_parent.send(evt) { + if let Err(e) = self.evt_send_to_parent.send(evt).await { // channel broken, end task tracing::error!(?e); return Some(EndTask); @@ -386,7 +395,7 @@ impl ConnectionMgr { } /// internal check our channels - fn process(&mut self) -> ConMgrResult { + async fn process(&mut self) -> ConMgrResult { let span = debug_span!("process"); let _g = span.enter(); @@ -400,14 +409,17 @@ impl ConnectionMgr { let c_count = self.wss_map.len(); // first, if any of our handles sent commands / process a batch of them - if let Some(r) = self.process_sent_cmds(&mut cmd_count, &mut did_work) { + if let Some(r) = self.process_sent_cmds(&mut cmd_count, &mut did_work).await { return r; } // next, if any of our child wss_tasks sent info, process it // mostly we just need to know if any are disconnected. // we forward all other messages - if let Some(r) = self.process_child_cmds(&mut recv_count, &mut did_work) { + if let Some(r) = self + .process_child_cmds(&mut recv_count, &mut did_work) + .await + { return r; } @@ -458,27 +470,34 @@ impl ConnectionMgrHandle { #[tracing::instrument(skip(self))] /// send in a websocket connection to be managed - pub fn connect(&self, uri: Lib3hUri, wss: TcpWss) { + pub async fn connect(&self, uri: Lib3hUri, wss: TcpWss) { debug!(?uri); - if let Err(e) = self.send_cmd.send(ConMgrCommand::Connect(uri, wss)) { + let mut send_cmd = self.send_cmd.clone(); + if let Err(e) = send_cmd.send(ConMgrCommand::Connect(uri, wss)).await { tracing::error!("failed to send on channel - shutting down? {:?}", e); } } #[tracing::instrument(skip(self, frame))] /// send data to a managed websocket connection - pub fn send_data(&self, uri: Lib3hUri, frame: WsFrame) { + pub async fn send_data(&self, uri: Lib3hUri, frame: WsFrame) { debug!(?uri); - if let Err(e) = self.send_cmd.send(ConMgrCommand::SendData(uri, frame)) { + if let Err(e) = self + .send_cmd + .clone() + .send(ConMgrCommand::SendData(uri, frame)) + .await + { error!("failed to send on channel - shutting down? {:?}", e); } } #[tracing::instrument(skip(self))] /// disconnect and forget about a managed websocket connection - pub fn disconnect(&self, uri: Lib3hUri) { + pub async fn disconnect(&self, uri: Lib3hUri) { debug!(?uri); - if let Err(e) = self.send_cmd.send(ConMgrCommand::Disconnect(uri)) { + let mut send_cmd = self.send_cmd.clone(); + if let Err(e) = send_cmd.send(ConMgrCommand::Disconnect(uri)).await { error!("failed to send on channel - shutting down? {:?}", e); } } @@ -487,7 +506,8 @@ impl ConnectionMgrHandle { /// disconnect and forget about a managed websocket connection pub async fn list_connections(&self) -> Vec { let (s, r) = tokio::sync::oneshot::channel(); - if let Err(e) = self.send_cmd.send(ConMgrCommand::ListConnections(s)) { + let mut send_cmd = self.send_cmd.clone(); + if let Err(e) = send_cmd.send(ConMgrCommand::ListConnections(s)).await { error!("failed to send on channel - shutting down? {:?}", e); return vec![]; } diff --git a/crates/sim2h/src/lib.rs b/crates/sim2h/src/lib.rs index 8dd744eb21..1bbc15db32 100755 --- a/crates/sim2h/src/lib.rs +++ b/crates/sim2h/src/lib.rs @@ -59,6 +59,7 @@ const NO_MESSAGE_CONNECTION_TIMEOUT_MS: u64 = 30000; /// use the default 0 seed for xxHash pub const RECEIPT_HASH_SEED: u64 = 0; +pub const CHANNEL_SIZE: usize = 1000; /// Generates a u64 hash response for an `Ack` message given input bytes pub fn generate_ack_receipt_hash(payload: &Opaque) -> u64 { @@ -134,12 +135,12 @@ fn open_lifecycle(desc: &str, uuid: &str, uri: &Lib3hUri) { #[derive(Clone)] struct MetricsTimerGenerator { - sender: tokio::sync::mpsc::UnboundedSender<(&'static str, f64)>, + sender: tokio::sync::mpsc::Sender<(&'static str, f64)>, } impl MetricsTimerGenerator { pub fn new() -> (Self, BoxFuture<'static, ()>) { - let (sender, mut recv) = tokio::sync::mpsc::unbounded_channel::<(&'static str, f64)>(); + let (sender, mut recv) = tokio::sync::mpsc::channel::<(&'static str, f64)>(CHANNEL_SIZE); let out = async move { let metric_publisher = MetricPublisherConfig::default().create_metric_publisher(); 'metric_loop: loop { @@ -168,14 +169,11 @@ impl MetricsTimerGenerator { struct MetricsTimer { tag: &'static str, create_time: std::time::Instant, - sender: tokio::sync::mpsc::UnboundedSender<(&'static str, f64)>, + sender: tokio::sync::mpsc::Sender<(&'static str, f64)>, } impl MetricsTimer { - pub fn new( - tag: &'static str, - sender: tokio::sync::mpsc::UnboundedSender<(&'static str, f64)>, - ) -> Self { + pub fn new(tag: &'static str, sender: tokio::sync::mpsc::Sender<(&'static str, f64)>) -> Self { Self { tag, create_time: std::time::Instant::now(), @@ -194,7 +192,7 @@ impl Drop for MetricsTimer { } else if elapsed >= 10.0 { info!("metric - {} - {} ms", self.tag, elapsed); } - if let Err(e) = self.sender.send((self.tag, elapsed)) { + if let Err(e) = self.sender.try_send((self.tag, elapsed)) { error!( "failed to send metric - shutting down? {} {:?}", self.tag, e @@ -274,14 +272,15 @@ impl Sim2hHandle { } /// send a message to another connected agent - pub fn send(&self, agent: AgentId, uri: Lib3hUri, msg: &WireMessage) { + pub async fn send(&self, agent: AgentId, uri: Lib3hUri, msg: &WireMessage) { debug!(">>OUT>> {} to {}", msg.message_type(), uri); - MESSAGE_LOGGER - .lock() - .log_out(agent, uri.clone(), msg.clone()); + if let Some(mut ml) = MESSAGE_LOGGER.try_lock() { + ml.log_out(agent, uri.clone(), msg.clone()); + } let payload: Opaque = msg.clone().into(); self.connection_mgr - .send_data(uri, payload.as_bytes().into()); + .send_data(uri, payload.as_bytes().into()) + .await } /// get access to our im_state object @@ -291,8 +290,8 @@ impl Sim2hHandle { /// Notify core/sim2h_worker that we have processed the current message /// sufficiently, and are ready to receive another message. - pub fn send_receipt(&self, receipt: &WireMessage, source: &AgentId, url: &Lib3hUri) { - self.send(source.clone(), url.clone(), receipt); + pub async fn send_receipt(&self, receipt: &WireMessage, source: &AgentId, url: &Lib3hUri) { + self.send(source.clone(), url.clone(), receipt).await; } /// forward a message to be handled @@ -366,7 +365,7 @@ impl Sim2hHandle { uri, message.message_type() ); - sim2h_handle.disconnect(vec![uri.clone()]); + sim2h_handle.disconnect(vec![uri.clone()]).await; return; } }; @@ -379,11 +378,11 @@ impl Sim2hHandle { return; } - sim2h_handle.send_receipt(&receipt, &signer, &uri); + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; match message { WireMessage::ClientToLib3h(ht::EncodedSpanWrap { data, .. }) => { - return client_to_lib3h(data, uri, sim2h_handle, signer, space_hash); + return client_to_lib3h(data, uri, sim2h_handle, signer, space_hash).await; } WireMessage::Lib3hToClientResponse(ht::EncodedSpanWrap { data, .. }) => { return lib3h_to_client_response(data, uri, sim2h_handle, signer, space_hash); @@ -397,16 +396,16 @@ impl Sim2hHandle { } /// disconnect an active connection - pub fn disconnect(&self, disconnect: Vec) { + pub async fn disconnect(&self, disconnect: Vec) { for d in disconnect.iter() { self.state().spawn_drop_connection_by_uri(d.clone()); - self.connection_mgr.disconnect(d.clone()); + self.connection_mgr.disconnect(d.clone()).await; } } } #[instrument(skip(data, sim2h_handle))] -fn client_to_lib3h( +async fn client_to_lib3h( data: ClientToLib3h, uri: Lib3hUri, sim2h_handle: Sim2hHandle, @@ -416,7 +415,7 @@ fn client_to_lib3h( match data { ClientToLib3h::LeaveSpace(_data) => { // for now, just disconnect on LeaveSpace - sim2h_handle.disconnect(vec![uri.clone()]); + sim2h_handle.disconnect(vec![uri.clone()]).await; } ClientToLib3h::SendDirectMessage(dm_data) => { return spawn_handle_message_send_dm(sim2h_handle, uri, signer, space_hash, dm_data); @@ -516,14 +515,13 @@ fn spawn_handle_message_ping( signer: AgentId, receipt: WireMessage, ) { - /* + debug!("Sending Pong in response to Ping"); tokio::task::spawn(async move { + sim2h_handle + .send(signer.clone(), uri.clone(), &WireMessage::Pong) + .await; + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; }); - */ - // no processing here, don't bother actually spawning - debug!("Sending Pong in response to Ping"); - sim2h_handle.send(signer.clone(), uri.clone(), &WireMessage::Pong); - sim2h_handle.send_receipt(&receipt, &signer, &uri); } fn spawn_handle_message_status( @@ -539,21 +537,23 @@ fn spawn_handle_message_status( for (_, space) in state.spaces.iter() { joined_connections += space.connections.len(); } - sim2h_handle.send( - signer.clone(), - uri.clone(), - &WireMessage::StatusResponse(StatusData { - spaces: state.spaces_count(), - connections: sim2h_handle.connection_count.get().await, - joined_connections, - redundant_count: match sim2h_handle.dht_algorithm() { - DhtAlgorithm::FullSync => 0, - DhtAlgorithm::NaiveSharding { redundant_count } => *redundant_count, - }, - version: WIRE_VERSION, - }), - ); - sim2h_handle.send_receipt(&receipt, &signer, &uri); + sim2h_handle + .send( + signer.clone(), + uri.clone(), + &WireMessage::StatusResponse(StatusData { + spaces: state.spaces_count(), + connections: sim2h_handle.connection_count.get().await, + joined_connections, + redundant_count: match sim2h_handle.dht_algorithm() { + DhtAlgorithm::FullSync => 0, + DhtAlgorithm::NaiveSharding { redundant_count } => *redundant_count, + }, + version: WIRE_VERSION, + }), + ) + .await; + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; }); } @@ -571,6 +571,7 @@ fn spawn_handle_message_debug( let json = serde_json::to_string(&space).expect("Space must be serializable"); response_map.insert((**hash).clone(), json.clone()); let filename = format!("{}.json", **hash); + // FIXME should be async file write if let Ok(mut file) = File::create(filename.clone()) { file.write_all(json.into_bytes().as_slice()) .unwrap_or_else(|_| error!("Could not write to file {}!", filename)) @@ -580,12 +581,14 @@ fn spawn_handle_message_debug( } let connection_list = sim2h_handle.connection_mgr().list_connections().await; let extra_data = format!("LIST_CONNECTIONS: {:#?}", connection_list); - sim2h_handle.send( - signer.clone(), - uri.clone(), - &WireMessage::DebugResponse((response_map, extra_data)), - ); - sim2h_handle.send_receipt(&receipt, &signer, &uri); + sim2h_handle + .send( + signer.clone(), + uri.clone(), + &WireMessage::DebugResponse((response_map, extra_data)), + ) + .await; + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; }); } @@ -596,33 +599,32 @@ fn spawn_handle_message_hello( version: u32, receipt: WireMessage, ) { - /* - tokio::task::spawn(async move { - }); - */ - // no processing here, don't bother actually spawning debug!("Sending HelloResponse in response to Hello({})", version); - sim2h_handle.send( - signer.clone(), - uri.clone(), - &WireMessage::HelloResponse(HelloData { - redundant_count: match sim2h_handle.dht_algorithm() { - DhtAlgorithm::FullSync => 0, - DhtAlgorithm::NaiveSharding { redundant_count } => *redundant_count, - }, - version: WIRE_VERSION, - extra: None, - }), - ); - sim2h_handle.send_receipt(&receipt, &signer, &uri); - // versions do not match - disconnect them - if version != WIRE_VERSION { - warn!( + tokio::task::spawn(async move { + sim2h_handle + .send( + signer.clone(), + uri.clone(), + &WireMessage::HelloResponse(HelloData { + redundant_count: match sim2h_handle.dht_algorithm() { + DhtAlgorithm::FullSync => 0, + DhtAlgorithm::NaiveSharding { redundant_count } => *redundant_count, + }, + version: WIRE_VERSION, + extra: None, + }), + ) + .await; + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; + // versions do not match - disconnect them + if version != WIRE_VERSION { + warn!( "Disconnecting client for bad version this WIRE_VERSION = {}, client WIRE_VERSION = {}", WIRE_VERSION, version ); - sim2h_handle.disconnect(vec![uri]); - } + sim2h_handle.disconnect(vec![uri]).await; + } + }); } #[tracing::instrument(level = "info", skip(sim2h_handle))] @@ -635,6 +637,7 @@ async fn handle_message_join_space( ) { sim2h_handle .state() + .clone() .new_connection( data.space_address.clone(), data.agent_id.clone(), @@ -642,39 +645,43 @@ async fn handle_message_join_space( ) .await; - sim2h_handle.send_receipt(&receipt, &signer, &uri); + sim2h_handle.send_receipt(&receipt, &signer, &uri).await; - sim2h_handle.send( - data.agent_id.clone(), - uri.clone(), - &WireMessage::Lib3hToClient( - ht::span_wrap_encode!( - Level::INFO, - Lib3hToClient::HandleGetGossipingEntryList(GetListData { - request_id: "".into(), - space_address: data.space_address.clone(), - provider_agent_id: data.agent_id.clone(), - }) - ) - .into(), - ), - ); + sim2h_handle + .send( + data.agent_id.clone(), + uri.clone(), + &WireMessage::Lib3hToClient( + ht::span_wrap_encode!( + Level::INFO, + Lib3hToClient::HandleGetGossipingEntryList(GetListData { + request_id: "".into(), + space_address: data.space_address.clone(), + provider_agent_id: data.agent_id.clone(), + }) + ) + .into(), + ), + ) + .await; - sim2h_handle.send( - data.agent_id.clone(), - uri, - &WireMessage::Lib3hToClient( - ht::span_wrap_encode!( - Level::INFO, - Lib3hToClient::HandleGetAuthoringEntryList(GetListData { - request_id: "".into(), - space_address: data.space_address.clone(), - provider_agent_id: data.agent_id, - }) - ) - .into(), - ), - ); + sim2h_handle + .send( + data.agent_id.clone(), + uri, + &WireMessage::Lib3hToClient( + ht::span_wrap_encode!( + Level::INFO, + Lib3hToClient::HandleGetAuthoringEntryList(GetListData { + request_id: "".into(), + space_address: data.space_address.clone(), + provider_agent_id: data.agent_id, + }) + ) + .into(), + ), + ) + .await; } fn inner_spawn_handle_message_send_dmx( @@ -701,7 +708,9 @@ fn inner_spawn_handle_message_send_dmx( return; } }; - sim2h_handle.send(to_agent_id, to_url.clone(), &message); + sim2h_handle + .send(to_agent_id, to_url.clone(), &message) + .await; }); } @@ -799,7 +808,9 @@ fn spawn_handle_message_publish_entry( for agent_id in send_to { if let Some(uri) = state.lookup_joined(&space_hash, &agent_id) { - sim2h_handle.send((&*agent_id).clone(), uri.clone(), &multi_message); + sim2h_handle + .send((&*agent_id).clone(), uri.clone(), &multi_message) + .await; } sim2h_handle.state().spawn_agent_holds_aspects( (&*space_hash).clone(), @@ -896,7 +907,7 @@ fn spawn_handle_message_authoring_entry_list( if !multi_message.is_empty() { let multi_send = WireMessage::MultiSend(multi_message); - sim2h_handle.send(signer, uri, &multi_send); + sim2h_handle.send(signer, uri, &multi_send).await; } } .instrument(debug_span!("authoring_entry")), @@ -965,7 +976,9 @@ fn spawn_handle_message_fetch_entry_result( let multi_send = WireMessage::MultiSend(multi_message); - sim2h_handle.send((&*agent_id).clone(), (&*uri).clone(), &multi_send); + sim2h_handle + .send((&*agent_id).clone(), (&*uri).clone(), &multi_send) + .await; for (entry_hash, aspects) in holding.drain() { sim2h_handle.state().spawn_agent_holds_aspects( @@ -1021,7 +1034,9 @@ fn spawn_handle_message_query_entry( ht::span_wrap_encode!(Level::INFO, Lib3hToClient::HandleQueryEntry(query_data)) .into(), ); - sim2h_handle.send((&*query_target).clone(), url.clone(), &query_message); + sim2h_handle + .send((&*query_target).clone(), url.clone(), &query_message) + .await; } .instrument(debug_span!("message_query")), ); @@ -1061,7 +1076,9 @@ fn spawn_handle_message_query_entry_result( return; } }; - sim2h_handle.send(req_agent_id, to_url.clone(), &msg_out); + sim2h_handle + .send(req_agent_id, to_url.clone(), &msg_out) + .await; } .instrument(debug_span!("handle_message_query_entry_result")), ); @@ -1189,7 +1206,7 @@ impl Sim2h { let (connection_mgr, connection_mgr_evt_recv, connection_count) = ConnectionMgr::new(); - let (wss_send, wss_recv) = crossbeam_channel::unbounded(); + let (wss_send, wss_recv) = crossbeam_channel::bounded(CHANNEL_SIZE); let sim2h_handle = Sim2hHandle::new( crypto.box_clone(), dht_algorithm, @@ -1262,7 +1279,7 @@ impl Sim2h { open_lifecycle("adding conn job", &uuid, &url); - sim2h_handle.connection_mgr().connect(url, wss); + sim2h_handle.connection_mgr().connect(url, wss).await; } }); } @@ -1276,7 +1293,10 @@ impl Sim2h { "dropping connection to {} because of error: {:?}", uri, error, ); - self.sim2h_handle.disconnect(vec![uri]); + let sim2h_handle = self.sim2h_handle.clone(); + tokio::task::spawn(async move { + sim2h_handle.disconnect(vec![uri]).await; + }); } /// if our connections sent us any data, process it @@ -1312,7 +1332,10 @@ impl Sim2h { } if !disconnect.is_empty() { - self.sim2h_handle.disconnect(disconnect); + let sim2h_handle = self.sim2h_handle.clone(); + tokio::task::spawn(async move { + sim2h_handle.disconnect(disconnect).await; + }); } trace!( @@ -1345,7 +1368,10 @@ impl Sim2h { WsFrame::Pong(_) => (), WsFrame::Close(c) => { debug!("Disconnecting {} after connection reset {:?}", uri, c); - self.sim2h_handle.disconnect(vec![uri]); + let sim2h_handle = self.sim2h_handle.clone(); + tokio::task::spawn(async move { + sim2h_handle.disconnect(vec![uri]).await; + }); } } } @@ -1373,7 +1399,7 @@ impl Sim2h { "Could not verify payload from {}!\nError: {:?}\nPayload was: {:?}", url, error, payload ); - sim2h_handle.disconnect(vec![url]); + sim2h_handle.disconnect(vec![url]).await; } } }); @@ -1422,7 +1448,7 @@ impl Sim2h { loop { tokio::time::delay_for(std::time::Duration::from_millis(500)).await; let disconnect_uri = sim2h_handle.state().check_disconnected().await; - sim2h_handle.disconnect(disconnect_uri); + sim2h_handle.disconnect(disconnect_uri).await; } }); } @@ -1543,6 +1569,13 @@ fn fetch_entry_data( ht::span_wrap_encode!(tracing::Level::INFO, Lib3hToClient::HandleFetchEntry(s)).into() }); - sim2h_handle.send((&*query_agent).clone(), (&*uri).clone(), &wire_message); + tokio::task::spawn({ + let sim2h_handle = sim2h_handle.clone(); + let agent = (&*query_agent).clone(); + let uri = (&*uri).clone(); + async move { + sim2h_handle.send(agent, uri, &wire_message).await; + } + }); } } diff --git a/crates/sim2h/src/sim2h_im_state.rs b/crates/sim2h/src/sim2h_im_state.rs index fc3f6cc2a9..d3e09e3e40 100644 --- a/crates/sim2h/src/sim2h_im_state.rs +++ b/crates/sim2h/src/sim2h_im_state.rs @@ -522,7 +522,7 @@ impl Store { redundancy: u64, gossip_interval: Option, ) -> StoreHandle { - let (send_mut, mut recv_mut) = tokio::sync::mpsc::unbounded_channel(); + let (send_mut, mut recv_mut) = tokio::sync::mpsc::channel(CHANNEL_SIZE); let ref_dummy = Arc::new(()); @@ -876,7 +876,7 @@ pub struct StoreHandle { _ref_dummy: Arc<()>, // clone ref clone_ref: Arc>, - send_mut: tokio::sync::mpsc::UnboundedSender, + send_mut: tokio::sync::mpsc::Sender, con_incr: Arc, } @@ -884,7 +884,7 @@ impl StoreHandle { fn new( ref_dummy: Arc<()>, clone_ref: Arc>, - send_mut: tokio::sync::mpsc::UnboundedSender, + send_mut: tokio::sync::mpsc::Sender, con_incr: Arc, ) -> Self { Self { @@ -905,7 +905,7 @@ impl StoreHandle { space_hash: SpaceHash, agent_id: AgentId, uri: Lib3hUri, - ) -> BoxFuture<'static, ()> { + ) -> BoxFuture<()> { let (sender, receiver) = tokio::sync::oneshot::channel(); let msg = StoreProto::Mutate( AolEntry::NewConnection { @@ -916,12 +916,14 @@ impl StoreHandle { }, sender, ); - if let Err(_) = self.send_mut.send(msg) { - error!("failed to send im store message - shutting down?"); - return async { () }.boxed(); - } + let mut send_mut = self.send_mut.clone(); async move { - let _ = receiver.await; + if let Err(_) = send_mut.send(msg).await { + error!("failed to send im store message - shutting down?"); + //async { () }.boxed() + } else { + let _ = receiver.await; + } } .boxed() } @@ -953,55 +955,57 @@ impl StoreHandle { */ #[must_use] - pub fn drop_connection_by_uri(&self, uri: Lib3hUri) -> BoxFuture<'static, ()> { - let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err(_) = self.send_mut.send(StoreProto::Mutate( - AolEntry::DropConnectionByUri { - aol_idx: self.con_incr.inc(), - uri, - }, - sender, - )) { - error!("failed to send im store message - shutting down?"); - return async { () }.boxed(); - } - async move { - let _ = receiver.await; - } - .boxed() - } - - pub fn spawn_drop_connection_by_uri(&self, uri: Lib3hUri) { - let f = self.drop_connection_by_uri(uri); - tokio::task::spawn(f); - } - - #[must_use] - pub fn agent_holds_aspects( - &self, + pub async fn agent_holds_aspects( + mut self, space_hash: SpaceHash, agent_id: AgentId, entry_hash: EntryHash, aspects: im::HashSet, - ) -> BoxFuture<'static, ()> { + ) { let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err(_) = self.send_mut.send(StoreProto::Mutate( - AolEntry::AgentHoldsAspects { - aol_idx: self.con_incr.inc(), - space_hash, - agent_id, - entry_hash, - aspects, - }, - sender, - )) { + if let Err(_) = self + .send_mut + .send(StoreProto::Mutate( + AolEntry::AgentHoldsAspects { + aol_idx: self.con_incr.inc(), + space_hash, + agent_id, + entry_hash, + aspects, + }, + sender, + )) + .await + { error!("failed to send im store message - shutting down?"); - return async { () }.boxed(); + return; } - async move { - let _ = receiver.await; + let _ = receiver.await; + } + #[must_use] + pub async fn drop_connection_by_uri(mut self, uri: Lib3hUri) { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + if let Err(_) = self + .send_mut + .send(StoreProto::Mutate( + AolEntry::DropConnectionByUri { + aol_idx: self.con_incr.inc(), + uri, + }, + sender, + )) + .await + { + error!("failed to send im store message - shutting down?"); + return; } - .boxed() + let _ = receiver.await; + } + pub fn spawn_drop_connection_by_uri(&self, uri: Lib3hUri) { + let store_handle = self.clone(); + let f = store_handle.drop_connection_by_uri(uri); + tokio::task::spawn(f); } pub fn spawn_agent_holds_aspects( @@ -1011,7 +1015,8 @@ impl StoreHandle { entry_hash: EntryHash, aspects: im::HashSet, ) { - let f = self.agent_holds_aspects(space_hash, agent_id, entry_hash, aspects); + let store_handle = self.clone(); + let f = store_handle.agent_holds_aspects(space_hash, agent_id, entry_hash, aspects); tokio::task::spawn(f); } @@ -1019,13 +1024,17 @@ impl StoreHandle { pub async fn check_gossip(&self) -> CheckGossipData { let (sender, receiver) = tokio::sync::oneshot::channel(); let (sender_c, receiver_c) = tokio::sync::oneshot::channel(); - if let Err(_) = self.send_mut.send(StoreProto::Mutate( - AolEntry::CheckGossip { - aol_idx: self.con_incr.inc(), - response: sender, - }, - sender_c, - )) { + let mut send_mut = self.send_mut.clone(); + if let Err(_) = send_mut + .send(StoreProto::Mutate( + AolEntry::CheckGossip { + aol_idx: self.con_incr.inc(), + response: sender, + }, + sender_c, + )) + .await + { error!("failed to send im store message - shutting down?"); // we're probably shutting down, prevent panic!s // note this future will never resolve - because it cannot @@ -1038,13 +1047,17 @@ impl StoreHandle { pub async fn check_disconnected(&self) -> Vec { let (sender, receiver) = tokio::sync::oneshot::channel(); let (sender_c, receiver_c) = tokio::sync::oneshot::channel(); - if let Err(_) = self.send_mut.send(StoreProto::Mutate( - AolEntry::CheckDisconnected { - aol_idx: self.con_incr.inc(), - response: sender, - }, - sender_c, - )) { + let mut send_mut = self.send_mut.clone(); + if let Err(_) = send_mut + .send(StoreProto::Mutate( + AolEntry::CheckDisconnected { + aol_idx: self.con_incr.inc(), + response: sender, + }, + sender_c, + )) + .await + { error!("failed to send im store message - shutting down?"); // we're probably shutting down, prevent panic!s // note this future will never resolve - because it cannot @@ -1126,6 +1139,7 @@ mod tests { debug!("GOT: {:#?}", store.get_clone().await); store + .clone() .agent_holds_aspects( space_hash.clone(), aid1.clone(), @@ -1134,6 +1148,7 @@ mod tests { ) .await; store + .clone() .agent_holds_aspects( space_hash.clone(), aid2.clone(), @@ -1167,7 +1182,7 @@ mod tests { debug!("--- end check missing ---"); //store.drop_connection(space_hash.clone(), aid1.clone()); - store.drop_connection_by_uri(uri1.clone()).await; + store.clone().drop_connection_by_uri(uri1.clone()).await; debug!("GOT: {:#?}", store.get_clone().await); } @@ -1198,6 +1213,7 @@ mod tests { .await; store + .clone() .agent_holds_aspects( space_hash.clone(), aid1.clone(), @@ -1207,6 +1223,7 @@ mod tests { .await; store + .clone() .agent_holds_aspects( space_hash.clone(), aid1.clone(), @@ -1257,6 +1274,7 @@ mod tests { debug!("GOT: {:#?}", store.get_clone().await); store + .clone() .agent_holds_aspects( space_hash.clone(), aid1.clone(), diff --git a/crates/sim2h/src/websocket/mem_stream.rs b/crates/sim2h/src/websocket/mem_stream.rs index 6f5fb41eec..46caf220b1 100644 --- a/crates/sim2h/src/websocket/mem_stream.rs +++ b/crates/sim2h/src/websocket/mem_stream.rs @@ -1,3 +1,4 @@ +use crate::CHANNEL_SIZE; use holochain_tracing_macros::newrelic_autotrace; use lazy_static::lazy_static; use lib3h_zombie_actor::GhostMutex; @@ -179,8 +180,8 @@ fn random_url(prefix: &str) -> Url2 { /// private stream pair constructor, these streams can message each other fn create_mem_stream_pair(url_a: Url2, url_b: Url2) -> (MemStream, MemStream) { - let (send1, recv1) = crossbeam_channel::unbounded(); - let (send2, recv2) = crossbeam_channel::unbounded(); + let (send1, recv1) = crossbeam_channel::bounded(CHANNEL_SIZE); + let (send2, recv2) = crossbeam_channel::bounded(CHANNEL_SIZE); ( MemStream::priv_new(url_a, send1, recv2), MemStream::priv_new(url_b, send2, recv1), @@ -231,7 +232,7 @@ impl MemManager { Entry::Occupied(_) => Err(std::io::ErrorKind::AddrInUse.into()), Entry::Vacant(e) => { // the url is not in use, let's create a new listener - let (send, recv) = crossbeam_channel::unbounded(); + let (send, recv) = crossbeam_channel::bounded(CHANNEL_SIZE); e.insert(send); Ok(MemListener::priv_new(new_url, recv)) } diff --git a/crates/stress/src/bin/sim2h_stress.rs b/crates/stress/src/bin/sim2h_stress.rs index 9fa6074736..c26b179c54 100644 --- a/crates/stress/src/bin/sim2h_stress.rs +++ b/crates/stress/src/bin/sim2h_stress.rs @@ -601,8 +601,8 @@ impl Suite { /// create a new sim2h server instance on given port #[allow(clippy::mutex_atomic)] pub fn new(port: u16) -> Self { - let (snd1, rcv1) = crossbeam_channel::unbounded(); - let (snd2, rcv2) = crossbeam_channel::unbounded::(); + let (snd1, rcv1) = crossbeam_channel::bounded(CHANNEL_SIZE); + let (snd2, rcv2) = crossbeam_channel::bounded::(CHANNEL_SIZE); let sim2h_cont = Arc::new(Mutex::new(true)); let sim2h_cont_clone = sim2h_cont.clone(); diff --git a/crates/stress/src/lib.rs b/crates/stress/src/lib.rs index 665a44c2d0..1b38bf177c 100644 --- a/crates/stress/src/lib.rs +++ b/crates/stress/src/lib.rs @@ -7,6 +7,8 @@ use std::{ sync::{Arc, Mutex}, }; +const CHANNEL_SIZE: usize = 100; + /// utitily for recording stress test metrics #[derive(Clone)] pub struct StressJobMetricLogger { @@ -149,8 +151,8 @@ impl StressRunner { /// private stress runner constructor #[allow(clippy::mutex_atomic)] fn priv_new(config: StressRunConfig) -> Self { - let (job_send, job_recv) = crossbeam_channel::unbounded(); - let (log_send, log_recv) = crossbeam_channel::unbounded(); + let (job_send, job_recv) = crossbeam_channel::bounded(CHANNEL_SIZE); + let (log_send, log_recv) = crossbeam_channel::bounded(CHANNEL_SIZE); let warmup_target = std::time::Instant::now() .checked_add(std::time::Duration::from_millis(config.warm_time_ms))