@@ -34,7 +34,9 @@ use restate_types::{Version, Versioned};
3434
3535use crate :: network:: incoming:: { RawRpc , RawUnary , RpcReplyPort } ;
3636use crate :: network:: io:: EgressMessage ;
37- use crate :: network:: metric_definitions:: NETWORK_MESSAGE_RECEIVED_BYTES ;
37+ use crate :: network:: metric_definitions:: {
38+ NETWORK_MESSAGE_RECEIVED_BYTES , NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES ,
39+ } ;
3840use crate :: network:: protobuf:: network:: message:: { Body , Signal } ;
3941use crate :: network:: protobuf:: network:: { Datagram , RpcReply , datagram, rpc_reply} ;
4042use crate :: network:: protobuf:: network:: { Header , Message } ;
@@ -332,7 +334,7 @@ impl ConnectionReactor {
332334 } ;
333335 let target_service = rpc_call. service ( ) ;
334336
335- let encoded_len = rpc_call. payload . len ( ) ;
337+ let encoded_len = rpc_call. payload . len ( ) as u64 ;
336338 let ( reply_port, reply_rx) = RpcReplyPort :: new ( ) ;
337339 let raw_rpc = RawRpc {
338340 reply_port,
@@ -352,22 +354,21 @@ impl ConnectionReactor {
352354 "Received RPC call: {target_service}::{}" ,
353355 incoming. msg_type( )
354356 ) ;
357+
355358 // ship to the service router, dropping the reply port will close the responder
356359 // task.
357360 match tokio:: task:: unconstrained ( self . router . call_rpc ( target_service, incoming) )
358361 . await
359362 {
360- Ok ( ( ) ) => { /* spawn reply task */ }
363+ Ok ( ( ) ) => {
364+ counter ! ( NETWORK_MESSAGE_RECEIVED_BYTES , "target" => target_service. as_str_name( ) ) . increment ( encoded_len) ;
365+ spawn_rpc_responder ( tx. clone ( ) , rpc_call. id , reply_rx, target_service) ;
366+ }
361367 Err ( err) => {
362368 send_rpc_error ( tx, err, rpc_call. id ) ;
369+ counter ! ( NETWORK_MESSAGE_RECEIVED_DROPPED_BYTES , "target" => target_service. as_str_name( ) ) . increment ( encoded_len) ;
363370 }
364371 }
365-
366- counter ! ( NETWORK_MESSAGE_RECEIVED_BYTES , "target" => target_service. as_str_name( ) )
367- . increment ( encoded_len as u64 ) ;
368-
369- spawn_rpc_responder ( tx. clone ( ) , rpc_call. id , reply_rx, target_service) ;
370-
371372 Decision :: Continue
372373 }
373374 // UNARY MESSAGE
@@ -376,7 +377,7 @@ impl ConnectionReactor {
376377 } ) => {
377378 let metadata_versions = PeerMetadataVersion :: from ( header) ;
378379 let target = unary. service ( ) ;
379- let encoded_len = unary. payload . len ( ) ;
380+ let encoded_len = unary. payload . len ( ) as u64 ;
380381 let incoming = Incoming :: new (
381382 self . connection . protocol_version ,
382383 RawUnary {
@@ -392,7 +393,7 @@ impl ConnectionReactor {
392393 let _ = tokio:: task:: unconstrained ( self . router . call_unary ( target, incoming) ) . await ;
393394
394395 counter ! ( NETWORK_MESSAGE_RECEIVED_BYTES , "target" => target. as_str_name( ) )
395- . increment ( encoded_len as u64 ) ;
396+ . increment ( encoded_len) ;
396397 Decision :: Continue
397398 }
398399 // RPC REPLY
0 commit comments