Skip to content

Commit 68bde2a

Browse files
Alexander Aringteigland
authored andcommitted
dlm: implement LSFL_SOFTIRQ_SAFE
When a lockspace user allows it, run callback functions directly from softirq context, instead of queueing callbacks to be run from the dlm_callback workqueue context. Signed-off-by: Alexander Aring <[email protected]> Signed-off-by: David Teigland <[email protected]>
1 parent f328a26 commit 68bde2a

File tree

5 files changed

+126
-84
lines changed

5 files changed

+126
-84
lines changed

fs/dlm/ast.c

Lines changed: 101 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,52 @@
1818
#include "user.h"
1919
#include "ast.h"
2020

21-
static void dlm_callback_work(struct work_struct *work)
21+
static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode,
22+
uint32_t flags, uint8_t sb_flags, int sb_status,
23+
struct dlm_lksb *lksb,
24+
void (*astfn)(void *astparam),
25+
void (*bastfn)(void *astparam, int mode),
26+
void *astparam, const char *res_name,
27+
size_t res_length)
2228
{
23-
struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
24-
25-
if (cb->flags & DLM_CB_BAST) {
26-
trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
27-
cb->res_length);
28-
cb->bastfn(cb->astparam, cb->mode);
29-
} else if (cb->flags & DLM_CB_CAST) {
30-
trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
31-
cb->sb_flags, cb->res_name, cb->res_length);
32-
cb->lkb_lksb->sb_status = cb->sb_status;
33-
cb->lkb_lksb->sb_flags = cb->sb_flags;
34-
cb->astfn(cb->astparam);
29+
if (flags & DLM_CB_BAST) {
30+
trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length);
31+
bastfn(astparam, mode);
32+
} else if (flags & DLM_CB_CAST) {
33+
trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name,
34+
res_length);
35+
lksb->sb_status = sb_status;
36+
lksb->sb_flags = sb_flags;
37+
astfn(astparam);
3538
}
39+
}
3640

41+
static void dlm_do_callback(struct dlm_callback *cb)
42+
{
43+
dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags,
44+
cb->sb_flags, cb->sb_status, cb->lkb_lksb,
45+
cb->astfn, cb->bastfn, cb->astparam,
46+
cb->res_name, cb->res_length);
3747
dlm_free_cb(cb);
3848
}
3949

