Skip to content

Commit f9c8e72

Browse files
fix incoming message queue length metric (#3172)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> We stopped incrementing the incoming queue length metric. This patch increments it again and adds a regression test. # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> 1 # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! --> - [x] Regression test
1 parent 3feed08 commit f9c8e72

File tree

3 files changed

+59
-13
lines changed

3 files changed

+59
-13
lines changed

crates/client-api/src/routes/subscribe.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use spacetimedb::client::messages::{
2626
};
2727
use spacetimedb::client::{
2828
ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError,
29-
MeteredReceiver, Protocol,
29+
MeteredReceiver, MeteredSender, Protocol,
3030
};
3131
use spacetimedb::host::module_host::ClientConnectedError;
3232
use spacetimedb::host::NoSuchModule;
@@ -676,7 +676,10 @@ async fn ws_recv_task<MessageHandler>(
676676
) where
677677
MessageHandler: Future<Output = Result<(), MessageHandleError>>,
678678
{
679-
let recv_queue = ws_recv_queue(state.clone(), unordered_tx.clone(), ws);
679+
let recv_queue_gauge = WORKER_METRICS
680+
.total_incoming_queue_length
681+
.with_label_values(&state.database);
682+
let recv_queue = ws_recv_queue(state.clone(), unordered_tx.clone(), recv_queue_gauge, ws);
680683
let recv_loop = pin!(ws_recv_loop(state.clone(), idle_tx, recv_queue));
681684
let recv_handler = ws_client_message_handler(state.clone(), client_closed_metric, recv_loop);
682685
pin_mut!(recv_handler);
@@ -816,6 +819,7 @@ fn ws_recv_loop(
816819
fn ws_recv_queue(
817820
state: Arc<ActorState>,
818821
unordered_tx: mpsc::UnboundedSender<UnorderedWsMessage>,
822+
recv_queue_gauge: IntGauge,
819823
mut ws: impl Stream<Item = Result<WsMessage, WsError>> + Unpin + Send + 'static,
820824
) -> impl Stream<Item = Result<WsMessage, WsError>> {
821825
const CLOSE: UnorderedWsMessage = UnorderedWsMessage::Close(CloseFrame {
@@ -829,14 +833,10 @@ fn ws_recv_queue(
829833
let max_incoming_queue_length = state.config.incoming_queue_length.get();
830834

831835
let (tx, rx) = mpsc::channel(max_incoming_queue_length);
832-
let rx = MeteredReceiverStream {
833-
inner: MeteredReceiver::with_gauge(
834-
rx,
835-
WORKER_METRICS
836-
.total_incoming_queue_length
837-
.with_label_values(&state.database),
838-
),
839-
};
836+
837+
let mut tx = MeteredSender::with_gauge(tx, recv_queue_gauge.clone());
838+
let rx = MeteredReceiver::with_gauge(rx, recv_queue_gauge);
839+
let rx = MeteredReceiverStream { inner: rx };
840840

841841
tokio::spawn(async move {
842842
while let Some(item) = ws.next().await {
@@ -1692,8 +1692,14 @@ mod tests {
16921692
let (unordered_tx, mut unordered_rx) = mpsc::unbounded_channel();
16931693
let input = stream::iter((0..20).map(|i| Ok(WsMessage::text(format!("message {i}")))));
16941694

1695-
let received = ws_recv_queue(state, unordered_tx, input).collect::<Vec<_>>().await;
1695+
let metric = IntGauge::new("bleep", "unhelpful").unwrap();
1696+
let received = ws_recv_queue(state, unordered_tx, metric.clone(), input)
1697+
.collect::<Vec<_>>()
1698+
.await;
1699+
16961700
assert_matches!(unordered_rx.recv().await, Some(UnorderedWsMessage::Close(_)));
1701+
// Queue length metric should be zero
1702+
assert_eq!(metric.get(), 0);
16971703
// Should have received all of the input.
16981704
assert_eq!(received.len(), 20);
16991705
}
@@ -1708,10 +1714,14 @@ mod tests {
17081714
let (unordered_tx, _) = mpsc::unbounded_channel();
17091715
let input = stream::iter((0..20).map(|i| Ok(WsMessage::text(format!("message {i}")))));
17101716

1711-
let received = ws_recv_queue(state.clone(), unordered_tx, input)
1717+
let metric = IntGauge::new("bleep", "unhelpful").unwrap();
1718+
let received = ws_recv_queue(state.clone(), unordered_tx, metric.clone(), input)
17121719
.collect::<Vec<_>>()
17131720
.await;
1721+
17141722
assert!(state.closed());
1723+
// Queue length metric should be zero
1724+
assert_eq!(metric.get(), 0);
17151725
// Should have received up to capacity.
17161726
assert_eq!(received.len(), 10);
17171727
}

crates/core/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub mod messages;
88

99
pub use client_connection::{
1010
ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredDeque,
11-
MeteredReceiver, Protocol,
11+
MeteredReceiver, MeteredSender, Protocol,
1212
};
1313
pub use client_connection_index::ClientActorIndex;
1414
pub use message_handlers::{MessageExecutionError, MessageHandleError};

crates/core/src/client/client_connection.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use spacetimedb_client_api_messages::websocket::{
2828
use spacetimedb_lib::identity::RequestId;
2929
use spacetimedb_lib::metrics::ExecutionMetrics;
3030
use spacetimedb_lib::Identity;
31+
use tokio::sync::mpsc::error::{SendError, TrySendError};
3132
use tokio::sync::{mpsc, oneshot, watch};
3233
use tokio::task::AbortHandle;
3334

@@ -386,6 +387,41 @@ impl<T> Drop for MeteredReceiver<T> {
386387
}
387388
}
388389

390+
/// Wraps the transmitting end of a channel with a gauge for tracking the size of the channel.
391+
pub struct MeteredSender<T> {
392+
inner: mpsc::Sender<T>,
393+
gauge: Option<IntGauge>,
394+
}
395+
396+
impl<T> MeteredSender<T> {
397+
pub fn new(inner: mpsc::Sender<T>) -> Self {
398+
Self { inner, gauge: None }
399+
}
400+
401+
pub fn with_gauge(inner: mpsc::Sender<T>, gauge: IntGauge) -> Self {
402+
Self {
403+
inner,
404+
gauge: Some(gauge),
405+
}
406+
}
407+
408+
pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
409+
self.inner.send(value).await?;
410+
if let Some(gauge) = &self.gauge {
411+
gauge.inc();
412+
}
413+
Ok(())
414+
}
415+
416+
pub fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
417+
self.inner.try_send(value)?;
418+
if let Some(gauge) = &self.gauge {
419+
gauge.inc();
420+
}
421+
Ok(())
422+
}
423+
}
424+
389425
// if a client racks up this many messages in the queue without ACK'ing
390426
// anything, we boot 'em.
391427
const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;

0 commit comments

Comments
 (0)