@@ -541,3 +541,149 @@ where
541541 }
542542 }
543543}
544+
545+ /// A wrapper around [http_body::Body], which allows attaching arbitrary state to it
546+ pub ( crate ) struct BodyWithState < T , U > {
547+ body : T ,
548+ _state : U ,
549+ }
550+
551+ impl < T , U > http_body:: Body for BodyWithState < T , U >
552+ where
553+ T : http_body:: Body + Unpin ,
554+ U : Unpin ,
555+ {
556+ type Data = T :: Data ;
557+ type Error = T :: Error ;
558+
559+ #[ inline]
560+ fn poll_frame (
561+ self : Pin < & mut Self > ,
562+ cx : & mut Context < ' _ > ,
563+ ) -> Poll < Option < Result < http_body:: Frame < Self :: Data > , Self :: Error > > > {
564+ Pin :: new ( & mut self . get_mut ( ) . body ) . poll_frame ( cx)
565+ }
566+
567+ #[ inline]
568+ fn is_end_stream ( & self ) -> bool {
569+ self . body . is_end_stream ( )
570+ }
571+
572+ #[ inline]
573+ fn size_hint ( & self ) -> http_body:: SizeHint {
574+ self . body . size_hint ( )
575+ }
576+ }
577+
578+ /// A wrapper around [http_body::Body], which validates `Content-Length`
579+ pub ( crate ) struct BodyWithContentLength < T , E > {
580+ body : T ,
581+ error_tx : Option < oneshot:: Sender < E > > ,
582+ make_error : fn ( Option < u64 > ) -> E ,
583+ /// Limit of bytes to be sent
584+ limit : u64 ,
585+ /// Number of bytes sent
586+ sent : u64 ,
587+ }
588+
589+ impl < T , E > BodyWithContentLength < T , E > {
590+ /// Sends the error constructed by [Self::make_error] on [Self::error_tx].
591+ /// Does nothing if an error has already been sent on [Self::error_tx].
592+ fn send_error < V > ( & mut self , sent : Option < u64 > ) -> Poll < Option < Result < V , E > > > {
593+ if let Some ( error_tx) = self . error_tx . take ( ) {
594+ _ = error_tx. send ( ( self . make_error ) ( sent) ) ;
595+ }
596+ Poll :: Ready ( Some ( Err ( ( self . make_error ) ( sent) ) ) )
597+ }
598+ }
599+
600+ impl < T , E > http_body:: Body for BodyWithContentLength < T , E >
601+ where
602+ T : http_body:: Body < Data = Bytes , Error = E > + Unpin ,
603+ {
604+ type Data = T :: Data ;
605+ type Error = T :: Error ;
606+
607+ #[ inline]
608+ fn poll_frame (
609+ mut self : Pin < & mut Self > ,
610+ cx : & mut Context < ' _ > ,
611+ ) -> Poll < Option < Result < http_body:: Frame < Self :: Data > , Self :: Error > > > {
612+ match ready ! ( Pin :: new( & mut self . as_mut( ) . body) . poll_frame( cx) ) {
613+ Some ( Ok ( frame) ) => {
614+ let Some ( data) = frame. data_ref ( ) else {
615+ return Poll :: Ready ( Some ( Ok ( frame) ) ) ;
616+ } ;
617+ let Ok ( sent) = data. len ( ) . try_into ( ) else {
618+ return self . send_error ( None ) ;
619+ } ;
620+ let Some ( sent) = self . sent . checked_add ( sent) else {
621+ return self . send_error ( None ) ;
622+ } ;
623+ if sent > self . limit {
624+ return self . send_error ( Some ( sent) ) ;
625+ }
626+ self . sent = sent;
627+ Poll :: Ready ( Some ( Ok ( frame) ) )
628+ }
629+ Some ( Err ( err) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
630+ None if self . limit != self . sent => {
631+ // short write
632+ let sent = self . sent ;
633+ self . send_error ( Some ( sent) )
634+ }
635+ None => Poll :: Ready ( None ) ,
636+ }
637+ }
638+
639+ #[ inline]
640+ fn is_end_stream ( & self ) -> bool {
641+ self . body . is_end_stream ( )
642+ }
643+
644+ #[ inline]
645+ fn size_hint ( & self ) -> http_body:: SizeHint {
646+ let n = self . limit . saturating_sub ( self . sent ) ;
647+ let mut hint = self . body . size_hint ( ) ;
648+ if hint. lower ( ) >= n {
649+ hint. set_exact ( n)
650+ } else if let Some ( max) = hint. upper ( ) {
651+ hint. set_upper ( n. min ( max) )
652+ } else {
653+ hint. set_upper ( n)
654+ }
655+ hint
656+ }
657+ }
658+
659+ pub ( crate ) trait BodyExt {
660+ fn with_state < T > ( self , state : T ) -> BodyWithState < Self , T >
661+ where
662+ Self : Sized ,
663+ {
664+ BodyWithState {
665+ body : self ,
666+ _state : state,
667+ }
668+ }
669+
670+ fn with_content_length < E > (
671+ self ,
672+ limit : u64 ,
673+ error_tx : oneshot:: Sender < E > ,
674+ make_error : fn ( Option < u64 > ) -> E ,
675+ ) -> BodyWithContentLength < Self , E >
676+ where
677+ Self : Sized ,
678+ {
679+ BodyWithContentLength {
680+ body : self ,
681+ error_tx : Some ( error_tx) ,
682+ make_error,
683+ limit,
684+ sent : 0 ,
685+ }
686+ }
687+ }
688+
689+ impl < T > BodyExt for T { }
0 commit comments