Skip to content

Up a bunch of WS-related logs #3162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ async fn ws_main_loop<HotswapWatcher>(
code: CloseCode::Away,
reason: "module exited".into()
};
log::warn!("Module exited for client {}", state.client_id);
unordered_tx(close.into());
}
watch_hotswap.set(hotswap());
Expand Down Expand Up @@ -685,14 +686,14 @@ async fn ws_recv_task<MessageHandler>(
let result = message_handler(data, timer).await;
if let Err(e) = result {
if let MessageHandleError::Execution(err) = e {
log::error!("{err:#}");
log::error!("Message execution error: {err:#}");
// If the send task has exited, also exit this recv task.
if unordered_tx.send(err.into()).is_err() {
break;
}
continue;
}
log::debug!("Client caused error: {e}");
log::warn!("Client caused error: {e}");
let close = CloseFrame {
code: CloseCode::Error,
reason: format!("{e:#}").into(),
Expand Down Expand Up @@ -730,7 +731,7 @@ fn ws_recv_loop(
ws: &mut (impl Stream<Item = Result<WsMessage, WsError>> + Unpin),
) -> Option<Result<WsMessage, WsError>> {
if state.closed() {
log::trace!("drain websocket waiting for client close");
log::info!("drain websocket waiting for client close");
let res: Result<Option<Result<WsMessage, WsError>>, Elapsed> =
timeout(state.config.close_handshake_timeout, async {
while let Some(item) = ws.next().await {
Expand Down Expand Up @@ -772,7 +773,7 @@ fn ws_recv_loop(
//
// - the client sends a close frame (`ws` returns `None)
// - or `ws` yields an error
log::trace!("message received while already closed");
log::warn!("message received while already closed");
}
// None of the error cases can be meaningfully recovered from
// (and some can't even occur on the `ws` stream).
Expand Down Expand Up @@ -921,20 +922,20 @@ fn ws_client_message_handler(
while let Some(message) = messages.next().await {
match message {
ClientMessage::Message(message) => {
log::trace!("Received client message");
log::info!("Received client message");
yield (message, Instant::now());
},
ClientMessage::Ping(_bytes) => {
log::trace!("Received ping from client {}", state.client_id);
log::info!("Received ping from client {}", state.client_id);
// `tungstenite` will respond with `Pong` for us,
// no need to send it ourselves.
},
ClientMessage::Pong(_bytes) => {
log::trace!("Received pong from client {}", state.client_id);
log::info!("Received pong from client {}", state.client_id);
state.set_ponged();
},
ClientMessage::Close(close_frame) => {
log::trace!("Received Close frame from client {}: {:?}", state.client_id, close_frame);
log::warn!("Received Close frame from client {}: {:?}", state.client_id, close_frame);
let was_closed = state.close();
// This is the client telling us they want to close.
if !was_closed {
Expand All @@ -943,7 +944,7 @@ fn ws_client_message_handler(
}
}
}
log::trace!("client message handler done");
log::info!("client message handler done");
}
}

Expand Down Expand Up @@ -1013,7 +1014,7 @@ async fn ws_send_loop(
}
match msg {
UnorderedWsMessage::Close(close_frame) => {
log::trace!("sending close frame");
log::warn!("sending close frame: {close_frame:?}");
if let Err(e) = ws.send(WsMessage::Close(Some(close_frame))).await {
log::warn!("error sending close frame: {e:#}");
break;
Expand All @@ -1027,14 +1028,14 @@ async fn ws_send_loop(
messages.close();
},
UnorderedWsMessage::Ping(bytes) => {
log::trace!("sending ping");
log::info!("sending ping");
if let Err(e) = ws.feed(WsMessage::Ping(bytes)).await {
log::warn!("error sending ping: {e:#}");
break;
}
},
UnorderedWsMessage::Error(err) => {
log::trace!("sending error result");
log::info!("sending error result");
let (msg_alloc, res) = send_message(
&state.database,
config,
Expand All @@ -1057,7 +1058,7 @@ async fn ws_send_loop(
if n == 0 {
continue;
}
log::trace!("sending {n} outgoing messages");
log::info!("sending {n} outgoing messages");
for msg in messages_buf.drain(..n) {
let (msg_alloc, res) = send_message(
&state.database,
Expand Down
15 changes: 13 additions & 2 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl ClientConnectionSender {
// we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
// the channel, so forcibly kick the client
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
log::warn!(
"client channel capacity exceeded for identity {}, connection id {}",
self.id.identity,
self.id.connection_id
);
self.abort_handle.abort();
self.cancelled.store(true, Ordering::Relaxed);
return Err(ClientSendError::Cancelled);
Expand Down Expand Up @@ -455,10 +460,16 @@ impl ClientConnection {

let _gauge_guard = module_info.metrics.connected_clients.inc_scope();
module_info.metrics.ws_clients_spawned.inc();
scopeguard::defer! {
scopeguard::defer_on_success! {
let database_identity = module_info.database_identity;
let client_identity = id.identity;
log::warn!("websocket connection aborted for client identity `{client_identity}` and database identity `{database_identity}`");
log::warn!("websocket connection aborted (defer_on_success) for client identity `{client_identity}` and database identity `{database_identity}`");
module_info.metrics.ws_clients_aborted.inc();
}
scopeguard::defer_on_unwind! {
let database_identity = module_info.database_identity;
let client_identity = id.identity;
log::warn!("websocket connection aborted (defer_on_unwind) for client identity `{client_identity}` and database identity `{database_identity}`");
module_info.metrics.ws_clients_aborted.inc();
};

Expand Down
Loading