@@ -163,14 +163,15 @@ void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 	cc->cc_sqecount = 0;
 }
 
-/*
- * The consumed rw_ctx's are cleaned and placed on a local llist so
- * that only one atomic llist operation is needed to put them all
- * back on the free list.
+/**
+ * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be released
+ * @dir: DMA direction
  */
-static void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
-				struct svc_rdma_chunk_ctxt *cc,
-				enum dma_data_direction dir)
+void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+			 struct svc_rdma_chunk_ctxt *cc,
+			 enum dma_data_direction dir)
 {
 	struct llist_node *first, *last;
 	struct svc_rdma_rw_ctxt *ctxt;
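
For context, the comment removed above described the rw_ctxt recycling that svc_rdma_cc_release() performs. The sketch below only illustrates that pattern; it is not the function body from this patch, the example_* helper is hypothetical, and the rw_node, rw_list, and sc_rw_ctxts members are assumed here.

/* Illustrative sketch only -- not the body of svc_rdma_cc_release().
 * Each consumed rw_ctxt is cleaned and chained on a local llist so that
 * a single atomic llist_add_batch() returns them all to the transport's
 * free list.
 */
static void example_release_rw_ctxts(struct svcxprt_rdma *rdma,
				     struct list_head *consumed)
{
	struct llist_node *first = NULL, *last = NULL;
	struct svc_rdma_rw_ctxt *ctxt, *next;

	list_for_each_entry_safe(ctxt, next, consumed, rw_list) {
		list_del(&ctxt->rw_list);
		example_unmap_rw_ctxt(rdma, ctxt);	/* hypothetical helper */

		/* push onto the local llist */
		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
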
@@ -300,23 +301,35 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 	struct svc_rdma_recv_ctxt *ctxt;
 
+	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+
+	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
 	switch (wc->status) {
 	case IB_WC_SUCCESS:
-		ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
 		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
 				      cc->cc_posttime);
-		break;
+
+		spin_lock(&rdma->sc_rq_dto_lock);
+		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
+		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+		spin_unlock(&rdma->sc_rq_dto_lock);
+		svc_xprt_enqueue(&rdma->sc_xprt);
+		return;
 	case IB_WC_WR_FLUSH_ERR:
 		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
 		break;
 	default:
 		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
 	}
 
-	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
-	cc->cc_status = wc->status;
-	complete(&cc->cc_done);
-	return;
+	/* The RDMA Read has flushed, so the incoming RPC message
+	 * cannot be constructed and must be dropped. Signal the
+	 * loss to the client by closing the connection.
+	 */
+	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
+	svc_rdma_recv_ctxt_put(rdma, ctxt);
+	svc_xprt_deferred_close(&rdma->sc_xprt);
 }
 
 /*
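
The completion handler above no longer wakes a waiter with complete(); a successful Read is queued on sc_read_complete_q and the transport is re-enqueued. The dequeue side is not part of this hunk; the sketch below only illustrates, under that assumption, how a receive path might pull a completed context off the list. The helper name is hypothetical.

/* Illustrative sketch only -- the consumer of sc_read_complete_q is not
 * shown in this hunk. Assumes completed Reads are removed under
 * sc_rq_dto_lock, mirroring the enqueue in svc_rdma_wc_read_done().
 */
static struct svc_rdma_recv_ctxt *
example_dequeue_read_done(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_recv_ctxt *ctxt = NULL;

	spin_lock(&rdma->sc_rq_dto_lock);
	if (!list_empty(&rdma->sc_read_complete_q)) {
		ctxt = list_first_entry(&rdma->sc_read_complete_q,
					struct svc_rdma_recv_ctxt, rc_list);
		list_del(&ctxt->rc_list);
	}
	spin_unlock(&rdma->sc_rq_dto_lock);
	return ctxt;
}
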
@@ -823,7 +836,6 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 				struct svc_rdma_recv_ctxt *head)
 {
 	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
-	struct xdr_buf *buf = &rqstp->rq_arg;
 	struct svc_rdma_chunk *chunk, *next;
 	unsigned int start, length;
 	int ret;
@@ -853,18 +865,7 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 
 	start += length;
 	length = head->rc_byte_len - start;
-	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
-	if (ret < 0)
-		return ret;
-
-	buf->len += head->rc_readbytes;
-	buf->buflen += head->rc_readbytes;
-
-	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
-	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
-	buf->pages = &rqstp->rq_pages[1];
-	buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
-	return 0;
+	return svc_rdma_copy_inline_range(rqstp, head, start, length);
 }
 
 /**
@@ -888,42 +889,8 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
 				   struct svc_rdma_recv_ctxt *head)
 {
-	struct xdr_buf *buf = &rqstp->rq_arg;
-	struct svc_rdma_chunk *chunk;
-	unsigned int length;
-	int ret;
-
-	chunk = pcl_first_chunk(&head->rc_read_pcl);
-	ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
-	if (ret < 0)
-		goto out;
-
-	/* Split the Receive buffer between the head and tail
-	 * buffers at Read chunk's position. XDR roundup of the
-	 * chunk is not included in either the pagelist or in
-	 * the tail.
-	 */
-	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
-	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
-	buf->head[0].iov_len = chunk->ch_position;
-
-	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
-	 *
-	 * If the client already rounded up the chunk length, the
-	 * length does not change. Otherwise, the length of the page
-	 * list is increased to include XDR round-up.
-	 *
-	 * Currently these chunks always start at page offset 0,
-	 * thus the rounded-up length never crosses a page boundary.
-	 */
-	buf->pages = &rqstp->rq_pages[0];
-	length = xdr_align_size(chunk->ch_length);
-	buf->page_len = length;
-	buf->len += length;
-	buf->buflen += length;
-
-out:
-	return ret;
+	return svc_rdma_build_read_chunk(rqstp, head,
+					 pcl_first_chunk(&head->rc_read_pcl));
 }
 
 /**
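
The comment removed above refers to XDR round-up (RFC 8166, section 3.4.5.2): a chunk length is padded to the next 4-byte boundary before it is accounted in the page list. The standalone sketch below works that arithmetic through, assuming the usual ALIGN-to-4 definition behind xdr_align_size(); example_xdr_align_size() is an illustrative stand-in, not the kernel helper.

/* Illustrative only: the rounding rule assumed by xdr_align_size(). */
#include <stdio.h>

static size_t example_xdr_align_size(size_t n)
{
	return (n + 3) & ~(size_t)3;	/* round up to a 4-byte XDR quad */
}

int main(void)
{
	/* A 13-byte chunk occupies 16 bytes on the wire; an already
	 * aligned length is unchanged.
	 */
	printf("%zu %zu\n", example_xdr_align_size(13),
	       example_xdr_align_size(16));	/* prints "16 16" */
	return 0;
}
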
@@ -1051,23 +1018,28 @@ static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
 					  struct svc_rdma_recv_ctxt *head)
 {
-	struct xdr_buf *buf = &rqstp->rq_arg;
-	int ret;
-
-	ret = svc_rdma_read_call_chunk(rqstp, head);
-	if (ret < 0)
-		goto out;
-
-	buf->len += head->rc_readbytes;
-	buf->buflen += head->rc_readbytes;
+	return svc_rdma_read_call_chunk(rqstp, head);
+}
 
-	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
-	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
-	buf->pages = &rqstp->rq_pages[1];
-	buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
+/* Pages under I/O have been copied to head->rc_pages. Ensure that
+ * svc_xprt_release() does not put them when svc_rdma_recvfrom()
+ * returns. This has to be done after all Read WRs are constructed
+ * to properly handle a page that happens to be part of I/O on behalf
+ * of two different RDMA segments.
+ *
+ * Note: if the subsequent post_send fails, these pages have already
+ * been moved to head->rc_pages and thus will be cleaned up by
+ * svc_rdma_recv_ctxt_put().
+ */
+static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
+				      struct svc_rdma_recv_ctxt *head)
+{
+	unsigned int i;
 
-out:
-	return ret;
+	for (i = 0; i < head->rc_page_count; i++) {
+		head->rc_pages[i] = rqstp->rq_pages[i];
+		rqstp->rq_pages[i] = NULL;
+	}
 }
 
 /**
@@ -1113,30 +1085,11 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
 		ret = svc_rdma_read_multiple_chunks(rqstp, head);
 	} else
 		ret = svc_rdma_read_special(rqstp, head);
+	svc_rdma_clear_rqst_pages(rqstp, head);
 	if (ret < 0)
-		goto out_err;
+		return ret;
 
 	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
-	init_completion(&cc->cc_done);
 	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
-	if (ret < 0)
-		goto out_err;
-
-	ret = 1;
-	wait_for_completion(&cc->cc_done);
-	if (cc->cc_status != IB_WC_SUCCESS)
-		ret = -EIO;
-
-	/* rq_respages starts after the last arg page */
-	rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
-	rqstp->rq_next_page = rqstp->rq_respages + 1;
-
-	/* Ensure svc_rdma_recv_ctxt_put() does not release pages
-	 * left in @rc_pages while I/O proceeds.
-	 */
-	head->rc_page_count = 0;
-
-out_err:
-	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
-	return ret;
+	return ret < 0 ? ret : 1;
 }