@@ -6,7 +6,7 @@ use crate::{
66 model:: {
77 CancelledNotification , CancelledNotificationParam , JsonRpcBatchRequestItem ,
88 JsonRpcBatchResponseItem , JsonRpcError , JsonRpcMessage , JsonRpcNotification ,
9- JsonRpcRequest , JsonRpcResponse , RequestId ,
9+ JsonRpcRequest , JsonRpcResponse , ProgressToken , RequestId , RequestMeta , WithMeta ,
1010 } ,
1111 transport:: IntoTransport ,
1212} ;
@@ -58,12 +58,12 @@ impl<T> TransferObject for T where
5858
5959#[ allow( private_bounds, reason = "there's no the third implementation" ) ]
6060pub trait ServiceRole : std:: fmt:: Debug + Send + Sync + ' static + Copy + Clone {
61- type Req : TransferObject ;
61+ type Req : TransferObject + WithMeta < RequestMeta > ;
6262 type Resp : TransferObject ;
6363 type Not : TryInto < CancelledNotification , Error = Self :: Not >
6464 + From < CancelledNotification >
6565 + TransferObject ;
66- type PeerReq : TransferObject ;
66+ type PeerReq : TransferObject + WithMeta < RequestMeta > ;
6767 type PeerResp : TransferObject ;
6868 type PeerNot : TryInto < CancelledNotification , Error = Self :: PeerNot >
6969 + From < CancelledNotification >
@@ -201,17 +201,30 @@ pub trait RequestIdProvider: Send + Sync + 'static {
201201 fn next_request_id ( & self ) -> RequestId ;
202202}
203203
204+ pub trait ProgressTokenProvider : Send + Sync + ' static {
205+ fn next_progress_token ( & self ) -> ProgressToken ;
206+ }
207+
208+ pub type AtomicU32RequestIdProvider = AtomicU32Provider ;
209+ pub type AtomicU32ProgressTokenProvider = AtomicU32Provider ;
210+
204211#[ derive( Debug , Default ) ]
205- pub struct AtomicU32RequestIdProvider {
212+ pub struct AtomicU32Provider {
206213 id : AtomicU32 ,
207214}
208215
209- impl RequestIdProvider for AtomicU32RequestIdProvider {
216+ impl RequestIdProvider for AtomicU32Provider {
210217 fn next_request_id ( & self ) -> RequestId {
211218 RequestId :: Number ( self . id . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) )
212219 }
213220}
214221
222+ impl ProgressTokenProvider for AtomicU32Provider {
223+ fn next_progress_token ( & self ) -> RequestId {
224+ RequestId :: Number ( self . id . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) )
225+ }
226+ }
227+
215228type Responder < T > = tokio:: sync:: oneshot:: Sender < T > ;
216229
217230/// A handle to a remote request
@@ -225,6 +238,7 @@ pub struct RequestHandle<R: ServiceRole> {
225238 pub options : PeerRequestOptions ,
226239 pub peer : Peer < R > ,
227240 pub id : RequestId ,
241+ pub progress_token : ProgressToken ,
228242}
229243
230244impl < R : ServiceRole > RequestHandle < R > {
@@ -275,13 +289,16 @@ impl<R: ServiceRole> RequestHandle<R> {
275289}
276290
277291#[ derive( Debug ) ]
278- pub enum PeerSinkMessage < R : ServiceRole > {
279- Request (
280- R :: Req ,
281- RequestId ,
282- Responder < Result < R :: PeerResp , ServiceError > > ,
283- ) ,
284- Notification ( R :: Not , Responder < Result < ( ) , ServiceError > > ) ,
292+ pub ( crate ) enum PeerSinkMessage < R : ServiceRole > {
293+ Request {
294+ request : R :: Req ,
295+ id : RequestId ,
296+ responder : Responder < Result < R :: PeerResp , ServiceError > > ,
297+ } ,
298+ Notification {
299+ notification : R :: Not ,
300+ responder : Responder < Result < ( ) , ServiceError > > ,
301+ } ,
285302}
286303
287304/// An interface to fetch the remote client or server
@@ -293,6 +310,7 @@ pub enum PeerSinkMessage<R: ServiceRole> {
293310pub struct Peer < R : ServiceRole > {
294311 tx : mpsc:: Sender < PeerSinkMessage < R > > ,
295312 request_id_provider : Arc < dyn RequestIdProvider > ,
313+ progress_token_provider : Arc < dyn ProgressTokenProvider > ,
296314 info : Arc < R :: PeerInfo > ,
297315}
298316
@@ -320,7 +338,7 @@ impl PeerRequestOptions {
320338
321339impl < R : ServiceRole > Peer < R > {
322340 const CLIENT_CHANNEL_BUFFER_SIZE : usize = 1024 ;
323- pub fn new (
341+ pub ( crate ) fn new (
324342 request_id_provider : Arc < dyn RequestIdProvider > ,
325343 peer_info : R :: PeerInfo ,
326344 ) -> ( Peer < R > , ProxyOutbound < R > ) {
@@ -329,6 +347,7 @@ impl<R: ServiceRole> Peer<R> {
329347 Self {
330348 tx,
331349 request_id_provider,
350+ progress_token_provider : Arc :: new ( AtomicU32ProgressTokenProvider :: default ( ) ) ,
332351 info : peer_info. into ( ) ,
333352 } ,
334353 rx,
@@ -337,7 +356,10 @@ impl<R: ServiceRole> Peer<R> {
337356 pub async fn send_notification ( & self , notification : R :: Not ) -> Result < ( ) , ServiceError > {
338357 let ( responder, receiver) = tokio:: sync:: oneshot:: channel ( ) ;
339358 self . tx
340- . send ( PeerSinkMessage :: Notification ( notification, responder) )
359+ . send ( PeerSinkMessage :: Notification {
360+ notification,
361+ responder,
362+ } )
341363 . await
342364 . map_err ( |_m| ServiceError :: Transport ( std:: io:: Error :: other ( "disconnected" ) ) ) ?;
343365 receiver
@@ -352,18 +374,27 @@ impl<R: ServiceRole> Peer<R> {
352374 }
353375 pub async fn send_cancellable_request (
354376 & self ,
355- request : R :: Req ,
377+ mut request : R :: Req ,
356378 options : PeerRequestOptions ,
357379 ) -> Result < RequestHandle < R > , ServiceError > {
358380 let id = self . request_id_provider . next_request_id ( ) ;
381+ let progress_token = self . progress_token_provider . next_progress_token ( ) ;
382+ request. set_meta ( Some ( RequestMeta {
383+ progress_token : progress_token. clone ( ) ,
384+ } ) ) ;
359385 let ( responder, receiver) = tokio:: sync:: oneshot:: channel ( ) ;
360386 self . tx
361- . send ( PeerSinkMessage :: Request ( request, id. clone ( ) , responder) )
387+ . send ( PeerSinkMessage :: Request {
388+ request,
389+ id : id. clone ( ) ,
390+ responder,
391+ } )
362392 . await
363393 . map_err ( |_m| ServiceError :: Transport ( std:: io:: Error :: other ( "disconnected" ) ) ) ?;
364394 Ok ( RequestHandle {
365395 id,
366396 rx : receiver,
397+ progress_token,
367398 options,
368399 peer : self . clone ( ) ,
369400 } )
@@ -419,6 +450,7 @@ pub struct RequestContext<R: ServiceRole> {
419450 /// this token will be cancelled when the [`CancelledNotification`] is received.
420451 pub ct : CancellationToken ,
421452 pub id : RequestId ,
453+ pub meta : Option < RequestMeta > ,
422454 /// An interface to fetch the remote client or server
423455 pub peer : Peer < R > ,
424456}
@@ -459,7 +491,7 @@ async fn serve_inner<R, S, T, E, A>(
459491 mut service : S ,
460492 transport : T ,
461493 peer_info : R :: PeerInfo ,
462- id_provider : Arc < AtomicU32RequestIdProvider > ,
494+ id_provider : Arc < AtomicU32Provider > ,
463495 ct : CancellationToken ,
464496) -> Result < RunningService < R , S > , E >
465497where
@@ -555,7 +587,11 @@ where
555587 }
556588 }
557589 }
558- Event :: ProxyMessage ( PeerSinkMessage :: Request ( request, id, responder) ) => {
590+ Event :: ProxyMessage ( PeerSinkMessage :: Request {
591+ request,
592+ id,
593+ responder,
594+ } ) => {
559595 local_responder_pool. insert ( id. clone ( ) , responder) ;
560596 let send_result = sink
561597 . send ( JsonRpcMessage :: request ( request, id. clone ( ) ) )
@@ -567,7 +603,10 @@ where
567603 }
568604 }
569605 }
570- Event :: ProxyMessage ( PeerSinkMessage :: Notification ( notification, responder) ) => {
606+ Event :: ProxyMessage ( PeerSinkMessage :: Notification {
607+ notification,
608+ responder,
609+ } ) => {
571610 // catch cancellation notification
572611 let mut cancellation_param = None ;
573612 let notification = match notification. try_into ( ) {
@@ -605,6 +644,7 @@ where
605644 ct : context_ct,
606645 id : id. clone ( ) ,
607646 peer : peer. clone ( ) ,
647+ meta : request. get_meta ( ) . cloned ( ) ,
608648 } ;
609649 tokio:: spawn ( async move {
610650 let result = service. handle_request ( request, context) . await ;
0 commit comments