Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tracing = { version = "0.1.44", features = ["log"] }
serde = { version = "1.0.228", features = ["derive", "rc"] }
url = { version = "2.5.7", features = ["serde"] }
tokio = { version = "1.48.0", features = ["macros", "rt", "sync", "time", "io-util"] }
futures = { version = "0.3.32", default-features = false, features = ["std"] }
futures = { version = "0.3.31", default-features = false, features = ["std"] }
time = { version = "0.3.44", features = ["formatting", "parsing", "serde-well-known"] }
base64 = { version = "0.22.1" }
zeroize = { version = "1.8.2" } # Not used in serenity, but bumps the minimal version from secrecy
Expand Down
8 changes: 5 additions & 3 deletions examples/e07_shard_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ async fn main() {
sleep(Duration::from_secs(30)).await;

for entry in runners.iter() {
let (id, runner) = entry.pair();
let info = runner.info.read();
println!("Shard ID {} is {} with a latency of {:?}", id, info.stage, info.latency,);
let (id, (runner, _)) = entry.pair();
println!(
"Shard ID {} is {} with a latency of {:?}",
id, runner.stage, runner.latency,
);
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions src/gateway/client/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use dashmap::DashMap;
use futures::channel::mpsc::UnboundedSender as Sender;
use parking_lot::RwLock;

use crate::builder::DataUri;
#[cfg(feature = "cache")]
Expand Down Expand Up @@ -48,8 +48,8 @@ pub struct Context {
pub http: Arc<Http>,
#[cfg(feature = "cache")]
pub cache: Arc<Cache>,
/// Metadata about the shard.
pub runner_info: Arc<RwLock<ShardRunnerInfo>>,
/// Metadata about the initialised shards, and their control channels.
pub runners: Arc<DashMap<ShardId, (ShardRunnerInfo, Sender<ShardRunnerMessage>)>>,
#[cfg(feature = "collector")]
pub(crate) collectors: Arc<parking_lot::RwLock<Vec<CollectorCallback>>>,
}
Expand Down
3 changes: 0 additions & 3 deletions src/gateway/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,6 @@ fn update_cache_with_event(ctx: &Context, event: Event) -> FullEvent {
Event::MessagePollVoteRemove(event) => FullEvent::MessagePollVoteRemove {
event,
},
Event::ShardStageUpdate(event) => FullEvent::ShardStageUpdate {
event,
},
}
}

Expand Down
1 change: 1 addition & 0 deletions src/gateway/client/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use strum::{EnumCount, IntoStaticStr, VariantNames};
use super::context::Context;
#[cfg(doc)]
use crate::gateway::ShardRunner;
use crate::gateway::ShardStageUpdateEvent;
use crate::http::RatelimitInfo;
use crate::model::prelude::*;

Expand Down
1 change: 0 additions & 1 deletion src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod ws;
#[cfg(feature = "http")]
use reqwest::IntoUrl;
use reqwest::Url;
use serde::Serialize;

pub use self::error::Error as GatewayError;
pub use self::sharding::*;
Expand Down
116 changes: 99 additions & 17 deletions src/gateway/sharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod shard_manager;
mod shard_queue;
mod shard_runner;

use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -46,19 +47,15 @@ use tracing::{debug, error, info, trace, warn};
use url::Url;

pub use self::shard_manager::{
DEFAULT_WAIT_BETWEEN_SHARD_START,
ShardManager,
ShardManagerMessage,
ShardManagerOptions,
ShardRunnerMetadata,
DEFAULT_WAIT_BETWEEN_SHARD_START, ShardManager, ShardManagerMessage, ShardManagerOptions,
};
pub use self::shard_queue::ShardQueue;
pub use self::shard_runner::{ShardRunner, ShardRunnerMessage};
pub use self::shard_runner::{ShardRunner, ShardRunnerMessage, ShardRunnerOptions};
use super::{ActivityData, ChunkGuildFilter, GatewayError, PresenceData, WsClient};
use crate::constants::{self, CloseCode};
use crate::internal::prelude::*;
use crate::model::event::{DeserializedEvent, Event, GatewayEvent};
use crate::model::gateway::{ConnectionStage, GatewayIntents, ShardInfo};
use crate::model::gateway::{GatewayIntents, ShardInfo};
#[cfg(feature = "voice")]
use crate::model::id::ChannelId;
use crate::model::id::{ApplicationId, GuildId, ShardId};
Expand Down Expand Up @@ -394,10 +391,9 @@ impl Shard {
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<ShardAction>> {
match event {
Ok(GatewayEvent::Dispatch {
seq,
event,
}) => Ok(self.handle_gateway_dispatch(seq, event).map(ShardAction::Dispatch)),
Ok(GatewayEvent::Dispatch { seq, event }) => {
Ok(self.handle_gateway_dispatch(seq, event).map(ShardAction::Dispatch))
},
Ok(GatewayEvent::Heartbeat) => {
info!("[{:?}] Received a request to heartbeat", self.info);
Ok(Some(ShardAction::Heartbeat))
Expand Down Expand Up @@ -712,12 +708,7 @@ impl HeartbeatMetadata {

impl Default for HeartbeatMetadata {
fn default() -> Self {
Self {
started: Instant::now(),
last_sent: None,
last_ack: None,
interval: None,
}
Self { started: Instant::now(), last_sent: None, last_ack: None, interval: None }
}
}

Expand Down Expand Up @@ -747,13 +738,104 @@ pub struct ShardRunnerInfo {
pub stage: ConnectionStage,
}

/// An event denoting that a shard's connection stage was changed.
///
/// # Examples
///
/// This might happen when a shard changes from [`ConnectionStage::Identifying`] to
/// [`ConnectionStage::Connected`].
#[derive(Clone, Debug, Serialize)]
pub struct ShardStageUpdateEvent {
/// The new connection stage.
pub new: ConnectionStage,
/// The old connection stage.
pub old: ConnectionStage,
/// The ID of the shard that had its connection stage change.
pub shard_id: ShardId,
}

/// Indicates the current connection stage of a [`Shard`].
///
/// This can be useful for knowing which shards are currently "down"/"up".
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[non_exhaustive]
pub enum ConnectionStage {
/// Indicator that the [`Shard`] is normally connected and is not in, e.g., a resume phase.
Connected,
/// Indicator that the [`Shard`] is connecting and is in, e.g., a resume phase.
Connecting,
/// Indicator that the [`Shard`] is fully disconnected and is not in a reconnecting phase.
Disconnected,
/// Indicator that the [`Shard`] is currently initiating a handshake.
Handshake,
/// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting a READY packet.
Identifying,
/// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a RESUMED packet.
Resuming,
}

impl ConnectionStage {
/// Whether the stage is a form of connecting.
///
/// This will return `true` on:
/// - [`Connecting`][`ConnectionStage::Connecting`]
/// - [`Handshake`][`ConnectionStage::Handshake`]
/// - [`Identifying`][`ConnectionStage::Identifying`]
/// - [`Resuming`][`ConnectionStage::Resuming`]
///
/// All other variants will return `false`.
///
/// # Examples
///
/// Assert that [`ConnectionStage::Identifying`] is a connecting stage:
///
/// ```rust
/// use serenity::gateway::ConnectionStage;
///
/// assert!(ConnectionStage::Identifying.is_connecting());
/// ```
///
/// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage:
///
/// ```rust
/// use serenity::gateway::ConnectionStage;
///
/// assert!(!ConnectionStage::Connected.is_connecting());
/// ```
#[must_use]
pub fn is_connecting(self) -> bool {
use self::ConnectionStage::{Connecting, Handshake, Identifying, Resuming};
matches!(self, Connecting | Handshake | Identifying | Resuming)
}
}

impl fmt::Display for ConnectionStage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match *self {
Self::Connected => "connected",
Self::Connecting => "connecting",
Self::Disconnected => "disconnected",
Self::Handshake => "handshaking",
Self::Identifying => "identifying",
Self::Resuming => "resuming",
})
}
}

/// Newtype around a callback that will be called on every incoming request. As long as this
/// collector should still receive events, it should return `true`. Once it returns `false`, it is
/// removed.
#[cfg(feature = "collector")]
#[derive(Clone)]
pub struct CollectorCallback(pub Arc<dyn Fn(&Event) -> bool + Send + Sync>);

#[cfg(feature = "collector")]
impl fmt::Debug for CollectorCallback {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("CollectorCallback").finish()
}
}

/// The transport compression method to use.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
Expand Down
Loading
Loading