
Commit 4a9bfb9

bsbernd authored and Miklos Szeredi committed
fuse: {io-uring} Handle teardown of ring entries
On teardown, struct file_operations::uring_cmd requests need to be completed by calling io_uring_cmd_done(). Not completing all ring entries would result in busy io-uring tasks printing warning messages in intervals and an unreleased struct file. Additionally, the fuse connection, and with it the ring, can only get released when all io-uring commands are completed.

Completion is done for ring entries that are:

a) in waiting state for new fuse requests - io_uring_cmd_done() is needed

b) already in userspace - io_uring_cmd_done() through teardown is not needed; the request can just get released. If the fuse server is still active and commits such a ring entry, fuse_uring_cmd() already checks whether the connection is active and then completes the io-uring command itself with -ENOTCONN, i.e. no special handling is needed.

This scheme is basically represented by the ring entry states FRRS_WAIT and FRRS_USERSPACE.

Entries in state:

- FRRS_INIT: No action needed, they do not contribute to ring->queue_refs yet.

- All other states: Currently being processed by other tasks; async teardown is needed and has to wait until they reach one of the two states above. This could also be solved without an async teardown task, but it would require additional if conditions in hot code paths. Also, in my personal opinion, the code looks cleaner with async teardown.

Signed-off-by: Bernd Schubert <[email protected]>
Reviewed-by: Pavel Begunkov <[email protected]> # io_uring
Reviewed-by: Luis Henriques <[email protected]>
Signed-off-by: Miklos Szeredi <[email protected]>
1 parent c090c8a commit 4a9bfb9
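The per-state teardown rule described in the message can be condensed as in the sketch below. This is an illustrative sketch only: the helper name teardown_entry_sketch is made up here, the real logic lives in fuse_uring_teardown_entries() and fuse_uring_entry_teardown() in the fs/fuse/dev_uring.c diff further down, and the "waiting" state is called FRRS_AVAILABLE in the code.

/* Illustrative sketch of the per-state teardown decision described above.
 * Not the patch code; see fuse_uring_entry_teardown() in the diff below
 * for the real implementation.
 */
static void teardown_entry_sketch(struct fuse_ring_ent *ent)
{
        switch (ent->state) {
        case FRRS_AVAILABLE:
                /* waiting for a new fuse request: the io_uring command is
                 * still owned by the kernel and must be completed */
                io_uring_cmd_done(ent->cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);
                ent->cmd = NULL;
                break;
        case FRRS_USERSPACE:
                /* owned by the fuse server: nothing to complete here; a later
                 * commit is rejected with -ENOTCONN in
                 * fuse_uring_commit_fetch() */
                break;
        default:
                /* FRRS_INIT holds no queue_refs reference yet; all other
                 * states are in flight and get retried later by the delayed
                 * async_teardown_work */
                return;
        }
        kfree(ent);     /* entry released, one queue_refs reference dropped */
}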

File tree

3 files changed: 267 additions, 0 deletions


fs/fuse/dev.c

Lines changed: 9 additions & 0 deletions
@@ -6,6 +6,7 @@
   See the file COPYING.
 */
 
+#include "dev_uring_i.h"
 #include "fuse_i.h"
 #include "fuse_dev_i.h"
 
@@ -2291,6 +2292,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
                 spin_unlock(&fc->lock);
 
                 fuse_dev_end_requests(&to_end);
+
+                /*
+                 * fc->lock must not be taken to avoid conflicts with io-uring
+                 * locks
+                 */
+                fuse_uring_abort(fc);
         } else {
                 spin_unlock(&fc->lock);
         }
@@ -2302,6 +2309,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
         /* matches implicit memory barrier in fuse_drop_waiting() */
         smp_mb();
         wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+        fuse_uring_wait_stopped_queues(fc);
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)

fs/fuse/dev_uring.c

Lines changed: 207 additions & 0 deletions
@@ -35,6 +35,37 @@ static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req,
         fuse_request_end(req);
 }
 
+/* Abort all list queued request on the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+        struct fuse_req *req;
+        LIST_HEAD(req_list);
+
+        spin_lock(&queue->lock);
+        list_for_each_entry(req, &queue->fuse_req_queue, list)
+                clear_bit(FR_PENDING, &req->flags);
+        list_splice_init(&queue->fuse_req_queue, &req_list);
+        spin_unlock(&queue->lock);
+
+        /* must not hold queue lock to avoid order issues with fi->lock */
+        fuse_dev_end_requests(&req_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+        int qid;
+        struct fuse_ring_queue *queue;
+
+        for (qid = 0; qid < ring->nr_queues; qid++) {
+                queue = READ_ONCE(ring->queues[qid]);
+                if (!queue)
+                        continue;
+
+                queue->stopped = true;
+                fuse_uring_abort_end_queue_requests(queue);
+        }
+}
+
 void fuse_uring_destruct(struct fuse_conn *fc)
 {
         struct fuse_ring *ring = fc->ring;
@@ -94,10 +125,13 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
                 goto out_err;
         }
 
+        init_waitqueue_head(&ring->stop_waitq);
+
         fc->ring = ring;
         ring->nr_queues = nr_queues;
         ring->fc = fc;
         ring->max_payload_sz = max_payload_size;
+        atomic_set(&ring->queue_refs, 0);
 
         spin_unlock(&fc->lock);
         return ring;
@@ -154,6 +188,175 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
         return queue;
 }
 
+static void fuse_uring_stop_fuse_req_end(struct fuse_req *req)
+{
+        clear_bit(FR_SENT, &req->flags);
+        req->out.h.error = -ECONNABORTED;
+        fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection tear down
+ */
+static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
+{
+        struct fuse_req *req;
+        struct io_uring_cmd *cmd;
+
+        struct fuse_ring_queue *queue = ent->queue;
+
+        spin_lock(&queue->lock);
+        cmd = ent->cmd;
+        ent->cmd = NULL;
+        req = ent->fuse_req;
+        ent->fuse_req = NULL;
+        if (req) {
+                /* remove entry from queue->fpq->processing */
+                list_del_init(&req->list);
+        }
+        spin_unlock(&queue->lock);
+
+        if (cmd)
+                io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);
+
+        if (req)
+                fuse_uring_stop_fuse_req_end(req);
+
+        list_del_init(&ent->list);
+        kfree(ent);
+}
+
+static void fuse_uring_stop_list_entries(struct list_head *head,
+                                         struct fuse_ring_queue *queue,
+                                         enum fuse_ring_req_state exp_state)
+{
+        struct fuse_ring *ring = queue->ring;
+        struct fuse_ring_ent *ent, *next;
+        ssize_t queue_refs = SSIZE_MAX;
+        LIST_HEAD(to_teardown);
+
+        spin_lock(&queue->lock);
+        list_for_each_entry_safe(ent, next, head, list) {
+                if (ent->state != exp_state) {
+                        pr_warn("entry teardown qid=%d state=%d expected=%d",
+                                queue->qid, ent->state, exp_state);
+                        continue;
+                }
+
+                list_move(&ent->list, &to_teardown);
+        }
+        spin_unlock(&queue->lock);
+
+        /* no queue lock to avoid lock order issues */
+        list_for_each_entry_safe(ent, next, &to_teardown, list) {
+                fuse_uring_entry_teardown(ent);
+                queue_refs = atomic_dec_return(&ring->queue_refs);
+                WARN_ON_ONCE(queue_refs < 0);
+        }
+}
+
+static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
+{
+        fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
+                                     FRRS_USERSPACE);
+        fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
+                                     FRRS_AVAILABLE);
+}
+
+/*
+ * Log state debug info
+ */
+static void fuse_uring_log_ent_state(struct fuse_ring *ring)
+{
+        int qid;
+        struct fuse_ring_ent *ent;
+
+        for (qid = 0; qid < ring->nr_queues; qid++) {
+                struct fuse_ring_queue *queue = ring->queues[qid];
+
+                if (!queue)
+                        continue;
+
+                spin_lock(&queue->lock);
+                /*
+                 * Log entries from the intermediate queue, the other queues
+                 * should be empty
+                 */
+                list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
+                        pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
+                                ring, qid, ent, ent->state);
+                }
+                list_for_each_entry(ent, &queue->ent_commit_queue, list) {
+                        pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
+                                ring, qid, ent, ent->state);
+                }
+                spin_unlock(&queue->lock);
+        }
+        ring->stop_debug_log = 1;
+}
+
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+        int qid;
+        struct fuse_ring *ring =
+                container_of(work, struct fuse_ring, async_teardown_work.work);
+
+        /* XXX code dup */
+        for (qid = 0; qid < ring->nr_queues; qid++) {
+                struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
+
+                if (!queue)
+                        continue;
+
+                fuse_uring_teardown_entries(queue);
+        }
+
+        /*
+         * Some ring entries might be in the middle of IO operations,
+         * i.e. in process to get handled by file_operations::uring_cmd
+         * or on the way to userspace - we could handle that with conditions in
+         * run time code, but easier/cleaner to have an async tear down handler
+         * If there are still queue references left
+         */
+        if (atomic_read(&ring->queue_refs) > 0) {
+                if (time_after(jiffies,
+                               ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
+                        fuse_uring_log_ent_state(ring);
+
+                schedule_delayed_work(&ring->async_teardown_work,
+                                      FUSE_URING_TEARDOWN_INTERVAL);
+        } else {
+                wake_up_all(&ring->stop_waitq);
+        }
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+        int qid;
+
+        for (qid = 0; qid < ring->nr_queues; qid++) {
+                struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
+
+                if (!queue)
+                        continue;
+
+                fuse_uring_teardown_entries(queue);
+        }
+
+        if (atomic_read(&ring->queue_refs) > 0) {
+                ring->teardown_time = jiffies;
+                INIT_DELAYED_WORK(&ring->async_teardown_work,
+                                  fuse_uring_async_stop_queues);
+                schedule_delayed_work(&ring->async_teardown_work,
+                                      FUSE_URING_TEARDOWN_INTERVAL);
+        } else {
+                wake_up_all(&ring->stop_waitq);
+        }
+}
+
 /*
  * Checks for errors and stores it into the request
  */
@@ -525,6 +728,9 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
                 return err;
         fpq = &queue->fpq;
 
+        if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
+                return err;
+
         spin_lock(&queue->lock);
         /* Find a request based on the unique ID of the fuse request
          * This should get revised, as it needs a hash calculation and list
@@ -652,6 +858,7 @@ fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
         ent->headers = iov[0].iov_base;
         ent->payload = iov[1].iov_base;
 
+        atomic_inc(&ring->queue_refs);
         return ent;
 }
 
fs/fuse/dev_uring_i.h

Lines changed: 51 additions & 0 deletions
@@ -11,6 +11,9 @@
 
 #ifdef CONFIG_FUSE_IO_URING
 
+#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ)
+#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20)
+
 enum fuse_ring_req_state {
         FRRS_INVALID = 0,
 
@@ -80,6 +83,8 @@ struct fuse_ring_queue {
         struct list_head fuse_req_queue;
 
         struct fuse_pqueue fpq;
+
+        bool stopped;
 };
 
 /**
@@ -97,12 +102,51 @@ struct fuse_ring {
         size_t max_payload_sz;
 
         struct fuse_ring_queue **queues;
+
+        /*
+         * Log ring entry states on stop when entries cannot be released
+         */
+        unsigned int stop_debug_log : 1;
+
+        wait_queue_head_t stop_waitq;
+
+        /* async tear down */
+        struct delayed_work async_teardown_work;
+
+        /* log */
+        unsigned long teardown_time;
+
+        atomic_t queue_refs;
 };
 
 bool fuse_uring_enabled(void);
 void fuse_uring_destruct(struct fuse_conn *fc);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
+void fuse_uring_abort_end_requests(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+        struct fuse_ring *ring = fc->ring;
+
+        if (ring == NULL)
+                return;
+
+        if (atomic_read(&ring->queue_refs) > 0) {
+                fuse_uring_abort_end_requests(ring);
+                fuse_uring_stop_queues(ring);
+        }
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+        struct fuse_ring *ring = fc->ring;
+
+        if (ring)
+                wait_event(ring->stop_waitq,
+                           atomic_read(&ring->queue_refs) == 0);
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -120,6 +164,13 @@ static inline bool fuse_uring_enabled(void)
         return false;
 }
 
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */
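For completeness, a fuse-over-io-uring server observes this teardown from userspace as its pending uring_cmd SQEs completing with -ENOTCONN in the CQE result (set by io_uring_cmd_done() above, or by the fc->connected check in fuse_uring_commit_fetch()). The fragment below is a hypothetical liburing-based sketch, not part of this patch; only the -ENOTCONN value comes from the kernel code above.

#include <errno.h>
#include <liburing.h>

/* Hypothetical server-side loop fragment: wait for one completion and treat
 * -ENOTCONN as "connection torn down, stop serving this queue". */
static int wait_one_fuse_cqe(struct io_uring *ring)
{
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(ring, &cqe);

        if (ret < 0)
                return ret;             /* waiting itself failed */

        ret = cqe->res;                 /* result set by the kernel */
        io_uring_cqe_seen(ring, cqe);

        if (ret == -ENOTCONN)
                return ret;             /* ring entry was torn down */

        return ret;                     /* normal fuse request to handle */
}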
