Skip to content

Commit a77a523

Browse files
Bob Pearson authored and Jason Gunthorpe (jgunthorpe)
committed
RDMA/rxe: Fix missing memory barriers in rxe_queue.h
An earlier patch which introduced smp_load_acquire/smp_store_release into
rxe_queue.h incorrectly assumed that surrounding spin-locks in rxe_verbs.c
around queue updates for kernel ulps were sufficient to protect the passing
of data through the queues between the ulp and the rxe tasklets. But this
was incorrect. The typical sequence was:

	ulp				rxe requester tasklet
	------------------------	---------------------
	spin_lock_irqsave()		wqe = queue_head(queue)
	if (!queue_full(q)) {		if (!wqe)
		spin_unlock_irqrestore		return;
		return -ENOMEM
	}				<process wqe>
	wqe = queue_producer_addr(q)
	<fill in wqe>			queue_advance_consumer(queue)
	queue_advance_producer(q)
	spin_unlock_irqrestore()

queue_head() calls queue_empty() which calls smp_load_acquire(). For user
space apps queue_advance_producer() calls smp_store_release() so that there
is a memory barrier between the producer and the consumer, but for kernel
ulps queue_advance_producer() just incremented the producer index because
the unlock function is a release function. But to work, the barrier has to
come between filling in the wqe and updating the producer index. This patch
adds the missing barriers.

It also changes the enum names for the ulp queue types to
QUEUE_TYPE_FROM/TO_ULP instead of QUEUE_TYPE_TO/FROM_DRIVER, which was very
ambiguous. This bug is suspected as the cause of very rare lockups in a
very high scale storage application. It is a bug in any case and should be
corrected.

Fixes: 0a67c46 ("RDMA/rxe: Protect user space index loads/stores")
Link: https://lore.kernel.org/r/[email protected]
Signed-off-by: Bob Pearson <[email protected]>
Signed-off-by: Jason Gunthorpe <[email protected]>
1 parent 89d42b8 commit a77a523

File tree

2 files changed

+76
-52
lines changed

2 files changed

+76
-52
lines changed

drivers/infiniband/sw/rxe/rxe_queue.h

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,26 @@
3535
/**
3636
* enum queue_type - type of queue
3737
* @QUEUE_TYPE_TO_CLIENT: Queue is written by rxe driver and
38-
* read by client. Used by rxe driver only.
38+
* read by client which may be a user space
39+
* application or a kernel ulp.
40+
* Used by rxe internals only.
3941
* @QUEUE_TYPE_FROM_CLIENT: Queue is written by client and
40-
* read by rxe driver. Used by rxe driver only.
41-
* @QUEUE_TYPE_TO_DRIVER: Queue is written by client and
42-
* read by rxe driver. Used by kernel client only.
43-
* @QUEUE_TYPE_FROM_DRIVER: Queue is written by rxe driver and
44-
* read by client. Used by kernel client only.
42+
* read by rxe driver.
43+
* Used by rxe internals only.
44+
* @QUEUE_TYPE_FROM_ULP: Queue is written by kernel ulp and
45+
* read by rxe driver.
46+
* Used by kernel verbs APIs only on
47+
* behalf of ulps.
48+
* @QUEUE_TYPE_TO_ULP: Queue is written by rxe driver and
49+
* read by kernel ulp.
50+
* Used by kernel verbs APIs only on
51+
* behalf of ulps.
4552
*/
4653
enum queue_type {
4754
QUEUE_TYPE_TO_CLIENT,
4855
QUEUE_TYPE_FROM_CLIENT,
49-
QUEUE_TYPE_TO_DRIVER,
50-
QUEUE_TYPE_FROM_DRIVER,
56+
QUEUE_TYPE_FROM_ULP,
57+
QUEUE_TYPE_TO_ULP,
5158
};
5259

