diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index f59f8cd68dd..2a354fb910c 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -590,6 +590,7 @@ async fn ws_main_loop( code: CloseCode::Away, reason: "module exited".into() }; + log::warn!("Module exited for client {}", state.client_id); unordered_tx(close.into()); } watch_hotswap.set(hotswap()); @@ -685,14 +686,14 @@ async fn ws_recv_task( let result = message_handler(data, timer).await; if let Err(e) = result { if let MessageHandleError::Execution(err) = e { - log::error!("{err:#}"); + log::error!("Message execution error: {err:#}"); // If the send task has exited, also exit this recv task. if unordered_tx.send(err.into()).is_err() { break; } continue; } - log::debug!("Client caused error: {e}"); + log::warn!("Client caused error: {e}"); let close = CloseFrame { code: CloseCode::Error, reason: format!("{e:#}").into(), @@ -730,7 +731,7 @@ fn ws_recv_loop( ws: &mut (impl Stream> + Unpin), ) -> Option> { if state.closed() { - log::trace!("drain websocket waiting for client close"); + log::info!("drain websocket waiting for client close"); let res: Result>, Elapsed> = timeout(state.config.close_handshake_timeout, async { while let Some(item) = ws.next().await { @@ -772,7 +773,7 @@ fn ws_recv_loop( // // - the client sends a close frame (`ws` returns `None) // - or `ws` yields an error - log::trace!("message received while already closed"); + log::warn!("message received while already closed"); } // None of the error cases can be meaningfully recovered from // (and some can't even occur on the `ws` stream). @@ -921,20 +922,20 @@ fn ws_client_message_handler( while let Some(message) = messages.next().await { match message { ClientMessage::Message(message) => { - log::trace!("Received client message"); + log::info!("Received client message"); yield (message, Instant::now()); }, ClientMessage::Ping(_bytes) => { - log::trace!("Received ping from client {}", state.client_id); + log::info!("Received ping from client {}", state.client_id); // `tungstenite` will respond with `Pong` for us, // no need to send it ourselves. }, ClientMessage::Pong(_bytes) => { - log::trace!("Received pong from client {}", state.client_id); + log::info!("Received pong from client {}", state.client_id); state.set_ponged(); }, ClientMessage::Close(close_frame) => { - log::trace!("Received Close frame from client {}: {:?}", state.client_id, close_frame); + log::warn!("Received Close frame from client {}: {:?}", state.client_id, close_frame); let was_closed = state.close(); // This is the client telling us they want to close. if !was_closed { @@ -943,7 +944,7 @@ fn ws_client_message_handler( } } } - log::trace!("client message handler done"); + log::info!("client message handler done"); } } @@ -1013,7 +1014,7 @@ async fn ws_send_loop( } match msg { UnorderedWsMessage::Close(close_frame) => { - log::trace!("sending close frame"); + log::warn!("sending close frame: {close_frame:?}"); if let Err(e) = ws.send(WsMessage::Close(Some(close_frame))).await { log::warn!("error sending close frame: {e:#}"); break; @@ -1027,14 +1028,14 @@ async fn ws_send_loop( messages.close(); }, UnorderedWsMessage::Ping(bytes) => { - log::trace!("sending ping"); + log::info!("sending ping"); if let Err(e) = ws.feed(WsMessage::Ping(bytes)).await { log::warn!("error sending ping: {e:#}"); break; } }, UnorderedWsMessage::Error(err) => { - log::trace!("sending error result"); + log::info!("sending error result"); let (msg_alloc, res) = send_message( &state.database, config, @@ -1057,7 +1058,7 @@ async fn ws_send_loop( if n == 0 { continue; } - log::trace!("sending {n} outgoing messages"); + log::info!("sending {n} outgoing messages"); for msg in messages_buf.drain(..n) { let (msg_alloc, res) = send_message( &state.database, diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index cf3e5511fff..fe1c6eb3afe 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -181,6 +181,11 @@ impl ClientConnectionSender { // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in // the channel, so forcibly kick the client tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded"); + log::warn!( + "client channel capacity exceeded for identity {}, connection id {}", + self.id.identity, + self.id.connection_id + ); self.abort_handle.abort(); self.cancelled.store(true, Ordering::Relaxed); return Err(ClientSendError::Cancelled); @@ -455,10 +460,16 @@ impl ClientConnection { let _gauge_guard = module_info.metrics.connected_clients.inc_scope(); module_info.metrics.ws_clients_spawned.inc(); - scopeguard::defer! { + scopeguard::defer_on_success! { let database_identity = module_info.database_identity; let client_identity = id.identity; - log::warn!("websocket connection aborted for client identity `{client_identity}` and database identity `{database_identity}`"); + log::warn!("websocket connection aborted (defer_on_success) for client identity `{client_identity}` and database identity `{database_identity}`"); + module_info.metrics.ws_clients_aborted.inc(); + } + scopeguard::defer_on_unwind! { + let database_identity = module_info.database_identity; + let client_identity = id.identity; + log::warn!("websocket connection aborted (defer_on_unwind) for client identity `{client_identity}` and database identity `{database_identity}`"); module_info.metrics.ws_clients_aborted.inc(); };