Skip to content

Commit c288745

Browse files
Alexander Aring authored and David Teigland committed
dlm: avoid blocking receive at the end of recovery
The end of the recovery process transitioned to normal message processing by temporarily blocking the receiving context, processing saved messages, then unblocking the receiving context. To avoid blocking the receiving context, the old wait_queue and mutex are replaced by a new rwlock and the new RECV_MSG_BLOCKED flag. Received messages are added to the list of saved messages, protected by the rwlock, until the flag is cleared, which happens when all saved messages have been processed. Signed-off-by: Alexander Aring <[email protected]> Signed-off-by: David Teigland <[email protected]>
1 parent cc396e2 commit c288745

File tree

5 files changed

+30
-41
lines changed

5 files changed

+30
-41
lines changed

fs/dlm/dlm_internal.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,7 @@ struct dlm_ls {
655655
struct rw_semaphore ls_in_recovery; /* block local requests */
656656
struct rw_semaphore ls_recv_active; /* block dlm_recv */
657657
struct list_head ls_requestqueue;/* queue remote requests */
658-
atomic_t ls_requestqueue_cnt;
659-
wait_queue_head_t ls_requestqueue_wait;
660-
struct mutex ls_requestqueue_mutex;
658+
rwlock_t ls_requestqueue_lock;
661659
struct dlm_rcom *ls_recover_buf;
662660
int ls_recover_nodeid; /* for debugging */
663661
unsigned int ls_recover_locks_in; /* for log info */
@@ -717,6 +715,7 @@ struct dlm_ls {
717715
#define LSFL_UEVENT_WAIT 7
718716
#define LSFL_CB_DELAY 9
719717
#define LSFL_NODIR 10
718+
#define LSFL_RECV_MSG_BLOCKED 11
720719

721720
#define DLM_PROC_FLAGS_CLOSING 1
722721
#define DLM_PROC_FLAGS_COMPAT 2

fs/dlm/lock.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4752,20 +4752,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
47524752
static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
47534753
int nodeid)
47544754
{
4755-
if (dlm_locking_stopped(ls)) {
4755+
try_again:
4756+
read_lock(&ls->ls_requestqueue_lock);
4757+
if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
47564758
/* If we were a member of this lockspace, left, and rejoined,
47574759
other nodes may still be sending us messages from the
47584760
lockspace generation before we left. */
47594761
if (WARN_ON_ONCE(!ls->ls_generation)) {
4762+
read_unlock(&ls->ls_requestqueue_lock);
47604763
log_limit(ls, "receive %d from %d ignore old gen",
47614764
le32_to_cpu(ms->m_type), nodeid);
47624765
return;
47634766
}
47644767

4768+
read_unlock(&ls->ls_requestqueue_lock);
4769+
write_lock(&ls->ls_requestqueue_lock);
4770+
/* recheck because we hold writelock now */
4771+
if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4772+
write_unlock_bh(&ls->ls_requestqueue_lock);
4773+
goto try_again;
4774+
}
4775+
47654776
dlm_add_requestqueue(ls, nodeid, ms);
4777+
write_unlock(&ls->ls_requestqueue_lock);
47664778
} else {
4767-
dlm_wait_requestqueue(ls);
47684779
_receive_message(ls, ms, 0);
4780+
read_unlock(&ls->ls_requestqueue_lock);
47694781
}
47704782
}
47714783

