Skip to content

Commit a7d41ca

Browse files
committed
Fix ShardQueue batch size bug (#3121)
If the last batch to be started was not of maximum size, it wouldn't get popped off the queue. Instead, we should just pop off the queue periodically to ensure that we fully flush it. On client startup, we queue all shards before popping the first batch, so suboptimal batch sizes are not a concern. Now, the queue also doesn't have to worry about "concurrent mode" and saturating the size of its batches.
1 parent ec33fbb commit a7d41ca

File tree

2 files changed

+9
-27
lines changed

2 files changed

+9
-27
lines changed

src/gateway/sharding/shard_manager.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,18 @@ impl ShardManager {
141141
shard_init: u16,
142142
shard_total: NonZeroU16,
143143
) -> Result<(), GatewayError> {
144-
self.initialize(shard_index, shard_init, shard_total).await;
144+
self.initialize(shard_index, shard_init, shard_total);
145145
loop {
146146
if let Ok(Some(msg)) =
147147
timeout(self.wait_time_between_shard_start, self.manager_rx.next()).await
148148
{
149149
match msg {
150-
ShardManagerMessage::Boot(shard_id) => self.boot(shard_id, false).await,
150+
ShardManagerMessage::Boot(shard_id) => self.queue_for_start(shard_id),
151151
ShardManagerMessage::Quit(err) => return Err(err),
152152
}
153153
}
154+
let batch = self.queue.pop_batch();
155+
self.checked_start(batch).await;
154156
}
155157
}
156158

@@ -159,24 +161,20 @@ impl ShardManager {
159161
/// Note that this queues all shards but does not actually start them. To start the manager's
160162
/// event loop and dispatch [`ShardRunner`]s as they get queued, call [`Self::run`] instead.
161163
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
162-
pub async fn initialize(&mut self, shard_index: u16, shard_init: u16, shard_total: NonZeroU16) {
164+
pub fn initialize(&mut self, shard_index: u16, shard_init: u16, shard_total: NonZeroU16) {
163165
let shard_to = shard_index + shard_init;
164166

165167
self.shard_total = shard_total;
166168
for shard_id in shard_index..shard_to {
167-
self.boot(ShardId(shard_id), true).await;
169+
self.queue_for_start(ShardId(shard_id));
168170
}
169171
}
170172

171173
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
172-
async fn boot(&mut self, shard_id: ShardId, concurrent: bool) {
174+
fn queue_for_start(&mut self, shard_id: ShardId) {
173175
info!("Queueing shard {shard_id} for starting");
174176

175177
self.queue.push_back(shard_id);
176-
self.queue.set_concurrent(concurrent);
177-
if let Some(batch) = self.queue.pop_batch() {
178-
self.checked_start(batch).await;
179-
}
180178
}
181179

182180
/// Restarts a shard runner.

src/gateway/sharding/shard_queue.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::internal::prelude::*;
99
#[must_use]
1010
pub struct ShardQueue {
1111
buckets: FixedArray<VecDeque<ShardId>, u16>,
12-
concurrent: bool,
1312
}
1413

1514
impl ShardQueue {
@@ -19,7 +18,6 @@ impl ShardQueue {
1918

2019
Self {
2120
buckets,
22-
concurrent: false,
2321
}
2422
}
2523

@@ -40,21 +38,7 @@ impl ShardQueue {
4038
}
4139

4240
/// Pops a `ShardId` from every bucket containing at least one and returns them all as a `Vec`.
43-
///
44-
/// If the queue is in concurrent mode, calling this will only return `Some` if all buckets are
45-
/// filled and a maximally-sized batch can be popped, otherwise will return `None`.
46-
///
47-
/// If the queue is _not_ in concurrent mode, this will return `Some` as long as at least one
48-
/// bucket is filled.
49-
pub fn pop_batch(&mut self) -> Option<Vec<ShardId>> {
50-
if self.concurrent && !self.buckets.iter().all(|b| !b.is_empty()) {
51-
None
52-
} else {
53-
Some(self.buckets.iter_mut().filter_map(VecDeque::pop_front).collect())
54-
}
55-
}
56-
57-
pub fn set_concurrent(&mut self, concurrent: bool) {
58-
self.concurrent = concurrent;
41+
pub fn pop_batch(&mut self) -> Vec<ShardId> {
42+
self.buckets.iter_mut().filter_map(VecDeque::pop_front).collect()
5943
}
6044
}

0 commit comments

Comments
 (0)