Skip to content

Commit c18b291

Browse files
authored
Add additional logging to subscribe route and simplify calling client_connected (#2998)
# Description of Changes

Out-of-band discussions with the BitCraft team brought up questions about whether it was possible for a rejected client connection to start an expensive computation like a subscription before their connection was killed, e.g. by sending a `Subscribe` message along the WebSocket before `client_connected` had finished returning `Err`. I don't believe this was actually possible, as `ClientConnection::spawn` called and awaited `call_identity_connected` before invoking its `actor` closure, and it was that `actor` which processed `Subscribe` messages. But it was somewhat difficult to verify that behavior, and so I reorganized the code so that the outer layer of the `subscribe` handler obviously had that property without having to step into `ClientConnection::spawn`.

I also added some additional logging to the subscribe route, including the `X-Forwarded-For` header in more messages, as the BitCraft team complained about having difficulty correlating IP addresses with connections. The log levels remain the same as before, just with additional information added:

- Successful connections are at `debug` level.
- Rejected connections are at `info` level (these are the ones BitCraft cares about in this case).
- Failed connections are at `warn` level.

As the levels are unchanged, this should not add undesirable log noise.

# API and ABI breaking changes

N/A

# Expected complexity level and risk

3? The `subscribe` route was complex and remains so. I believe this change simplifies the code and makes the logic more obvious, but reviewers should take care to verify that the behavior is actually equivalent, as I believe it is.

# Testing

- [ ] I would like a test deployment of BitCraft to staging or something, or to include this patch in the next bot test that we do anyway, just for sanity's sake.
1 parent 797eea8 commit c18b291

File tree

2 files changed

+64
-17
lines changed

2 files changed

+64
-17
lines changed

crates/client-api/src/routes/subscribe.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ where
145145

146146
let identity_token = auth.creds.token().into();
147147

148-
let module_rx = leader.module_watcher().await.map_err(log_and_500)?;
148+
let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?;
149149

150150
let client_id = ClientActorId {
151151
identity: auth.identity,
@@ -163,32 +163,46 @@ where
163163
let ws = match ws_upgrade.upgrade(ws_config).await {
164164
Ok(ws) => ws,
165165
Err(err) => {
166-
log::error!("WebSocket init error: {err}");
166+
log::error!("websocket: WebSocket init error: {err}");
167167
return;
168168
}
169169
};
170170

171-
match forwarded_for {
171+
let identity = client_id.identity;
172+
let client_log_string = match forwarded_for {
172173
Some(TypedHeader(XForwardedFor(ip))) => {
173-
log::debug!("New client connected from ip {ip}")
174+
format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}")
174175
}
175-
None => log::debug!("New client connected from unknown ip"),
176-
}
176+
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
177+
};
177178

178-
let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
179-
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
180-
{
181-
Ok(s) => s,
179+
log::debug!("websocket: New client connected from {client_log_string}");
180+
181+
let connected = match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await {
182+
Ok(connected) => {
183+
log::debug!("websocket: client_connected returned Ok for {client_log_string}");
184+
connected
185+
}
182186
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
183-
log::info!("{e}");
187+
log::info!(
188+
"websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
189+
);
184190
return;
185191
}
186192
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
187-
log::warn!("ModuleHost died while we were connecting: {e:#}");
193+
log::warn!("websocket: ModuleHost died while {client_log_string} was connecting: {e:#}");
188194
return;
189195
}
190196
};
191197

198+
log::debug!(
199+
"websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection"
200+
);
201+
202+
let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
203+
let client =
204+
ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await;
205+
192206
// Send the client their identity token message as the first message
193207
// NOTE: We're adding this to the protocol because some client libraries are
194208
// unable to access the http response headers.
@@ -200,7 +214,7 @@ where
200214
connection_id,
201215
};
202216
if let Err(e) = client.send_message(message) {
203-
log::warn!("{e}, before identity token was sent")
217+
log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}");
204218
}
205219
});
206220

crates/core/src/client/client_connection.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,15 +392,49 @@ impl<T> Drop for MeteredReceiver<T> {
392392
const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;
393393
const KB: usize = 1024;
394394

395+
/// Value returned by [`ClientConnection::call_client_connected_maybe_reject`]
396+
/// and consumed by [`ClientConnection::spawn`] which acts as a proof that the client is authorized.
397+
///
398+
/// Because this struct does not capture the module or database info or the client connection info,
399+
/// a malicious caller could call [`ClientConnection::call_client_connected_maybe_reject`] for one client
400+
/// and then use the resulting `Connected` token to [`ClientConnection::spawn`] for a different client.
401+
/// We're not particularly worried about that.
402+
/// This token exists as a sanity check that non-malicious callers don't accidentally [`ClientConnection::spawn`]
403+
/// for an unauthorized client.
404+
#[non_exhaustive]
405+
pub struct Connected {
406+
_private: (),
407+
}
408+
395409
impl ClientConnection {
396-
/// Returns an error if ModuleHost closed
410+
/// Call the `client_connected` reducer, if any, of the database at `module_rx`,
411+
/// and return `Err` if it signals rejecting this client's connection.
412+
///
413+
/// Call this method before [`Self::spawn`]
414+
/// and pass the returned [`Connected`] to [`Self::spawn`] as proof that the client is authorized.
415+
pub async fn call_client_connected_maybe_reject(
416+
module_rx: &mut watch::Receiver<ModuleHost>,
417+
id: ClientActorId,
418+
) -> Result<Connected, ClientConnectedError> {
419+
let module = module_rx.borrow_and_update().clone();
420+
module.call_identity_connected(id.identity, id.connection_id).await?;
421+
Ok(Connected { _private: () })
422+
}
423+
424+
/// Spawn a new [`ClientConnection`] for a WebSocket subscriber.
425+
///
426+
/// Callers should first call [`Self::call_client_connected_maybe_reject`]
427+
/// to verify that the database at `module_rx` approves of this connection,
428+
/// and should not invoke this method if that call returns an error,
429+
/// and pass the returned [`Connected`] as `_proof_of_client_connected_call`.
397430
pub async fn spawn<Fut>(
398431
id: ClientActorId,
399432
config: ClientConfig,
400433
replica_id: u64,
401434
mut module_rx: watch::Receiver<ModuleHost>,
402435
actor: impl FnOnce(ClientConnection, MeteredReceiver<SerializableMessage>) -> Fut,
403-
) -> Result<ClientConnection, ClientConnectedError>
436+
_proof_of_client_connected_call: Connected,
437+
) -> ClientConnection
404438
where
405439
Fut: Future<Output = ()> + Send + 'static,
406440
{
@@ -409,7 +443,6 @@ impl ClientConnection {
409443
// logically subscribed to the database, not any particular replica. We should handle failover for
410444
// them and stuff. Not right now though.
411445
let module = module_rx.borrow_and_update().clone();
412-
module.call_identity_connected(id.identity, id.connection_id).await?;
413446

414447
let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);
415448

@@ -455,7 +488,7 @@ impl ClientConnection {
455488
// if this fails, the actor() function called .abort(), which like... okay, I guess?
456489
let _ = fut_tx.send(actor_fut);
457490

458-
Ok(this)
491+
this
459492
}
460493

461494
pub fn dummy(

0 commit comments

Comments
 (0)