fs/dlm/lockspace.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,9 +554,7 @@ static int new_lockspace(const char *name, const char *cluster,
554554
init_rwsem(&ls->ls_in_recovery);
555555
init_rwsem(&ls->ls_recv_active);
556556
INIT_LIST_HEAD(&ls->ls_requestqueue);
557-
atomic_set(&ls->ls_requestqueue_cnt, 0);
558-
init_waitqueue_head(&ls->ls_requestqueue_wait);
559-
mutex_init(&ls->ls_requestqueue_mutex);
557+
rwlock_init(&ls->ls_requestqueue_lock);
560558
spin_lock_init(&ls->ls_clear_proc_locks);
561559

562560
/* Due backwards compatibility with 3.1 we need to use maximum

fs/dlm/member.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,11 @@ int dlm_ls_stop(struct dlm_ls *ls)
642642
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
643643
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
644644
ls->ls_recover_seq++;
645+
646+
/* activate requestqueue and stop processing */
647+
write_lock(&ls->ls_requestqueue_lock);
648+
set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
649+
write_unlock(&ls->ls_requestqueue_lock);
645650
spin_unlock(&ls->ls_recover_lock);
646651

647652
/*

fs/dlm/requestqueue.c

Lines changed: 8 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
4848
memcpy(&e->request, ms, sizeof(*ms));
4949
memcpy(&e->request.m_extra, ms->m_extra, length);
5050

51-
atomic_inc(&ls->ls_requestqueue_cnt);
52-
mutex_lock(&ls->ls_requestqueue_mutex);
5351
list_add_tail(&e->list, &ls->ls_requestqueue);
54-
mutex_unlock(&ls->ls_requestqueue_mutex);
5552
}
5653

5754
/*
@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
7168
struct dlm_message *ms;
7269
int error = 0;
7370

74-
mutex_lock(&ls->ls_requestqueue_mutex);
75-
71+
write_lock(&ls->ls_requestqueue_lock);
7672
for (;;) {
7773
if (list_empty(&ls->ls_requestqueue)) {
78-
mutex_unlock(&ls->ls_requestqueue_mutex);
74+
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
7975
error = 0;
8076
break;
8177
}
82-
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
83-
mutex_unlock(&ls->ls_requestqueue_mutex);
78+
e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
8479

8580
ms = &e->request;
8681

@@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
9388
e->recover_seq);
9489

9590
dlm_receive_message_saved(ls, &e->request, e->recover_seq);
96-
97-
mutex_lock(&ls->ls_requestqueue_mutex);
9891
list_del(&e->list);
99-
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
100-
wake_up(&ls->ls_requestqueue_wait);
10192
kfree(e);
10293

10394
if (dlm_locking_stopped(ls)) {
10495
log_debug(ls, "process_requestqueue abort running");
105-
mutex_unlock(&ls->ls_requestqueue_mutex);
10696
error = -EINTR;
10797
break;
10898
}
99+
write_unlock(&ls->ls_requestqueue_lock);
109100
schedule();
101+
write_lock(&ls->ls_requestqueue_lock);
110102
}
103+
write_unlock(&ls->ls_requestqueue_lock);
111104

112105
return error;
113106
}
114107

115-
/*
116-
* After recovery is done, locking is resumed and dlm_recoverd takes all the
117-
* saved requests and processes them as they would have been by dlm_recv. At
118-
* the same time, dlm_recv will start receiving new requests from remote nodes.
119-
* We want to delay dlm_recv processing new requests until dlm_recoverd has
120-
* finished processing the old saved requests. We don't check for locking
121-
* stopped here because dlm_ls_stop won't stop locking until it's suspended us
122-
* (dlm_recv).
123-
*/
124-
125-
void dlm_wait_requestqueue(struct dlm_ls *ls)
126-
{
127-
wait_event(ls->ls_requestqueue_wait,
128-
atomic_read(&ls->ls_requestqueue_cnt) == 0);
129-
}
130-
131108
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
132109
{
133110
__le32 type = ms->m_type;
@@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
158135
struct dlm_message *ms;
159136
struct rq_entry *e, *safe;
160137

161-
mutex_lock(&ls->ls_requestqueue_mutex);
138+
write_lock(&ls->ls_requestqueue_lock);
162139
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
163140
ms = &e->request;
164141

165142
if (purge_request(ls, ms, e->nodeid)) {
166143
list_del(&e->list);
167-
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
168-
wake_up(&ls->ls_requestqueue_wait);
169144
kfree(e);
170145
}
171146
}
172-
mutex_unlock(&ls->ls_requestqueue_mutex);
147+
write_unlock(&ls->ls_requestqueue_lock);
173148
}
174149

0 commit comments

Comments (0)