Skip to content

Commit eae15c3

Browse files
committed
[Networking] Do not spawn an RPC responder task if message_router dropped the message
This also adds a new counter metric that tracks the bytes of messages that were received but then dropped because of a lossy service.
1 parent c88a464 commit eae15c3

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

crates/core/src/network/io/reactor.rs

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,9 @@ use restate_types::{Version, Versioned};
3434

3535
use crate::network::incoming::{RawRpc, RawUnary, RpcReplyPort};
3636
use crate::network::io::EgressMessage;
37-
use crate::network::metric_definitions::NETWORK_MESSAGE_RECEIVED_BYTES;
37+
use crate::network::metric_definitions::{
38+
NETWORK_MESSAGE_RECEIVED_BYTES, NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES,
39+
};
3840
use crate::network::protobuf::network::message::{Body, Signal};
3941
use crate::network::protobuf::network::{Datagram, RpcReply, datagram, rpc_reply};
4042
use crate::network::protobuf::network::{Header, Message};
@@ -332,7 +334,7 @@ impl ConnectionReactor {
332334
};
333335
let target_service = rpc_call.service();
334336

335-
let encoded_len = rpc_call.payload.len();
337+
let encoded_len = rpc_call.payload.len() as u64;
336338
let (reply_port, reply_rx) = RpcReplyPort::new();
337339
let raw_rpc = RawRpc {
338340
reply_port,
@@ -352,22 +354,21 @@ impl ConnectionReactor {
352354
"Received RPC call: {target_service}::{}",
353355
incoming.msg_type()
354356
);
357+
355358
// ship to the service router, dropping the reply port will close the responder
356359
// task.
357360
match tokio::task::unconstrained(self.router.call_rpc(target_service, incoming))
358361
.await
359362
{
360-
Ok(()) => { /* spawn reply task */ }
363+
Ok(()) => {
364+
counter!(NETWORK_MESSAGE_RECEIVED_BYTES, "target" => target_service.as_str_name()).increment(encoded_len);
365+
spawn_rpc_responder(tx.clone(), rpc_call.id, reply_rx, target_service);
366+
}
361367
Err(err) => {
362368
send_rpc_error(tx, err, rpc_call.id);
369+
counter!(NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES, "target" => target_service.as_str_name()).increment(encoded_len);
363370
}
364371
}
365-
366-
counter!(NETWORK_MESSAGE_RECEIVED_BYTES, "target" => target_service.as_str_name())
367-
.increment(encoded_len as u64);
368-
369-
spawn_rpc_responder(tx.clone(), rpc_call.id, reply_rx, target_service);
370-
371372
Decision::Continue
372373
}
373374
// UNARY MESSAGE
@@ -376,7 +377,7 @@ impl ConnectionReactor {
376377
}) => {
377378
let metadata_versions = PeerMetadataVersion::from(header);
378379
let target = unary.service();
379-
let encoded_len = unary.payload.len();
380+
let encoded_len = unary.payload.len() as u64;
380381
let incoming = Incoming::new(
381382
self.connection.protocol_version,
382383
RawUnary {
@@ -392,7 +393,7 @@ impl ConnectionReactor {
392393
let _ = tokio::task::unconstrained(self.router.call_unary(target, incoming)).await;
393394

394395
counter!(NETWORK_MESSAGE_RECEIVED_BYTES, "target" => target.as_str_name())
395-
.increment(encoded_len as u64);
396+
.increment(encoded_len);
396397
Decision::Continue
397398
}
398399
// RPC REPLY

crates/core/src/network/metric_definitions.rs

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,8 @@ use metrics::{Unit, describe_counter, describe_histogram};
1313
pub const NETWORK_CONNECTION_CREATED: &str = "restate.network.connection_created.total";
1414
pub const NETWORK_CONNECTION_DROPPED: &str = "restate.network.connection_dropped.total";
1515
pub const NETWORK_MESSAGE_RECEIVED_BYTES: &str = "restate.network.message_received_bytes.total";
16+
pub const NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES: &str =
17+
"restate.network.message_received_dropped_bytes.total";
1618

1719
pub const NETWORK_MESSAGE_PROCESSING_DURATION: &str =
1820
"restate.network.message_processing_duration.seconds";
@@ -31,7 +33,13 @@ pub fn describe_metrics() {
3133
describe_counter!(
3234
NETWORK_MESSAGE_RECEIVED_BYTES,
3335
Unit::Bytes,
34-
"Number of bytes received by message name"
36+
"Number of bytes received by service name"
37+
);
38+
39+
describe_counter!(
40+
NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES,
41+
Unit::Bytes,
42+
"Number of bytes received and dropped/rejected by service name"
3543
);
3644

3745
describe_histogram!(

0 commit comments

Comments
 (0)