Skip to content

Commit a470cb2

Browse files
Alexander Aring
authored and teigland committed
dlm: slow down filling up processing queue
If there is a burst of messages, the receive worker will fill up the processing queue but be too slow to process the dlm messages. This patch slows down the receive worker to keep the buffers on the socket layer and thereby tell the sender to back off. This is done with a threshold: the next buffers are fetched from the socket only after all queued messages have been processed, enforced by a flush_workqueue(). This only occurs during a message burst, e.g. when creating 1 million locks. If we keep putting more and more new messages on the processqueue faster than they are processed, we will soon run out of memory. Signed-off-by: Alexander Aring <[email protected]> Signed-off-by: David Teigland <[email protected]>
1 parent 6212e45 commit a470cb2

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

fs/dlm/lowcomms.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
#include "config.h"
6464

6565
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
66+
#define DLM_MAX_PROCESS_BUFFERS 24
6667
#define NEEDED_RMEM (4*1024*1024)
6768

6869
struct connection {
@@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops;
194195
#define DLM_IO_END 1
195196
#define DLM_IO_EOF 2
196197
#define DLM_IO_RESCHED 3
198+
#define DLM_IO_FLUSH 4
197199

198200
static void process_recv_sockets(struct work_struct *work);
199201
static void process_send_sockets(struct work_struct *work);
@@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
202204
static DECLARE_WORK(process_work, process_dlm_messages);
203205
static DEFINE_SPINLOCK(processqueue_lock);
204206
static bool process_dlm_messages_pending;
207+
static atomic_t processqueue_count;
205208
static LIST_HEAD(processqueue);
206209

207210
bool dlm_lowcomms_is_running(void)
@@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work)
874877
}
875878

876879
list_del(&pentry->list);
880+
atomic_dec(&processqueue_count);
877881
spin_unlock(&processqueue_lock);
878882

879883
for (;;) {
@@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work)
891895
}
892896

893897
list_del(&pentry->list);
898+
atomic_dec(&processqueue_count);
894899
spin_unlock(&processqueue_lock);
895900
}
896901
}
@@ -962,13 +967,17 @@ static int receive_from_sock(struct connection *con, int buflen)
962967
con->rx_leftover);
963968

964969
spin_lock(&processqueue_lock);
970+
ret = atomic_inc_return(&processqueue_count);
965971
list_add_tail(&pentry->list, &processqueue);
966972
if (!process_dlm_messages_pending) {
967973
process_dlm_messages_pending = true;
968974
queue_work(process_workqueue, &process_work);
969975
}
970976
spin_unlock(&processqueue_lock);
971977

978+
if (ret > DLM_MAX_PROCESS_BUFFERS)
979+
return DLM_IO_FLUSH;
980+
972981
return DLM_IO_SUCCESS;
973982
}
974983

@@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work)
15031512
wake_up(&con->shutdown_wait);
15041513
/* CF_RECV_PENDING cleared */
15051514
break;
1515+
case DLM_IO_FLUSH:
1516+
flush_workqueue(process_workqueue);
1517+
fallthrough;
15061518
case DLM_IO_RESCHED:
15071519
cond_resched();
15081520
queue_work(io_workqueue, &con->rwork);

0 commit comments

Comments
 (0)