Skip to content

Commit ec33fbb

Browse files
committed
Merge ShardQueuer functionality into ShardManager (#3108)
Untangles the spaghetti of shard management code by removing the middleman `ShardQueuer` struct, which used a disorienting mess of mpsc channels to facilitate message passing between the `ShardManager` and each of its `ShardRunner`s. The new `ShardManager::run` function takes care of creating/spawning new runners as needed. This is fine to call without spawning a separate task (which was being done for `ShardQueuer`) because `Client::start_connection` previously already waited for the `ShardQueuer` loop to (essentially) finish before returning. In other words, the Client would start the `ShardManager` and then block, but the waiting was just being facilitated through channels, which wasn't obvious. The only messages passed across channels now are `ShardRunnerMessage` (sent from the manager to a given runner), and a new `ShardManagerMessage` (sent from a runner back to the manager). For example, the manager can send a message to a runner telling it to restart, then the runner will shut itself down and ask the manager to reboot the shard by sending it a message back. The shard queue now keeps track of whether it should dispatch in concurrent batches. Since we only use concurrent startup when first initializing all shards, and then switch it off once and don't re-enable it, this should be fine.
1 parent 6d87747 commit ec33fbb

File tree

9 files changed

+434
-817
lines changed

9 files changed

+434
-817
lines changed

examples/e07_shard_manager/src/main.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,22 @@ async fn main() {
5252
let mut client =
5353
Client::builder(token, intents).event_handler(Handler).await.expect("Err creating client");
5454

55-
// Here we clone a lock to the Shard Manager, and then move it into a new thread. The thread
56-
// will unlock the manager and print shards' status on a loop.
57-
let manager = client.shard_manager.clone();
55+
// Here we get a HashMap of the shards' status that we move into a new thread. A separate
56+
// tokio task holds ownership of each entry, so each one will require acquiring a lock
57+
// before reading.
58+
let runners = client.shard_manager.runner_info();
5859

5960
tokio::spawn(async move {
6061
loop {
6162
sleep(Duration::from_secs(30)).await;
6263

63-
let shard_runners = manager.runners.lock().await;
64-
65-
for (id, runner) in shard_runners.iter() {
66-
println!(
67-
"Shard ID {} is {} with a latency of {:?}",
68-
id, runner.stage, runner.latency,
69-
);
64+
for (id, runner) in &runners {
65+
if let Ok(runner) = runner.lock() {
66+
println!(
67+
"Shard ID {} is {} with a latency of {:?}",
68+
id, runner.stage, runner.latency,
69+
);
70+
}
7071
}
7172
}
7273
});

src/gateway/client/context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33

44
#[cfg(feature = "cache")]
55
pub use crate::cache::Cache;
6-
use crate::gateway::{ActivityData, ShardMessenger, ShardRunner};
6+
use crate::gateway::{ActivityData, ShardMessenger};
77
use crate::http::{CacheHttp, Http};
88
use crate::model::prelude::*;
99

@@ -62,15 +62,15 @@ impl Context {
6262
/// Create a new Context to be passed to an event handler.
6363
pub(crate) fn new(
6464
data: Arc<dyn std::any::Any + Send + Sync>,
65-
runner: &ShardRunner,
65+
shard_messenger: ShardMessenger,
6666
shard_id: ShardId,
6767
http: Arc<Http>,
6868
#[cfg(feature = "cache")] cache: Arc<Cache>,
6969
) -> Context {
7070
Context {
71-
shard: ShardMessenger::new(runner),
72-
shard_id,
7371
data,
72+
shard: shard_messenger,
73+
shard_id,
7474
http,
7575
#[cfg(feature = "cache")]
7676
cache,

src/gateway/client/mod.rs

Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,29 @@ use std::sync::Arc;
3535
use std::sync::OnceLock;
3636
use std::time::Duration;
3737

38-
use futures::channel::mpsc::UnboundedReceiver as Receiver;
3938
use futures::future::BoxFuture;
40-
use futures::StreamExt as _;
4139
#[cfg(feature = "tracing_instrument")]
4240
use tracing::instrument;
4341
use tracing::{debug, warn};
4442

4543
pub use self::context::Context;
4644
pub use self::event_handler::{EventHandler, FullEvent, RawEventHandler};
47-
use super::TransportCompression;
48-
#[cfg(feature = "cache")]
49-
use crate::cache::Cache;
50-
#[cfg(feature = "cache")]
51-
use crate::cache::Settings as CacheSettings;
52-
#[cfg(feature = "framework")]
53-
use crate::framework::Framework;
5445
#[cfg(feature = "voice")]
55-
use crate::gateway::VoiceGatewayManager;
56-
use crate::gateway::{
46+
use super::VoiceGatewayManager;
47+
use super::{
5748
ActivityData,
58-
GatewayError,
5949
PresenceData,
6050
ShardManager,
6151
ShardManagerOptions,
52+
TransportCompression,
6253
DEFAULT_WAIT_BETWEEN_SHARD_START,
6354
};
55+
#[cfg(feature = "cache")]
56+
use crate::cache::Cache;
57+
#[cfg(feature = "cache")]
58+
use crate::cache::Settings as CacheSettings;
59+
#[cfg(feature = "framework")]
60+
use crate::framework::Framework;
6461
use crate::http::Http;
6562
use crate::internal::prelude::*;
6663
use crate::internal::tokio::spawn_named;
@@ -316,9 +313,6 @@ impl IntoFuture for ClientBuilder {
316313
}
317314
}
318315

319-
#[cfg(feature = "voice")]
320-
let voice_manager = self.voice_manager;
321-
322316
#[cfg(feature = "cache")]
323317
let cache = Arc::new(Cache::new_with_settings(self.cache_settings));
324318

@@ -337,33 +331,33 @@ impl IntoFuture for ClientBuilder {
337331

338332
#[cfg(feature = "framework")]
339333
let framework_cell = Arc::new(OnceLock::new());
340-
let (shard_manager, shard_manager_ret_value) = ShardManager::new(ShardManagerOptions {
334+
335+
let shard_manager = ShardManager::new(ShardManagerOptions {
341336
token: self.token,
342337
data: Arc::clone(&data),
343338
event_handler: self.event_handler,
344339
raw_event_handler: self.raw_event_handler,
345340
#[cfg(feature = "framework")]
346341
framework: Arc::clone(&framework_cell),
347342
#[cfg(feature = "voice")]
348-
voice_manager: voice_manager.clone(),
343+
voice_manager: self.voice_manager.clone(),
349344
ws_url: Arc::clone(&ws_url),
345+
compression: self.compression,
350346
shard_total,
347+
max_concurrency,
351348
#[cfg(feature = "cache")]
352349
cache: Arc::clone(&cache),
353350
http: Arc::clone(&http),
354351
intents,
355352
presence: Some(presence),
356-
max_concurrency,
357353
wait_time_between_shard_start: self.wait_time_between_shard_start,
358-
compression: self.compression,
359354
});
360355

361356
let client = Client {
362357
data,
363358
shard_manager,
364-
shard_manager_return_value: shard_manager_ret_value,
365359
#[cfg(feature = "voice")]
366-
voice_manager,
360+
voice_manager: self.voice_manager,
367361
ws_url,
368362
#[cfg(feature = "cache")]
369363
cache,
@@ -449,43 +443,17 @@ pub struct Client {
449443
/// # use std::time::Duration;
450444
/// #
451445
/// # fn run(client: Client) {
452-
/// // Create a clone of the `Arc` containing the shard manager.
453-
/// let shard_manager = client.shard_manager.clone();
454-
///
455446
/// tokio::spawn(async move {
456447
/// loop {
457-
/// let count = shard_manager.shards_instantiated().await.len();
448+
/// let count = client.shard_manager.shards_instantiated().len();
458449
/// println!("Shard count instantiated: {}", count);
459450
///
460451
/// tokio::time::sleep(Duration::from_millis(5000)).await;
461452
/// }
462453
/// });
463454
/// # }
464455
/// ```
465-
///
466-
/// Shutting down all connections after one minute of operation:
467-
///
468-
/// ```rust,no_run
469-
/// # use serenity::prelude::*;
470-
/// # use std::time::Duration;
471-
/// #
472-
/// # fn run(client: Client) {
473-
/// // Create a clone of the `Arc` containing the shard manager.
474-
/// let shard_manager = client.shard_manager.clone();
475-
///
476-
/// // Create a thread which will sleep for 60 seconds and then have the shard manager
477-
/// // shutdown.
478-
/// tokio::spawn(async move {
479-
/// tokio::time::sleep(Duration::from_secs(60)).await;
480-
///
481-
/// shard_manager.shutdown_all().await;
482-
///
483-
/// println!("Shutdown shard manager!");
484-
/// });
485-
/// # }
486-
/// ```
487-
pub shard_manager: Arc<ShardManager>,
488-
shard_manager_return_value: Receiver<Result<(), GatewayError>>,
456+
pub shard_manager: ShardManager,
489457
/// The voice manager for the client.
490458
///
491459
/// This is an ergonomic structure for interfacing over shards' voice
@@ -796,12 +764,7 @@ impl Client {
796764

797765
debug!("Initializing shard info: {} - {}/{}", start_shard, init, total_shards);
798766

799-
self.shard_manager.initialize(start_shard, init, total_shards);
800-
if let Some(Err(err)) = self.shard_manager_return_value.next().await {
801-
return Err(Error::Gateway(err));
802-
}
803-
804-
Ok(())
767+
self.shard_manager.run(start_shard, init, total_shards).await.map_err(Error::Gateway)
805768
}
806769
}
807770

src/gateway/sharding/mod.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,9 @@
1616
//!
1717
//! ### [`ShardManager`]
1818
//!
19-
//! The shard manager provides a clean interface for communicating with shard runners either
20-
//! individually or collectively, with functions such as [`ShardManager::shutdown`] and
21-
//! [`ShardManager::restart`] to manage shards in a fine-grained way.
22-
//!
23-
//! For most use cases, the [`ShardManager`] will fit all your low-level sharding needs.
24-
//!
25-
//! ### [`ShardQueuer`]
26-
//!
27-
//! A light wrapper around an mpsc receiver that receives [`ShardQueuerMessage`]s. It should be run
28-
//! in its own thread so it can receive messages to start shards concurrently in a queue.
19+
//! The shard manager provides an interface for managing the starting, restarting, and shutdown of
20+
//! shards by spawning tasks, each containing a [`ShardRunner`], and communicating with each task
21+
//! using message passing.
2922
//!
3023
//! ### [`ShardRunner`]
3124
//!
@@ -37,7 +30,7 @@
3730
3831
mod shard_manager;
3932
mod shard_messenger;
40-
mod shard_queuer;
33+
mod shard_queue;
4134
mod shard_runner;
4235

4336
use std::fmt;
@@ -56,11 +49,12 @@ use url::Url;
5649

5750
pub use self::shard_manager::{
5851
ShardManager,
52+
ShardManagerMessage,
5953
ShardManagerOptions,
6054
DEFAULT_WAIT_BETWEEN_SHARD_START,
6155
};
6256
pub use self::shard_messenger::ShardMessenger;
63-
pub use self::shard_queuer::{ShardQueue, ShardQueuer, ShardQueuerMessage};
57+
pub use self::shard_queue::ShardQueue;
6458
pub use self::shard_runner::{ShardRunner, ShardRunnerMessage, ShardRunnerOptions};
6559
use super::{ActivityData, ChunkGuildFilter, GatewayError, PresenceData, WsClient};
6660
use crate::constants::{self, CloseCode};
@@ -779,9 +773,6 @@ pub enum ShardAction {
779773
pub struct ShardRunnerInfo {
780774
/// The latency between when a heartbeat was sent and when the acknowledgement was received.
781775
pub latency: Option<StdDuration>,
782-
/// The channel used to communicate with the shard runner, telling it what to do with regards
783-
/// to its status.
784-
pub runner_tx: ShardMessenger,
785776
/// The current connection stage of the shard.
786777
pub stage: ConnectionStage,
787778
}

0 commit comments

Comments
 (0)