40-
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
41-
int status, uint32_t sbflags,
42-
struct dlm_callback **cb)
50+
static void dlm_callback_work(struct work_struct *work)
51+
{
52+
struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
53+
54+
dlm_do_callback(cb);
55+
}
56+
57+
bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
58+
int status, uint32_t sbflags, int *copy_lvb)
4359
{
4460
struct dlm_rsb *rsb = lkb->lkb_resource;
45-
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
4661
struct dlm_ls *ls = rsb->res_ls;
47-
int copy_lvb = 0;
4862
int prev_mode;
4963

64+
if (copy_lvb)
65+
*copy_lvb = 0;
66+
5067
if (flags & DLM_CB_BAST) {
5168
/* if cb is a bast, it should be skipped if the blocking mode is
5269
* compatible with the last granted mode
@@ -56,7 +73,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
5673
log_debug(ls, "skip %x bast mode %d for cast mode %d",
5774
lkb->lkb_id, mode,
5875
lkb->lkb_last_cast_cb_mode);
59-
goto out;
76+
return true;
6077
}
6178
}
6279

@@ -74,7 +91,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
7491
(prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
7592
log_debug(ls, "skip %x add bast mode %d for bast mode %d",
7693
lkb->lkb_id, mode, prev_mode);
77-
goto out;
94+
return true;
7895
}
7996
}
8097

@@ -85,8 +102,10 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
85102
prev_mode = lkb->lkb_last_cast_cb_mode;
86103

87104
if (!status && lkb->lkb_lksb->sb_lvbptr &&
88-
dlm_lvb_operations[prev_mode + 1][mode + 1])
89-
copy_lvb = 1;
105+
dlm_lvb_operations[prev_mode + 1][mode + 1]) {
106+
if (copy_lvb)
107+
*copy_lvb = 1;
108+
}
90109
}
91110

92111
lkb->lkb_last_cast_cb_mode = mode;
@@ -96,11 +115,19 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
96115
lkb->lkb_last_cb_mode = mode;
97116
lkb->lkb_last_cb_flags = flags;
98117

118+
return false;
119+
}
120+
121+
int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
122+
int status, uint32_t sbflags,
123+
struct dlm_callback **cb)
124+
{
125+
struct dlm_rsb *rsb = lkb->lkb_resource;
126+
struct dlm_ls *ls = rsb->res_ls;
127+
99128
*cb = dlm_allocate_cb();
100-
if (!*cb) {
101-
rv = DLM_ENQUEUE_CALLBACK_FAILURE;
102-
goto out;
103-
}
129+
if (WARN_ON_ONCE(!*cb))
130+
return -ENOMEM;
104131

105132
/* for tracing */
106133
(*cb)->lkb_id = lkb->lkb_id;
@@ -112,19 +139,34 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
112139
(*cb)->mode = mode;
113140
(*cb)->sb_status = status;
114141
(*cb)->sb_flags = (sbflags & 0x000000FF);
115-
(*cb)->copy_lvb = copy_lvb;
116142
(*cb)->lkb_lksb = lkb->lkb_lksb;
117143

118-
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
144+
return 0;
145+
}
146+
147+
static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
148+
int status, uint32_t sbflags,
149+
struct dlm_callback **cb)
150+
{
151+
int rv;
119152

120-
out:
121-
return rv;
153+
rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb);
154+
if (rv)
155+
return rv;
156+
157+
(*cb)->astfn = lkb->lkb_astfn;
158+
(*cb)->bastfn = lkb->lkb_bastfn;
159+
(*cb)->astparam = lkb->lkb_astparam;
160+
INIT_WORK(&(*cb)->work, dlm_callback_work);
161+
162+
return 0;
122163
}
123164

124165
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
125-
uint32_t sbflags)
166+
uint32_t sbflags)
126167
{
127-
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
168+
struct dlm_rsb *rsb = lkb->lkb_resource;
169+
struct dlm_ls *ls = rsb->res_ls;
128170
struct dlm_callback *cb;
129171
int rv;
130172

@@ -133,35 +175,34 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
133175
return;
134176
}
135177

136-
rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
137-
&cb);
138-
switch (rv) {
139-
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
140-
cb->astfn = lkb->lkb_astfn;
141-
cb->bastfn = lkb->lkb_bastfn;
142-
cb->astparam = lkb->lkb_astparam;
143-
INIT_WORK(&cb->work, dlm_callback_work);
144-
145-
spin_lock_bh(&ls->ls_cb_lock);
146-
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
178+
if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL))
179+
return;
180+
181+
spin_lock_bh(&ls->ls_cb_lock);
182+
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
183+
rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
184+
if (!rv)
147185
list_add(&cb->list, &ls->ls_cb_delay);
148-
else
149-
queue_work(ls->ls_callback_wq, &cb->work);
150-
spin_unlock_bh(&ls->ls_cb_lock);
151-
break;
152-
case DLM_ENQUEUE_CALLBACK_SUCCESS:
153-
break;
154-
case DLM_ENQUEUE_CALLBACK_FAILURE:
155-
fallthrough;
156-
default:
157-
WARN_ON_ONCE(1);
158-
break;
186+
} else {
187+
if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) {
188+
dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags,
189+
sbflags, status, lkb->lkb_lksb,
190+
lkb->lkb_astfn, lkb->lkb_bastfn,
191+
lkb->lkb_astparam, rsb->res_name,
192+
rsb->res_length);
193+
} else {
194+
rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
195+
if (!rv)
196+
queue_work(ls->ls_callback_wq, &cb->work);
197+
}
159198
}
199+
spin_unlock_bh(&ls->ls_cb_lock);
160200
}
161201