5360
struct rxe_queue_buf;
@@ -62,9 +69,9 @@ struct rxe_queue {
6269
u32 index_mask;
6370
enum queue_type type;
6471
/* private copy of index for shared queues between
65-
* kernel space and user space. Kernel reads and writes
72+
* driver and clients. Driver reads and writes
6673
* this copy and then replicates to rxe_queue_buf
67-
* for read access by user space.
74+
* for read access by clients.
6875
*/
6976
u32 index;
7077
};
@@ -97,19 +104,21 @@ static inline u32 queue_get_producer(const struct rxe_queue *q,
97104

98105
switch (type) {
99106
case QUEUE_TYPE_FROM_CLIENT:
100-
/* protect user index */
107+
/* used by rxe, client owns the index */
101108
prod = smp_load_acquire(&q->buf->producer_index);
102109
break;
103110
case QUEUE_TYPE_TO_CLIENT:
111+
/* used by rxe which owns the index */
104112
prod = q->index;
105113
break;
106-
case QUEUE_TYPE_FROM_DRIVER:
107-
/* protect driver index */
108-
prod = smp_load_acquire(&q->buf->producer_index);
109-
break;
110-
case QUEUE_TYPE_TO_DRIVER:
114+
case QUEUE_TYPE_FROM_ULP:
115+
/* used by ulp which owns the index */
111116
prod = q->buf->producer_index;
112117
break;
118+
case QUEUE_TYPE_TO_ULP:
119+
/* used by ulp, rxe owns the index */
120+
prod = smp_load_acquire(&q->buf->producer_index);
121+
break;
113122
}
114123

115124
return prod;
@@ -122,19 +131,21 @@ static inline u32 queue_get_consumer(const struct rxe_queue *q,
122131

123132
switch (type) {
124133
case QUEUE_TYPE_FROM_CLIENT:
134+
/* used by rxe which owns the index */
125135
cons = q->index;
126136
break;
127137
case QUEUE_TYPE_TO_CLIENT:
128-
/* protect user index */
138+
/* used by rxe, client owns the index */
129139
cons = smp_load_acquire(&q->buf->consumer_index);
130140
break;
131-
case QUEUE_TYPE_FROM_DRIVER:
132-
cons = q->buf->consumer_index;
133-
break;
134-
case QUEUE_TYPE_TO_DRIVER:
135-
/* protect driver index */
141+
case QUEUE_TYPE_FROM_ULP:
142+
/* used by ulp, rxe owns the index */
136143
cons = smp_load_acquire(&q->buf->consumer_index);
137144
break;
145+
case QUEUE_TYPE_TO_ULP:
146+
/* used by ulp which owns the index */
147+
cons = q->buf->consumer_index;
148+
break;
138149
}
139150

140151
return cons;
@@ -172,24 +183,31 @@ static inline void queue_advance_producer(struct rxe_queue *q,
172183

173184
switch (type) {
174185
case QUEUE_TYPE_FROM_CLIENT:
175-
pr_warn("%s: attempt to advance client index\n",
176-
__func__);
186+
/* used by rxe, client owns the index */
187+
if (WARN_ON(1))
188+
pr_warn("%s: attempt to advance client index\n",
189+
__func__);
177190
break;
178191
case QUEUE_TYPE_TO_CLIENT:
192+
/* used by rxe which owns the index */
179193
prod = q->index;
180194
prod = (prod + 1) & q->index_mask;
181195
q->index = prod;
182-
/* protect user index */
196+
/* release so client can read it safely */
183197
smp_store_release(&q->buf->producer_index, prod);
184198
break;
185-
case QUEUE_TYPE_FROM_DRIVER:
186-
pr_warn("%s: attempt to advance driver index\n",
187-
__func__);
188-
break;
189-
case QUEUE_TYPE_TO_DRIVER:
199+
case QUEUE_TYPE_FROM_ULP:
200+
/* used by ulp which owns the index */
190201
prod = q->buf->producer_index;
191202
prod = (prod + 1) & q->index_mask;
192-
q->buf->producer_index = prod;
203+
/* release so rxe can read it safely */
204+
smp_store_release(&q->buf->producer_index, prod);
205+
break;
206+
case QUEUE_TYPE_TO_ULP:
207+
/* used by ulp, rxe owns the index */
208+
if (WARN_ON(1))
209+
pr_warn("%s: attempt to advance driver index\n",
210+
__func__);
193211
break;
194212
}
195213
}
@@ -201,24 +219,30 @@ static inline void queue_advance_consumer(struct rxe_queue *q,
201219

202220
switch (type) {
203221
case QUEUE_TYPE_FROM_CLIENT:
204-
cons = q->index;
205-
cons = (cons + 1) & q->index_mask;
222+
/* used by rxe which owns the index */
223+
cons = (q->index + 1) & q->index_mask;
206224
q->index = cons;
207-
/* protect user index */
225+
/* release so client can read it safely */
208226
smp_store_release(&q->buf->consumer_index, cons);
209227
break;
210228
case QUEUE_TYPE_TO_CLIENT:
211-
pr_warn("%s: attempt to advance client index\n",
212-
__func__);
229+
/* used by rxe, client owns the index */
230+
if (WARN_ON(1))
231+
pr_warn("%s: attempt to advance client index\n",
232+
__func__);
233+
break;
234+
case QUEUE_TYPE_FROM_ULP:
235+
/* used by ulp, rxe owns the index */
236+
if (WARN_ON(1))
237+
pr_warn("%s: attempt to advance driver index\n",
238+
__func__);
213239
break;
214-
case QUEUE_TYPE_FROM_DRIVER:
240+
case QUEUE_TYPE_TO_ULP:
241+
/* used by ulp which owns the index */
215242
cons = q->buf->consumer_index;
216243
cons = (cons + 1) & q->index_mask;
217-
q->buf->consumer_index = cons;
218-
break;
219-
case QUEUE_TYPE_TO_DRIVER:
220-
pr_warn("%s: attempt to advance driver index\n",
221-
__func__);
244+
/* release so rxe can read it safely */
245+
smp_store_release(&q->buf->consumer_index, cons);
222246
break;
223247
}
224248
}

drivers/infiniband/sw/rxe/rxe_verbs.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ static int post_one_recv(struct rxe_rq *rq, const struct ib_recv_wr *ibwr)
245245
int num_sge = ibwr->num_sge;
246246
int full;
247247

248-
full = queue_full(rq->queue, QUEUE_TYPE_TO_DRIVER);
248+
full = queue_full(rq->queue, QUEUE_TYPE_FROM_ULP);
249249
if (unlikely(full))
250250
return -ENOMEM;
251251

@@ -256,7 +256,7 @@ static int post_one_recv(struct rxe_rq *rq, const struct ib_recv_wr *ibwr)
256256
for (i = 0; i < num_sge; i++)
257257
length += ibwr->sg_list[i].length;
258258

259-
recv_wqe = queue_producer_addr(rq->queue, QUEUE_TYPE_TO_DRIVER);
259+
recv_wqe = queue_producer_addr(rq->queue, QUEUE_TYPE_FROM_ULP);
260260
recv_wqe->wr_id = ibwr->wr_id;
261261

262262
memcpy(recv_wqe->dma.sge, ibwr->sg_list,
@@ -268,7 +268,7 @@ static int post_one_recv(struct rxe_rq *rq, const struct ib_recv_wr *ibwr)
268268
recv_wqe->dma.cur_sge = 0;
269269
recv_wqe->dma.sge_offset = 0;
270270

271-
queue_advance_producer(rq->queue, QUEUE_TYPE_TO_DRIVER);
271+
queue_advance_producer(rq->queue, QUEUE_TYPE_FROM_ULP);
272272

273273
return 0;
274274
}
@@ -623,17 +623,17 @@ static int post_one_send(struct rxe_qp *qp, const struct ib_send_wr *ibwr,
623623

624624
spin_lock_irqsave(&qp->sq.sq_lock, flags);
625625

626-
full = queue_full(sq->queue, QUEUE_TYPE_TO_DRIVER);
626+
full = queue_full(sq->queue, QUEUE_TYPE_FROM_ULP);
627627

628628
if (unlikely(full)) {
629629
spin_unlock_irqrestore(&qp->sq.sq_lock, flags);
630630
return -ENOMEM;
631631
}
632632

633-
send_wqe = queue_producer_addr(sq->queue, QUEUE_TYPE_TO_DRIVER);
633+
send_wqe = queue_producer_addr(sq->queue, QUEUE_TYPE_FROM_ULP);
634634
init_send_wqe(qp, ibwr, mask, length, send_wqe);
635635

636-
queue_advance_producer(sq->queue, QUEUE_TYPE_TO_DRIVER);
636+
queue_advance_producer(sq->queue, QUEUE_TYPE_FROM_ULP);
637637

638638
spin_unlock_irqrestore(&qp->sq.sq_lock, flags);
639639

@@ -821,12 +821,12 @@ static int rxe_poll_cq(struct ib_cq *ibcq, int num_entries, struct ib_wc *wc)
821821

822822
spin_lock_irqsave(&cq->cq_lock, flags);
823823
for (i = 0; i < num_entries; i++) {
824-
cqe = queue_head(cq->queue, QUEUE_TYPE_FROM_DRIVER);
824+
cqe = queue_head(cq->queue, QUEUE_TYPE_TO_ULP);
825825
if (!cqe)
826826
break;
827827

828828
memcpy(wc++, &cqe->ibwc, sizeof(*wc));
829-
queue_advance_consumer(cq->queue, QUEUE_TYPE_FROM_DRIVER);
829+
queue_advance_consumer(cq->queue, QUEUE_TYPE_TO_ULP);
830830
}
831831
spin_unlock_irqrestore(&cq->cq_lock, flags);
832832

@@ -838,7 +838,7 @@ static int rxe_peek_cq(struct ib_cq *ibcq, int wc_cnt)
838838
struct rxe_cq *cq = to_rcq(ibcq);
839839
int count;
840840

841-
count = queue_count(cq->queue, QUEUE_TYPE_FROM_DRIVER);
841+
count = queue_count(cq->queue, QUEUE_TYPE_TO_ULP);
842842

843843
return (count > wc_cnt) ? wc_cnt : count;
844844
}
@@ -854,7 +854,7 @@ static int rxe_req_notify_cq(struct ib_cq *ibcq, enum ib_cq_notify_flags flags)
854854
if (cq->notify != IB_CQ_NEXT_COMP)
855855
cq->notify = flags & IB_CQ_SOLICITED_MASK;
856856

857-
empty = queue_empty(cq->queue, QUEUE_TYPE_FROM_DRIVER);
857+
empty = queue_empty(cq->queue, QUEUE_TYPE_TO_ULP);
858858

859859
if ((flags & IB_CQ_REPORT_MISSED_EVENTS) && !empty)
860860
ret = 1;

0 commit comments

Comments
 (0)