Skip to content

Commit 6fffab6

Browse files
committed
Merge tag 'dlm-6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland: "This set includes some small fixes, and some big internal changes: - Fix a long-standing race between the unlock callback for the last lkb struct, and removing the rsb that became unused after the final unlock. This could lead different nodes to have inconsistent info about the rsb master node. - Remove unnecessary refcounting on callback structs, returning to the way things were done in the past. - Do message processing in softirq context. This allows dlm messages to be cleared more quickly and efficiently, reducing long lists of incomplete requests. A future change to run callbacks directly from this context will make this more effective. - The softirq message processing involved a number of patches changing mutexes to spinlocks and rwlocks, and a fair amount of code re-org in preparation. - Use an rhashtable for rsb structs, rather than our old internal hash table implementation. This also required some re-org of lists and locks in preparation for the change. - Drop the dlm_scand kthread, and use timers to clear unused rsb structs. Scanning all rsb's periodically was a lot of wasted work. 
- Fix recent regression in logic for copying LVB data in user space lock requests" * tag 'dlm-6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: (34 commits) dlm: return -ENOMEM if ls_recover_buf fails dlm: fix sleep in atomic context dlm: use rwlock for lkbidr dlm: use rwlock for rsb hash table dlm: drop dlm_scand kthread and use timers dlm: do not use ref counts for rsb in the toss state dlm: switch to use rhashtable for rsbs dlm: add rsb lists for iteration dlm: merge toss and keep hash table lists into one list dlm: change to single hashtable lock dlm: increment ls_count for dlm_scand dlm: do message processing in softirq context dlm: use spin_lock_bh for message processing dlm: remove schedule in receive path dlm: convert ls_recv_active from rw_semaphore to rwlock dlm: avoid blocking receive at the end of recovery dlm: convert res_lock to spinlock dlm: convert ls_waiters_mutex to spinlock dlm: drop mutex use in waiters recovery dlm: add new struct to save position in dlm_copy_master_names ...
2 parents a3d1f54 + 7b72ab2 commit 6fffab6

25 files changed

+1382
-1509
lines changed

fs/dlm/ast.c

Lines changed: 86 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -12,47 +12,50 @@
1212
#include <trace/events/dlm.h>
1313

1414
#include "dlm_internal.h"
15+
#include "lvb_table.h"
1516
#include "memory.h"
1617
#include "lock.h"
1718
#include "user.h"
1819
#include "ast.h"
1920

20-
void dlm_release_callback(struct kref *ref)
21+
static void dlm_callback_work(struct work_struct *work)
2122
{
22-
struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);
23+
struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
24+
25+
if (cb->flags & DLM_CB_BAST) {
26+
trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
27+
cb->res_length);
28+
cb->bastfn(cb->astparam, cb->mode);
29+
} else if (cb->flags & DLM_CB_CAST) {
30+
trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
31+
cb->sb_flags, cb->res_name, cb->res_length);
32+
cb->lkb_lksb->sb_status = cb->sb_status;
33+
cb->lkb_lksb->sb_flags = cb->sb_flags;
34+
cb->astfn(cb->astparam);
35+
}
2336

2437
dlm_free_cb(cb);
2538
}
2639

