@@ -392,7 +392,7 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
392
392
unsigned int pos ;
393
393
int nsegs ;
394
394
395
- if (rtype == rpcrdma_noch )
395
+ if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped )
396
396
goto done ;
397
397
398
398
pos = rqst -> rq_snd_buf .head [0 ].iov_len ;
@@ -691,6 +691,72 @@ static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
691
691
return false;
692
692
}
693
693
694
+ /* Copy the tail to the end of the head buffer.
695
+ */
696
+ static void rpcrdma_pullup_tail_iov (struct rpcrdma_xprt * r_xprt ,
697
+ struct rpcrdma_req * req ,
698
+ struct xdr_buf * xdr )
699
+ {
700
+ unsigned char * dst ;
701
+
702
+ dst = (unsigned char * )xdr -> head [0 ].iov_base ;
703
+ dst += xdr -> head [0 ].iov_len + xdr -> page_len ;
704
+ memmove (dst , xdr -> tail [0 ].iov_base , xdr -> tail [0 ].iov_len );
705
+ r_xprt -> rx_stats .pullup_copy_count += xdr -> tail [0 ].iov_len ;
706
+ }
707
+
708
+ /* Copy pagelist content into the head buffer.
709
+ */
710
+ static void rpcrdma_pullup_pagelist (struct rpcrdma_xprt * r_xprt ,
711
+ struct rpcrdma_req * req ,
712
+ struct xdr_buf * xdr )
713
+ {
714
+ unsigned int len , page_base , remaining ;
715
+ struct page * * ppages ;
716
+ unsigned char * src , * dst ;
717
+
718
+ dst = (unsigned char * )xdr -> head [0 ].iov_base ;
719
+ dst += xdr -> head [0 ].iov_len ;
720
+ ppages = xdr -> pages + (xdr -> page_base >> PAGE_SHIFT );
721
+ page_base = offset_in_page (xdr -> page_base );
722
+ remaining = xdr -> page_len ;
723
+ while (remaining ) {
724
+ src = page_address (* ppages );
725
+ src += page_base ;
726
+ len = min_t (unsigned int , PAGE_SIZE - page_base , remaining );
727
+ memcpy (dst , src , len );
728
+ r_xprt -> rx_stats .pullup_copy_count += len ;
729
+
730
+ ppages ++ ;
731
+ dst += len ;
732
+ remaining -= len ;
733
+ page_base = 0 ;
734
+ }
735
+ }
736
+
737
+ /* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
738
+ * When the head, pagelist, and tail are small, a pull-up copy
739
+ * is considerably less costly than DMA mapping the components
740
+ * of @xdr.
741
+ *
742
+ * Assumptions:
743
+ * - the caller has already verified that the total length
744
+ * of the RPC Call body will fit into @rl_sendbuf.
745
+ */
746
+ static bool rpcrdma_prepare_noch_pullup (struct rpcrdma_xprt * r_xprt ,
747
+ struct rpcrdma_req * req ,
748
+ struct xdr_buf * xdr )
749
+ {
750
+ if (unlikely (xdr -> tail [0 ].iov_len ))
751
+ rpcrdma_pullup_tail_iov (r_xprt , req , xdr );
752
+
753
+ if (unlikely (xdr -> page_len ))
754
+ rpcrdma_pullup_pagelist (r_xprt , req , xdr );
755
+
756
+ /* The whole RPC message resides in the head iovec now */
757
+ return rpcrdma_prepare_head_iov (r_xprt , req , xdr -> len );
758
+ }
759
+
694
760
static bool rpcrdma_prepare_noch_mapped (struct rpcrdma_xprt * r_xprt ,
695
761
struct rpcrdma_req * req ,
696
762
struct xdr_buf * xdr )
@@ -779,7 +845,11 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
779
845
goto out_unmap ;
780
846
781
847
switch (rtype ) {
782
- case rpcrdma_noch :
848
+ case rpcrdma_noch_pullup :
849
+ if (!rpcrdma_prepare_noch_pullup (r_xprt , req , xdr ))
850
+ goto out_unmap ;
851
+ break ;
852
+ case rpcrdma_noch_mapped :
783
853
if (!rpcrdma_prepare_noch_mapped (r_xprt , req , xdr ))
784
854
goto out_unmap ;
785
855
break ;
@@ -827,6 +897,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
827
897
struct rpcrdma_req * req = rpcr_to_rdmar (rqst );
828
898
struct xdr_stream * xdr = & req -> rl_stream ;
829
899
enum rpcrdma_chunktype rtype , wtype ;
900
+ struct xdr_buf * buf = & rqst -> rq_snd_buf ;
830
901
bool ddp_allowed ;
831
902
__be32 * p ;
832
903
int ret ;
@@ -884,8 +955,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
884
955
*/
885
956
if (rpcrdma_args_inline (r_xprt , rqst )) {
886
957
* p ++ = rdma_msg ;
887
- rtype = rpcrdma_noch ;
888
- } else if (ddp_allowed && rqst -> rq_snd_buf .flags & XDRBUF_WRITE ) {
958
+ rtype = buf -> len < rdmab_length (req -> rl_sendbuf ) ?
959
+ rpcrdma_noch_pullup : rpcrdma_noch_mapped ;
960
+ } else if (ddp_allowed && buf -> flags & XDRBUF_WRITE ) {
889
961
* p ++ = rdma_msg ;
890
962
rtype = rpcrdma_readch ;
891
963
} else {
@@ -927,7 +999,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
927
999
goto out_err ;
928
1000
929
1001
ret = rpcrdma_prepare_send_sges (r_xprt , req , req -> rl_hdrbuf .len ,
930
- & rqst -> rq_snd_buf , rtype );
1002
+ buf , rtype );
931
1003
if (ret )
932
1004
goto out_err ;
933
1005
0 commit comments