Skip to content

Commit d00725c

Browse files
Alexander Aringteigland
authored andcommitted
fs: dlm: handle sequence numbers as atomic
Currently seq_next is only be read on the receive side which processed in an ordered way. The seq_send is being protected by locks. To being able to read the seq_next value on send side as well we convert it to an atomic_t value. The atomic_cmpxchg() is probably not necessary, however the atomic_inc() depends on a if coniditional and this should be handled in an atomic context. Signed-off-by: Alexander Aring <[email protected]> Signed-off-by: David Teigland <[email protected]>
1 parent 75a7d60 commit d00725c

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

fs/dlm/midcomms.c

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@
152152
struct midcomms_node {
153153
int nodeid;
154154
uint32_t version;
155-
uint32_t seq_send;
156-
uint32_t seq_next;
155+
atomic_t seq_send;
156+
atomic_t seq_next;
157157
/* These queues are unbound because we cannot drop any message in dlm.
158158
* We could send a fence signal for a specific node to the cluster
159159
* manager if queues hits some maximum value, however this handling
@@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node)
317317
{
318318
pr_debug("reset node %d\n", node->nodeid);
319319

320-
node->seq_next = DLM_SEQ_INIT;
321-
node->seq_send = DLM_SEQ_INIT;
320+
atomic_set(&node->seq_next, DLM_SEQ_INIT);
321+
atomic_set(&node->seq_send, DLM_SEQ_INIT);
322322
node->version = DLM_VERSION_NOT_SET;
323323
node->flags = 0;
324324

@@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
492492
struct midcomms_node *node,
493493
uint32_t seq)
494494
{
495-
if (seq == node->seq_next) {
496-
node->seq_next++;
495+
bool is_expected_seq;
496+
uint32_t oval, nval;
497497

498+
do {
499+
oval = atomic_read(&node->seq_next);
500+
is_expected_seq = (oval == seq);
501+
if (!is_expected_seq)
502+
break;
503+
504+
nval = oval + 1;
505+
} while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
506+
507+
if (is_expected_seq) {
498508
switch (p->header.h_cmd) {
499509
case DLM_FIN:
500510
spin_lock(&node->state_lock);
@@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
503513

504514
switch (node->state) {
505515
case DLM_ESTABLISHED:
506-
dlm_send_ack(node->nodeid, node->seq_next);
516+
dlm_send_ack(node->nodeid, nval);
507517

508518
/* passive shutdown DLM_LAST_ACK case 1
509519
* additional we check if the node is used by
@@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
522532
}
523533
break;
524534
case DLM_FIN_WAIT1:
525-
dlm_send_ack(node->nodeid, node->seq_next);
535+
dlm_send_ack(node->nodeid, nval);
526536
node->state = DLM_CLOSING;
527537
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
528538
pr_debug("switch node %d to state %s\n",
529539
node->nodeid, dlm_state_str(node->state));
530540
break;
531541
case DLM_FIN_WAIT2:
532-
dlm_send_ack(node->nodeid, node->seq_next);
542+
dlm_send_ack(node->nodeid, nval);
533543
midcomms_node_reset(node);
534544
pr_debug("switch node %d to state %s\n",
535545
node->nodeid, dlm_state_str(node->state));
@@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
557567
/* retry to ack message which we already have by sending back
558568
* current node->seq_next number as ack.
559569
*/
560-
if (seq < node->seq_next)
561-
dlm_send_ack(node->nodeid, node->seq_next);
570+
if (seq < oval)
571+
dlm_send_ack(node->nodeid, oval);
562572

563573
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
564-
seq, node->seq_next, node->nodeid);
574+
seq, oval, node->nodeid);
565575
}
566576
}
567577

@@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid)
9921002
switch (node->state) {
9931003
case DLM_ESTABLISHED:
9941004
spin_unlock(&node->state_lock);
995-
dlm_send_ack(node->nodeid, node->seq_next);
1005+
dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
9961006
break;
9971007
default:
9981008
spin_unlock(&node->state_lock);
@@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data)
10581068
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
10591069
spin_unlock_bh(&mh->node->send_queue_lock);
10601070

1061-
mh->seq = mh->node->seq_send++;
1071+
mh->seq = atomic_fetch_inc(&mh->node->seq_send);
10621072
}
10631073

10641074
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
@@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data)
15301540
switch (h->h_cmd) {
15311541
case DLM_OPTS:
15321542
if (!h->u.h_seq)
1533-
h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
1543+
h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
15341544
break;
15351545
default:
15361546
break;

0 commit comments

Comments
 (0)