Skip to content

Commit 10cbc07

Browse files
committed
Always restart if the WsClient encounters an error (serenity-rs#3355)
Fixes serenity-rs#3347
1 parent d17ab38 commit 10cbc07

File tree

1 file changed

+59
-66
lines changed

1 file changed

+59
-66
lines changed

src/gateway/sharding/shard_runner.rs

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -267,60 +267,6 @@ impl ShardRunner {
267267
}
268268
}
269269

270-
// Handles a received value over the shard runner rx channel.
271-
//
272-
// Returns a boolean on whether the shard runner can continue.
273-
//
274-
// This always returns true, except in the case that the shard manager asked the runner to
275-
// shutdown or restart.
276-
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
277-
async fn handle_rx_value(&mut self, msg: ShardRunnerMessage) -> bool {
278-
match msg {
279-
ShardRunnerMessage::Restart => {
280-
self.restart().await;
281-
false
282-
},
283-
ShardRunnerMessage::Shutdown(code) => {
284-
self.shutdown(code).await;
285-
false
286-
},
287-
ShardRunnerMessage::ChunkGuild {
288-
guild_id,
289-
limit,
290-
presences,
291-
filter,
292-
nonce,
293-
} => self
294-
.shard
295-
.chunk_guild(guild_id, limit, presences, filter, nonce.as_deref())
296-
.await
297-
.is_ok(),
298-
ShardRunnerMessage::SetPresence {
299-
activity,
300-
status,
301-
} => {
302-
if let Some(activity) = activity {
303-
self.shard.set_activity(activity);
304-
}
305-
if let Some(status) = status {
306-
self.shard.set_status(status);
307-
}
308-
self.shard.update_presence().await.is_ok()
309-
},
310-
#[cfg(feature = "voice")]
311-
ShardRunnerMessage::UpdateVoiceState {
312-
guild_id,
313-
channel_id,
314-
self_mute,
315-
self_deaf,
316-
} => self
317-
.shard
318-
.update_voice_state(guild_id, channel_id, self_mute, self_deaf)
319-
.await
320-
.is_ok(),
321-
}
322-
}
323-
324270
#[cfg(feature = "voice")]
325271
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
326272
async fn handle_voice_event(&self, event: &Event) {
@@ -346,29 +292,76 @@ impl ShardRunner {
346292
}
347293
}
348294

349-
// Receives values over the internal shard runner rx channel and handles them. This will loop
350-
// over values until there is no longer one.
351-
//
352-
// Requests a restart if the sending half of the channel disconnects. This should _never_
353-
// happen, as the sending half is kept on the runner.
295+
// Receives messages over the internal `runner_rx` channel and handles them. Will loop over all
296+
// queued messages until the channel is empty. Requests a restart if handling a message fails.
354297
//
355-
// Returns whether the shard runner is in a state that can continue.
298+
// Returns whether the shard runner can continue executing its main loop.
356299
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
357300
async fn recv(&mut self) -> bool {
358301
while let Ok(msg) = self.runner_rx.try_next() {
359-
if let Some(value) = msg {
360-
if !self.handle_rx_value(value).await {
302+
let Some(msg) = msg else {
303+
// This should never happen, because `self.runner_tx` always holds a copy of the
304+
// other end of the channel.
305+
warn!(
306+
"[ShardRunner {:?}] Internal channel tx dropped; restarting",
307+
self.shard.shard_info(),
308+
);
309+
310+
self.restart().await;
311+
return false;
312+
};
313+
314+
let res = match msg {
315+
ShardRunnerMessage::Restart => {
316+
self.restart().await;
361317
return false;
362-
}
363-
} else {
364-
warn!("[ShardRunner {:?}] Sending half DC; restarting", self.shard.shard_info(),);
318+
},
319+
ShardRunnerMessage::Shutdown(code) => {
320+
self.shutdown(code).await;
321+
return false;
322+
},
323+
ShardRunnerMessage::ChunkGuild {
324+
guild_id,
325+
limit,
326+
presences,
327+
filter,
328+
nonce,
329+
} => {
330+
self.shard
331+
.chunk_guild(guild_id, limit, presences, filter, nonce.as_deref())
332+
.await
333+
},
334+
ShardRunnerMessage::SetPresence {
335+
activity,
336+
status,
337+
} => {
338+
if let Some(activity) = activity {
339+
self.shard.set_activity(activity);
340+
}
341+
if let Some(status) = status {
342+
self.shard.set_status(status);
343+
}
344+
self.shard.update_presence().await
345+
},
346+
#[cfg(feature = "voice")]
347+
ShardRunnerMessage::UpdateVoiceState {
348+
guild_id,
349+
channel_id,
350+
self_mute,
351+
self_deaf,
352+
} => {
353+
self.shard.update_voice_state(guild_id, channel_id, self_mute, self_deaf).await
354+
},
355+
};
356+
357+
if let Err(why) = res {
358+
warn!("[ShardRunner {:?}] Websocket error: {:?}", self.shard.shard_info(), why);
365359

366360
self.restart().await;
367361
return false;
368362
}
369363
}
370364

371-
// There are no longer any values available.
372365
true
373366
}
374367

0 commit comments

Comments
 (0)