Skip to content

Commit 1696c75

Browse files
Alexander Aringteigland
authored andcommitted
fs: dlm: add send ack threshold and append acks to msgs
This patch changes the time when we sending an ack back to tell the other side it can free some message because it is arrived on the receiver node, due random reconnects e.g. TCP resets this is handled as well on application layer to not let DLM run into a deadlock state. The current handling has the following problems: 1. We end in situations that we only send an ack back message of 16 bytes out and no other messages. Whereas DLM has logic to combine so much messages as it can in one send() socket call. This behaviour can be discovered by "trace-cmd start -e dlm_recv" and observing the ret field being 16 bytes. 2. When processing of DLM messages will never end because we receive a lot of messages, we will not send an ack back as it happens when the processing loop ends. This patch introduces a likely and unlikely threshold case. The likely case will send an ack back on a transmit path if the threshold is triggered of amount of processed upper layer protocol. This will solve issue 1 because it will be send when another normal DLM message will be sent. It solves issue 2 because it is not part of the processing loop. There is however a unlikely case, the unlikely case has a bigger threshold and will be triggered when we only receive messages and do not sent any message back. This case avoids that the sending node will keep a lot of message for a long time as we send sometimes ack backs to tell the sender to finally release messages. The atomic cmpxchg() is there to provide a atomically ack send with reset of the upper layer protocol delivery counter. Signed-off-by: Alexander Aring <[email protected]> Signed-off-by: David Teigland <[email protected]>
1 parent d00725c commit 1696c75

File tree

2 files changed

+31
-75
lines changed

2 files changed

+31
-75
lines changed

fs/dlm/lowcomms.c

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -860,30 +860,8 @@ struct dlm_processed_nodes {
860860
struct list_head list;
861861
};
862862

863-
static void add_processed_node(int nodeid, struct list_head *processed_nodes)
864-
{
865-
struct dlm_processed_nodes *n;
866-
867-
list_for_each_entry(n, processed_nodes, list) {
868-
/* we already remembered this node */
869-
if (n->nodeid == nodeid)
870-
return;
871-
}
872-
873-
/* if it's fails in worst case we simple don't send an ack back.
874-
* We try it next time.
875-
*/
876-
n = kmalloc(sizeof(*n), GFP_NOFS);
877-
if (!n)
878-
return;
879-
880-
n->nodeid = nodeid;
881-
list_add(&n->list, processed_nodes);
882-
}
883-
884863
static void process_dlm_messages(struct work_struct *work)
885864
{
886-
struct dlm_processed_nodes *n, *n_tmp;
887865
struct processqueue_entry *pentry;
888866
LIST_HEAD(processed_nodes);
889867

@@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
902880
for (;;) {
903881
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
904882
pentry->buflen);
905-
add_processed_node(pentry->nodeid, &processed_nodes);
906883
free_processqueue_entry(pentry);
907884

908885
spin_lock(&processqueue_lock);
@@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
917894
list_del(&pentry->list);
918895
spin_unlock(&processqueue_lock);
919896
}
920-
921-
/* send ack back after we processed couple of messages */
922-
list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
923-
list_del(&n->list);
924-
dlm_midcomms_receive_done(n->nodeid);
925-
kfree(n);
926-
}
927897
}
928898

929899
/* Data received from remote end */

fs/dlm/midcomms.c

Lines changed: 31 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@
148148
/* 5 seconds wait to sync ending of dlm */
149149
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
150150
#define DLM_VERSION_NOT_SET 0
151+
#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
152+
#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
151153

152154
struct midcomms_node {
153155
int nodeid;
@@ -165,7 +167,7 @@ struct midcomms_node {
165167
#define DLM_NODE_FLAG_CLOSE 1
166168
#define DLM_NODE_FLAG_STOP_TX 2
167169
#define DLM_NODE_FLAG_STOP_RX 3
168-
#define DLM_NODE_ULP_DELIVERED 4
170+
atomic_t ulp_delivered;
169171
unsigned long flags;
170172
wait_queue_head_t shutdown_wait;
171173

@@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node)
319321

320322
atomic_set(&node->seq_next, DLM_SEQ_INIT);
321323
atomic_set(&node->seq_send, DLM_SEQ_INIT);
324+
atomic_set(&node->ulp_delivered, 0);
322325
node->version = DLM_VERSION_NOT_SET;
323326
node->flags = 0;
324327

@@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
393396
return 0;
394397
}
395398

399+
static void dlm_send_ack_threshold(struct midcomms_node *node,
400+
uint32_t threshold)
401+
{
402+
uint32_t oval, nval;
403+
bool send_ack;
404+
405+
/* let only send one user trigger threshold to send ack back */
406+
do {
407+
oval = atomic_read(&node->ulp_delivered);
408+
send_ack = (oval > threshold);
409+
/* abort if threshold is not reached */
410+
if (!send_ack)
411+
break;
412+
413+
nval = 0;
414+
/* try to reset ulp_delivered counter */
415+
} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
416+
417+
if (send_ack)
418+
dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
419+
}
420+
396421
static int dlm_send_fin(struct midcomms_node *node,
397422
void (*ack_rcv)(struct midcomms_node *node))
398423
{
@@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
560585
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
561586
dlm_receive_buffer_3_2_trace(seq, p);
562587
dlm_receive_buffer(p, node->nodeid);
563-
set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
588+
atomic_inc(&node->ulp_delivered);
589+
/* unlikely case to send ack back when we don't transmit */
590+
dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
564591
break;
565592
}
566593
} else {
@@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
969996
return ret;
970997
}
971998

972-
void dlm_midcomms_receive_done(int nodeid)
973-
{
974-
struct midcomms_node *node;
975-
int idx;
976-
977-
idx = srcu_read_lock(&nodes_srcu);
978-
node = nodeid2node(nodeid, 0);
979-
if (!node) {
980-
srcu_read_unlock(&nodes_srcu, idx);
981-
return;
982-
}
983-
984-
/* old protocol, we do nothing */
985-
switch (node->version) {
986-
case DLM_VERSION_3_2:
987-
break;
988-
default:
989-
srcu_read_unlock(&nodes_srcu, idx);
990-
return;
991-
}
992-
993-
/* do nothing if we didn't delivered stateful to ulp */
994-
if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
995-
&node->flags)) {
996-
srcu_read_unlock(&nodes_srcu, idx);
997-
return;
998-
}
999-
1000-
spin_lock(&node->state_lock);
1001-
/* we only ack if state is ESTABLISHED */
1002-
switch (node->state) {
1003-
case DLM_ESTABLISHED:
1004-
spin_unlock(&node->state_lock);
1005-
dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
1006-
break;
1007-
default:
1008-
spin_unlock(&node->state_lock);
1009-
/* do nothing FIN has it's own ack send */
1010-
break;
1011-
}
1012-
srcu_read_unlock(&nodes_srcu, idx);
1013-
}
1014-
1015999
void dlm_midcomms_unack_msg_resend(int nodeid)
10161000
{
10171001
struct midcomms_node *node;
@@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
11421126
goto err;
11431127
}
11441128

1129+
/* send ack back if necessary */
1130+
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
11451131
break;
11461132
default:
11471133
dlm_free_mhandle(mh);

0 commit comments

Comments
 (0)