27-
void dlm_callback_set_last_ptr(struct dlm_callback **from,
28-
struct dlm_callback *to)
29-
{
30-
if (*from)
31-
kref_put(&(*from)->ref, dlm_release_callback);
32-
33-
if (to)
34-
kref_get(&to->ref);
35-
36-
*from = to;
37-
}
38-
39-
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
40-
int status, uint32_t sbflags)
40+
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
41+
int status, uint32_t sbflags,
42+
struct dlm_callback **cb)
4143
{
42-
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
44+
struct dlm_rsb *rsb = lkb->lkb_resource;
4345
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
44-
struct dlm_callback *cb;
46+
struct dlm_ls *ls = rsb->res_ls;
47+
int copy_lvb = 0;
4548
int prev_mode;
4649

4750
if (flags & DLM_CB_BAST) {
4851
/* if cb is a bast, it should be skipped if the blocking mode is
4952
* compatible with the last granted mode
5053
*/
51-
if (lkb->lkb_last_cast) {
52-
if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) {
54+
if (lkb->lkb_last_cast_cb_mode != -1) {
55+
if (dlm_modes_compat(mode, lkb->lkb_last_cast_cb_mode)) {
5356
log_debug(ls, "skip %x bast mode %d for cast mode %d",
5457
lkb->lkb_id, mode,
55-
lkb->lkb_last_cast->mode);
58+
lkb->lkb_last_cast_cb_mode);
5659
goto out;
5760
}
5861
}
@@ -63,8 +66,9 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
6366
* is a bast for the same mode or a more restrictive mode.
6467
* (the additional > PR check is needed for PR/CW inversion)
6568
*/
66-
if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) {
67-
prev_mode = lkb->lkb_last_cb->mode;
69+
if (lkb->lkb_last_cb_mode != -1 &&
70+
lkb->lkb_last_cb_flags & DLM_CB_BAST) {
71+
prev_mode = lkb->lkb_last_cb_mode;
6872

6973
if ((prev_mode == mode) ||
7074
(prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
@@ -73,142 +77,92 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
7377
goto out;
7478
}
7579
}
80+
81+
lkb->lkb_last_bast_time = ktime_get();
82+
lkb->lkb_last_bast_cb_mode = mode;
83+
} else if (flags & DLM_CB_CAST) {
84+
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
85+
prev_mode = lkb->lkb_last_cast_cb_mode;
86+
87+
if (!status && lkb->lkb_lksb->sb_lvbptr &&
88+
dlm_lvb_operations[prev_mode + 1][mode + 1])
89+
copy_lvb = 1;
90+
}
91+
92+
lkb->lkb_last_cast_cb_mode = mode;
93+
lkb->lkb_last_cast_time = ktime_get();
7694
}
7795

78-
cb = dlm_allocate_cb();
79-
if (!cb) {
96+
lkb->lkb_last_cb_mode = mode;
97+
lkb->lkb_last_cb_flags = flags;
98+
99+
*cb = dlm_allocate_cb();
100+
if (!*cb) {
80101
rv = DLM_ENQUEUE_CALLBACK_FAILURE;
81102
goto out;
82103
}
83104

84-
cb->flags = flags;
85-
cb->mode = mode;
86-
cb->sb_status = status;
87-
cb->sb_flags = (sbflags & 0x000000FF);
88-
kref_init(&cb->ref);
89-
if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
90-
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
91-
92-
list_add_tail(&cb->list, &lkb->lkb_callbacks);
105+
/* for tracing */
106+
(*cb)->lkb_id = lkb->lkb_id;
107+
(*cb)->ls_id = ls->ls_global_id;
108+
memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
109+
(*cb)->res_length = rsb->res_length;
93110

94-
if (flags & DLM_CB_CAST)
95-
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
111+
(*cb)->flags = flags;
112+
(*cb)->mode = mode;
113+
(*cb)->sb_status = status;
114+
(*cb)->sb_flags = (sbflags & 0x000000FF);
115+
(*cb)->copy_lvb = copy_lvb;
116+
(*cb)->lkb_lksb = lkb->lkb_lksb;
96117

97-
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
118+
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
98119

99-
out:
120+
out:
100121
return rv;
101122
}
102123

103-
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
104-
{
105-
/* oldest undelivered cb is callbacks first entry */
106-
*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
107-
struct dlm_callback, list);
108-
if (!*cb)
109-
return DLM_DEQUEUE_CALLBACK_EMPTY;
110-
111-
/* remove it from callbacks so shift others down */
112-
list_del(&(*cb)->list);
113-
if (list_empty(&lkb->lkb_callbacks))
114-
return DLM_DEQUEUE_CALLBACK_LAST;
115-
116-
return DLM_DEQUEUE_CALLBACK_SUCCESS;
117-
}
118-
119124
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
120-
uint32_t sbflags)
125+
uint32_t sbflags)
121126
{
122127
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
128+
struct dlm_callback *cb;
123129
int rv;
124130

125131
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
126132
dlm_user_add_ast(lkb, flags, mode, status, sbflags);
127133
return;
128134
}
129135

