Skip to content

Commit 6837874

Browse files
committed
fix: get_connectivity(): Get rid of locking SchedulerState::inner (#7124)
`get_connectivity()` is expected to return immediately, not only after the scheduler finishes updating its state in `start_io()`/`stop_io()`/`pause_io()`; otherwise it can make the app unresponsive. Instead of read-locking `SchedulerState::inner`, store the `ConnectivityStore` collection in `Context` and fetch it from there in `get_connectivity()`. Update it every time we release a write lock on `SchedulerState::inner`.
1 parent 3656337 commit 6837874

File tree

6 files changed

+44
-33
lines changed

6 files changed

+44
-33
lines changed

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -734,7 +734,7 @@ impl Context {
734734
Self::check_config(key, value)?;
735735

736736
let _pause = match key.needs_io_restart() {
737-
true => self.scheduler.pause(self.clone()).await?,
737+
true => self.scheduler.pause(self).await?,
738738
_ => Default::default(),
739739
};
740740
self.set_config_internal(key, value).await?;

src/context.rs

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ use crate::param::{Param, Params};
3434
use crate::peer_channels::Iroh;
3535
use crate::push::PushSubscriber;
3636
use crate::quota::QuotaInfo;
37-
use crate::scheduler::{SchedulerState, convert_folder_meaning};
37+
use crate::scheduler::{ConnectivityStore, SchedulerState, convert_folder_meaning};
3838
use crate::sql::Sql;
3939
use crate::stock_str::StockStrings;
4040
use crate::timesmearing::SmearedTimestamp;
@@ -304,6 +304,10 @@ pub struct InnerContext {
304304
/// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
305305
/// the standard library's OnceLock is enough, and it's a lot smaller in memory.
306306
pub(crate) self_fingerprint: OnceLock<String>,
307+
308+
/// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
309+
/// see [`Context::get_connectivity()`].
310+
pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
307311
}
308312

309313
/// The state of ongoing process.
@@ -473,6 +477,7 @@ impl Context {
473477
push_subscribed: AtomicBool::new(false),
474478
iroh: Arc::new(RwLock::new(None)),
475479
self_fingerprint: OnceLock::new(),
480+
connectivities: parking_lot::Mutex::new(Vec::new()),
476481
};
477482

478483
let ctx = Context {
@@ -502,7 +507,7 @@ impl Context {
502507
// Now, some configs may have changed, so, we need to invalidate the cache.
503508
self.sql.config_cache.write().await.clear();
504509

505-
self.scheduler.start(self.clone()).await;
510+
self.scheduler.start(self).await;
506511
}
507512

508513
/// Stops the IO scheduler.
@@ -579,7 +584,7 @@ impl Context {
579584
} else {
580585
// Pause the scheduler to ensure another connection does not start
581586
// while we are fetching on a dedicated connection.
582-
let _pause_guard = self.scheduler.pause(self.clone()).await?;
587+
let _pause_guard = self.scheduler.pause(self).await?;
583588

584589
// Start a new dedicated connection.
585590
let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;

src/imex.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@ pub async fn imex(
9090
let cancel = context.alloc_ongoing().await?;
9191

9292
let res = {
93-
let _guard = context.scheduler.pause(context.clone()).await?;
93+
let _guard = context.scheduler.pause(context).await?;
9494
imex_inner(context, what, path, passphrase)
9595
.race(async {
9696
cancel.recv().await.ok();

src/imex/transfer.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ impl BackupProvider {
105105

106106
// Acquire global "ongoing" mutex.
107107
let cancel_token = context.alloc_ongoing().await?;
108-
let paused_guard = context.scheduler.pause(context.clone()).await?;
108+
let paused_guard = context.scheduler.pause(context).await?;
109109
let context_dir = context
110110
.get_blobdir()
111111
.parent()

src/scheduler.rs

Lines changed: 20 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -8,12 +8,12 @@ use async_channel::{self as channel, Receiver, Sender};
88
use futures::future::try_join_all;
99
use futures_lite::FutureExt;
1010
use rand::Rng;
11-
use tokio::sync::{RwLock, RwLockWriteGuard, oneshot};
11+
use tokio::sync::{RwLock, oneshot};
1212
use tokio::task;
1313
use tokio_util::sync::CancellationToken;
1414
use tokio_util::task::TaskTracker;
1515

16-
use self::connectivity::ConnectivityStore;
16+
pub(crate) use self::connectivity::ConnectivityStore;
1717
use crate::config::{self, Config};
1818
use crate::constants;
1919
use crate::contact::{ContactId, RecentlySeenLoop};
@@ -53,32 +53,32 @@ impl SchedulerState {
5353
}
5454

5555
/// Starts the scheduler if it is not yet started.
56-
pub(crate) async fn start(&self, context: Context) {
56+
pub(crate) async fn start(&self, context: &Context) {
5757
let mut inner = self.inner.write().await;
5858
match *inner {
5959
InnerSchedulerState::Started(_) => (),
60-
InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
60+
InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
6161
InnerSchedulerState::Paused {
6262
ref mut started, ..
6363
} => *started = true,
6464
}
65+
context.update_connectivities(&inner);
6566
}
6667

6768
/// Starts the scheduler if it is not yet started.
68-
async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
69+
async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
6970
info!(context, "starting IO");
7071

7172
// Notify message processing loop
7273
// to allow processing old messages after restart.
7374
context.new_msgs_notify.notify_one();
7475

75-
let ctx = context.clone();
76-
match Scheduler::start(&context).await {
76+
match Scheduler::start(context).await {
7777
Ok(scheduler) => {
7878
*inner = InnerSchedulerState::Started(scheduler);
7979
context.emit_event(EventType::ConnectivityChanged);
8080
}
81-
Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
81+
Err(err) => error!(context, "Failed to start IO: {:#}", err),
8282
}
8383
}
8484

@@ -87,18 +87,19 @@ impl SchedulerState {
8787
let mut inner = self.inner.write().await;
8888
match *inner {
8989
InnerSchedulerState::Started(_) => {
90-
Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
90+
Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
9191
}
9292
InnerSchedulerState::Stopped => (),
9393
InnerSchedulerState::Paused {
9494
ref mut started, ..
9595
} => *started = false,
9696
}
97+
context.update_connectivities(&inner);
9798
}
9899

99100
/// Stops the scheduler if it is currently running.
100101
async fn do_stop(
101-
mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
102+
inner: &mut InnerSchedulerState,
102103
context: &Context,
103104
new_state: InnerSchedulerState,
104105
) {
@@ -122,7 +123,7 @@ impl SchedulerState {
122123
debug_logging.loop_handle.abort();
123124
debug_logging.loop_handle.await.ok();
124125
}
125-
let prev_state = std::mem::replace(&mut *inner, new_state);
126+
let prev_state = std::mem::replace(inner, new_state);
126127
context.emit_event(EventType::ConnectivityChanged);
127128
match prev_state {
128129
InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
@@ -138,7 +139,7 @@ impl SchedulerState {
138139
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
139140
/// resume will do the right thing and restore the scheduler to the state requested by
140141
/// the last call.
141-
pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
142+
pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
142143
{
143144
let mut inner = self.inner.write().await;
144145
match *inner {
@@ -147,7 +148,7 @@ impl SchedulerState {
147148
started: true,
148149
pause_guards_count: NonZeroUsize::new(1).unwrap(),
149150
};
150-
Self::do_stop(inner, &context, new_state).await;
151+
Self::do_stop(&mut inner, context, new_state).await;
151152
}
152153
InnerSchedulerState::Stopped => {
153154
*inner = InnerSchedulerState::Paused {
@@ -164,9 +165,11 @@ impl SchedulerState {
164165
.ok_or_else(|| Error::msg("Too many pause guards active"))?
165166
}
166167
}
168+
context.update_connectivities(&inner);
167169
}
168170

169171
let (tx, rx) = oneshot::channel();
172+
let context = context.clone();
170173
tokio::spawn(async move {
171174
rx.await.ok();
172175
let mut inner = context.scheduler.inner.write().await;
@@ -183,7 +186,7 @@ impl SchedulerState {
183186
} => {
184187
if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
185188
match *started {
186-
true => SchedulerState::do_start(inner, context.clone()).await,
189+
true => SchedulerState::do_start(&mut inner, &context).await,
187190
false => *inner = InnerSchedulerState::Stopped,
188191
}
189192
} else {
@@ -193,6 +196,7 @@ impl SchedulerState {
193196
}
194197
}
195198
}
199+
context.update_connectivities(&inner);
196200
});
197201
Ok(IoPausedGuard { sender: Some(tx) })
198202
}
@@ -202,7 +206,7 @@ impl SchedulerState {
202206
info!(context, "restarting IO");
203207
if self.is_running().await {
204208
self.stop(context).await;
205-
self.start(context.clone()).await;
209+
self.start(context).await;
206210
}
207211
}
208212

@@ -288,7 +292,7 @@ impl SchedulerState {
288292
}
289293

290294
#[derive(Debug, Default)]
291-
enum InnerSchedulerState {
295+
pub(crate) enum InnerSchedulerState {
292296
Started(Scheduler),
293297
#[default]
294298
Stopped,

src/scheduler/connectivity.rs

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -272,16 +272,7 @@ impl Context {
272272
///
273273
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
274274
pub async fn get_connectivity(&self) -> Connectivity {
275-
let lock = self.scheduler.inner.read().await;
276-
let stores: Vec<_> = match *lock {
277-
InnerSchedulerState::Started(ref sched) => sched
278-
.boxes()
279-
.map(|b| b.conn_state.state.connectivity.clone())
280-
.collect(),
281-
_ => return Connectivity::NotConnected,
282-
};
283-
drop(lock);
284-
275+
let stores = self.connectivities.lock().clone();
285276
let mut connectivities = Vec::new();
286277
for s in stores {
287278
if let Some(connectivity) = s.get_basic().await {
@@ -291,7 +282,18 @@ impl Context {
291282
connectivities
292283
.into_iter()
293284
.min()
294-
.unwrap_or(Connectivity::Connected)
285+
.unwrap_or(Connectivity::NotConnected)
286+
}
287+
288+
pub(crate) fn update_connectivities(&self, sched: &InnerSchedulerState) {
289+
let stores: Vec<_> = match sched {
290+
InnerSchedulerState::Started(sched) => sched
291+
.boxes()
292+
.map(|b| b.conn_state.state.connectivity.clone())
293+
.collect(),
294+
_ => Vec::new(),
295+
};
296+
*self.connectivities.lock() = stores;
295297
}
296298

297299
/// Get an overview of the current connectivity, and possibly more statistics.

0 commit comments

Comments (0)