162202
int dlm_callback_start(struct dlm_ls *ls)
163203
{
164-
if (!test_bit(LSFL_FS, &ls->ls_flags))
204+
if (!test_bit(LSFL_FS, &ls->ls_flags) ||
205+
test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
165206
return 0;
166207

167208
ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
@@ -207,7 +248,11 @@ void dlm_callback_resume(struct dlm_ls *ls)
207248
spin_lock_bh(&ls->ls_cb_lock);
208249
list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
209250
list_del(&cb->list);
210-
queue_work(ls->ls_callback_wq, &cb->work);
251+
if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
252+
dlm_do_callback(cb);
253+
else
254+
queue_work(ls->ls_callback_wq, &cb->work);
255+
211256
count++;
212257
if (count == MAX_CB_QUEUE)
213258
break;

fs/dlm/ast.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111
#ifndef __ASTD_DOT_H__
1212
#define __ASTD_DOT_H__
1313

14-
#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1
15-
#define DLM_ENQUEUE_CALLBACK_SUCCESS 0
16-
#define DLM_ENQUEUE_CALLBACK_FAILURE -1
17-
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18-
int status, uint32_t sbflags,
19-
struct dlm_callback **cb);
14+
bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
15+
int status, uint32_t sbflags, int *copy_lvb);
16+
int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
17+
int status, uint32_t sbflags,
18+
struct dlm_callback **cb);
2019
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
2120
uint32_t sbflags);
2221

fs/dlm/dlm_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,7 @@ struct dlm_ls {
699699
#define LSFL_NODIR 10
700700
#define LSFL_RECV_MSG_BLOCKED 11
701701
#define LSFL_FS 12
702+
#define LSFL_SOFTIRQ 13
702703

703704
#define DLM_PROC_FLAGS_CLOSING 1
704705
#define DLM_PROC_FLAGS_COMPAT 2

fs/dlm/lockspace.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,9 @@ static int new_lockspace(const char *name, const char *cluster,
407407
ls->ls_ops_arg = ops_arg;
408408
}
409409

410+
if (flags & DLM_LSFL_SOFTIRQ)
411+
set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
412+
410413
/* ls_exflags are forced to match among nodes, and we don't
411414
* need to require all nodes to have some flags set
412415
*/

fs/dlm/user.c

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
182182
struct dlm_user_args *ua;
183183
struct dlm_user_proc *proc;
184184
struct dlm_callback *cb;
185-
int rv;
185+
int rv, copy_lvb;
186186

187187
if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
188188
test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
@@ -213,28 +213,22 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
213213

214214
spin_lock_bh(&proc->asts_spin);
215215

216-
rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
217-
switch (rv) {
218-
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
219-
cb->ua = *ua;
220-
cb->lkb_lksb = &cb->ua.lksb;
221-
if (cb->copy_lvb) {
222-
memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
223-
DLM_USER_LVB_LEN);
224-
cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
216+
if (!dlm_may_skip_callback(lkb, flags, mode, status, sbflags,
217+
&copy_lvb)) {
218+
rv = dlm_get_cb(lkb, flags, mode, status, sbflags, &cb);
219+
if (!rv) {
220+
cb->copy_lvb = copy_lvb;
221+
cb->ua = *ua;
222+
cb->lkb_lksb = &cb->ua.lksb;
223+
if (copy_lvb) {
224+
memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
225+
DLM_USER_LVB_LEN);
226+
cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
227+
}
228+
229+
list_add_tail(&cb->list, &proc->asts);
230+
wake_up_interruptible(&proc->wait);
225231
}
226-
227-
list_add_tail(&cb->list, &proc->asts);
228-
wake_up_interruptible(&proc->wait);
229-
break;
230-
case DLM_ENQUEUE_CALLBACK_SUCCESS:
231-
break;
232-
case DLM_ENQUEUE_CALLBACK_FAILURE:
233-
fallthrough;
234-
default:
235-
spin_unlock_bh(&proc->asts_spin);
236-
WARN_ON_ONCE(1);
237-
goto out;
238232
}
239233
spin_unlock_bh(&proc->asts_spin);
240234

0 commit comments

Comments
 (0)