130-
spin_lock(&lkb->lkb_cb_lock);
131-
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
136+
rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
137+
&cb);
132138
switch (rv) {
133139
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
134-
kref_get(&lkb->lkb_ref);
135-
136-
spin_lock(&ls->ls_cb_lock);
137-
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
138-
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
139-
} else {
140-
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
141-
}
142-
spin_unlock(&ls->ls_cb_lock);
143-
break;
144-
case DLM_ENQUEUE_CALLBACK_FAILURE:
145-
WARN_ON_ONCE(1);
140+
cb->astfn = lkb->lkb_astfn;
141+
cb->bastfn = lkb->lkb_bastfn;
142+
cb->astparam = lkb->lkb_astparam;
143+
INIT_WORK(&cb->work, dlm_callback_work);
144+
145+
spin_lock_bh(&ls->ls_cb_lock);
146+
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
147+
list_add(&cb->list, &ls->ls_cb_delay);
148+
else
149+
queue_work(ls->ls_callback_wq, &cb->work);
150+
spin_unlock_bh(&ls->ls_cb_lock);
146151
break;
147152
case DLM_ENQUEUE_CALLBACK_SUCCESS:
148153
break;
154+
case DLM_ENQUEUE_CALLBACK_FAILURE:
155+
fallthrough;
149156
default:
150157
WARN_ON_ONCE(1);
151158
break;
152159
}
153-
spin_unlock(&lkb->lkb_cb_lock);
154-
}
155-
156-
void dlm_callback_work(struct work_struct *work)
157-
{
158-
struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
159-
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
160-
void (*castfn) (void *astparam);
161-
void (*bastfn) (void *astparam, int mode);
162-
struct dlm_callback *cb;
163-
int rv;
164-
165-
spin_lock(&lkb->lkb_cb_lock);
166-
rv = dlm_dequeue_lkb_callback(lkb, &cb);
167-
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
168-
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
169-
spin_unlock(&lkb->lkb_cb_lock);
170-
goto out;
171-
}
172-
spin_unlock(&lkb->lkb_cb_lock);
173-
174-
for (;;) {
175-
castfn = lkb->lkb_astfn;
176-
bastfn = lkb->lkb_bastfn;
177-
178-
if (cb->flags & DLM_CB_BAST) {
179-
trace_dlm_bast(ls, lkb, cb->mode);
180-
lkb->lkb_last_bast_time = ktime_get();
181-
lkb->lkb_last_bast_mode = cb->mode;
182-
bastfn(lkb->lkb_astparam, cb->mode);
183-
} else if (cb->flags & DLM_CB_CAST) {
184-
lkb->lkb_lksb->sb_status = cb->sb_status;
185-
lkb->lkb_lksb->sb_flags = cb->sb_flags;
186-
trace_dlm_ast(ls, lkb);
187-
lkb->lkb_last_cast_time = ktime_get();
188-
castfn(lkb->lkb_astparam);
189-
}
190-
191-
kref_put(&cb->ref, dlm_release_callback);
192-
193-
spin_lock(&lkb->lkb_cb_lock);
194-
rv = dlm_dequeue_lkb_callback(lkb, &cb);
195-
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
196-
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
197-
spin_unlock(&lkb->lkb_cb_lock);
198-
break;
199-
}
200-
spin_unlock(&lkb->lkb_cb_lock);
201-
}
202-
203-
out:
204-
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
205-
dlm_put_lkb(lkb);
206160
}
207161

208162
int dlm_callback_start(struct dlm_ls *ls)
209163
{
210-
ls->ls_callback_wq = alloc_workqueue("dlm_callback",
211-
WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
164+
ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
165+
WQ_HIGHPRI | WQ_MEM_RECLAIM);
212166
if (!ls->ls_callback_wq) {
213167
log_print("can't start dlm_callback workqueue");
214168
return -ENOMEM;
@@ -225,9 +179,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
225179
void dlm_callback_suspend(struct dlm_ls *ls)
226180
{
227181
if (ls->ls_callback_wq) {
228-
spin_lock(&ls->ls_cb_lock);
182+
spin_lock_bh(&ls->ls_cb_lock);
229183
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
230-
spin_unlock(&ls->ls_cb_lock);
184+
spin_unlock_bh(&ls->ls_cb_lock);
231185

232186
flush_workqueue(ls->ls_callback_wq);
233187
}
@@ -237,26 +191,26 @@ void dlm_callback_suspend(struct dlm_ls *ls)
237191

238192
void dlm_callback_resume(struct dlm_ls *ls)
239193
{
240-
struct dlm_lkb *lkb, *safe;
194+
struct dlm_callback *cb, *safe;
241195
int count = 0, sum = 0;
242196
bool empty;
243197

244198
if (!ls->ls_callback_wq)
245199
return;
246200

247201
more:
248-
spin_lock(&ls->ls_cb_lock);
249-
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
250-
list_del_init(&lkb->lkb_cb_list);
251-
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
202+
spin_lock_bh(&ls->ls_cb_lock);
203+
list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
204+
list_del(&cb->list);
205+
queue_work(ls->ls_callback_wq, &cb->work);
252206
count++;
253207
if (count == MAX_CB_QUEUE)
254208
break;
255209
}
256210
empty = list_empty(&ls->ls_cb_delay);
257211
if (empty)
258212
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
259-
spin_unlock(&ls->ls_cb_lock);
213+
spin_unlock_bh(&ls->ls_cb_lock);
260214

261215
sum += count;
262216
if (!empty) {

fs/dlm/ast.h

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,12 @@
1414
#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1
1515
#define DLM_ENQUEUE_CALLBACK_SUCCESS 0
1616
#define DLM_ENQUEUE_CALLBACK_FAILURE -1
17-
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18-
int status, uint32_t sbflags);
19-
#define DLM_DEQUEUE_CALLBACK_EMPTY 2
20-
#define DLM_DEQUEUE_CALLBACK_LAST 1
21-
#define DLM_DEQUEUE_CALLBACK_SUCCESS 0
22-
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
17+
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18+
int status, uint32_t sbflags,
19+
struct dlm_callback **cb);
2320
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
2421
uint32_t sbflags);
25-
void dlm_callback_set_last_ptr(struct dlm_callback **from,
26-
struct dlm_callback *to);
2722

28-
void dlm_release_callback(struct kref *ref);
29-
void dlm_callback_work(struct work_struct *work);
3023
int dlm_callback_start(struct dlm_ls *ls);
3124
void dlm_callback_stop(struct dlm_ls *ls);
3225
void dlm_callback_suspend(struct dlm_ls *ls);

fs/dlm/config.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ static void release_node(struct config_item *);
6363
static struct configfs_attribute *comm_attrs[];
6464
static struct configfs_attribute *node_attrs[];
6565

66+
const struct rhashtable_params dlm_rhash_rsb_params = {
67+
.nelem_hint = 3, /* start small */
68+
.key_len = DLM_RESNAME_MAXLEN,
69+
.key_offset = offsetof(struct dlm_rsb, res_name),
70+
.head_offset = offsetof(struct dlm_rsb, res_node),
71+
.automatic_shrinking = true,
72+
};
73+
6674
struct dlm_cluster {
6775
struct config_group group;
6876
unsigned int cl_tcp_port;

fs/dlm/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ struct dlm_config_node {
2121
uint32_t comm_seq;
2222
};
2323

24+
extern const struct rhashtable_params dlm_rhash_rsb_params;
25+
2426
#define DLM_MAX_ADDR_COUNT 3
2527

2628
#define DLM_PROTO_TCP 0

0 commit comments

Comments
 (0)