Skip to content

Commit 6d43078

Browse files
committed
Merge pull request #32 from maralla/sndbuf-full
fix bug of client writable events cannot be triggered when sndbuf full
2 parents e543e82 + ac09b69 commit 6d43078

File tree

9 files changed

+72
-18
lines changed

9 files changed

+72
-18
lines changed

src/client.c

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,24 @@ int client_trigger_event(struct connection *client, struct mbuf *buf)
2020
return CORVUS_OK;
2121
}
2222

23-
int client_read(struct connection *client, int read_socket)
23+
int client_read(struct connection *client)
2424
{
2525
struct command *cmd;
2626
struct mbuf *buf;
2727
int status = CORVUS_OK, limit = 16;
2828

29-
if (!STAILQ_EMPTY(&client->info->cmd_queue)
30-
&& STAILQ_FIRST(&client->info->cmd_queue)->parse_done)
31-
{
32-
event_reregister(&client->ctx->loop, client, E_WRITABLE);
33-
return CORVUS_OK;
29+
cmd = STAILQ_FIRST(&client->info->cmd_queue);
30+
if (cmd != NULL && cmd->parse_done) {
31+
int size = socket_sndbuf_size(client->fd);
32+
LOG(DEBUG, "%p %d %d", cmd, client->info->sndbuf, size);
33+
if (size >= 0 && client->info->sndbuf > size) {
34+
return CORVUS_OK;
35+
}
3436
}
3537

3638
do {
3739
buf = conn_get_buf(client);
3840
if (mbuf_read_size(buf) <= 0) {
39-
if (!read_socket) break;
4041
buf = conn_get_buf(client);
4142
status = conn_read(client, buf);
4243
if (status != CORVUS_OK) return status;
@@ -89,6 +90,10 @@ int client_write(struct connection *client)
8990
}
9091

9192
if (info->iov.len <= 0) {
93+
if (!STAILQ_EMPTY(&info->cmd_queue) && conn_register(client) == CORVUS_ERR) {
94+
LOG(ERROR, "client_write: fail to reregister client %d", client->fd);
95+
return CORVUS_ERR;
96+
}
9297
cmd_iov_reset(&info->iov);
9398
return CORVUS_OK;
9499
}
@@ -102,7 +107,7 @@ int client_write(struct connection *client)
102107
if (status == CORVUS_AGAIN) return CORVUS_OK;
103108

104109
if (info->iov.cursor >= info->iov.len) {
105-
cmd_iov_reset(&info->iov);
110+
cmd_iov_free(&info->iov);
106111
if (event_reregister(&ctx->loop, client, E_READABLE) == CORVUS_ERR) {
107112
LOG(ERROR, "client_write: fail to reregister client %d", client->fd);
108113
return CORVUS_ERR;
@@ -112,7 +117,7 @@ int client_write(struct connection *client)
112117
client->fd, client->ev->fd);
113118
return CORVUS_ERR;
114119
}
115-
} else if (event_reregister(&ctx->loop, client, E_WRITABLE) == CORVUS_ERR) {
120+
} else if (conn_register(client) == CORVUS_ERR) {
116121
LOG(ERROR, "client_write: fail to reregister client %d", client->fd);
117122
return CORVUS_ERR;
118123
}
@@ -147,7 +152,7 @@ void client_ready(struct connection *self, uint32_t mask)
147152
if (mask & E_READABLE) {
148153
LOG(DEBUG, "client readable");
149154

150-
int status = client_read(self, 1);
155+
int status = client_read(self);
151156
if (status == CORVUS_ERR || status == CORVUS_EOF) {
152157
client_eof(self);
153158
return;
@@ -178,7 +183,7 @@ void client_event_ready(struct connection *self, uint32_t mask)
178183
client->info->last_active = time(NULL);
179184

180185
if (mask & E_READABLE) {
181-
if (client_read(client, 0) == CORVUS_ERR) {
186+
if (client_read(client) == CORVUS_ERR) {
182187
client_eof(client);
183188
return;
184189
}
@@ -208,6 +213,12 @@ struct connection *client_create(struct context *ctx, int fd)
208213
return NULL;
209214
}
210215

216+
client->info->sndbuf = socket_get_sndbuf(client->fd);
217+
if (client->info->sndbuf == -1) {
218+
client->info->sndbuf = DEFAULT_SNDBUF;
219+
}
220+
client->info->sndbuf >>= 1;
221+
211222
int evfd = socket_create_eventfd();
212223
client->ev = conn_create(ctx);
213224
if (evfd == -1 || client->ev == NULL) {
@@ -248,7 +259,7 @@ void client_eof(struct connection *client)
248259

249260
// don't care response any more
250261
cmd_iov_clear(client->ctx, &client->info->iov);
251-
cmd_iov_reset(&client->info->iov);
262+
cmd_iov_free(&client->info->iov);
252263

253264
// request may not write
254265
if (client->info->refcount <= 0) {

src/command.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ int cmd_extra(struct command *cmd, struct redis_data *data)
591591

592592
int cmd_forward(struct command *cmd, struct redis_data *data)
593593
{
594-
LOG(DEBUG, "forward command %d", cmd->cmd_type);
594+
LOG(DEBUG, "forward command %p(%d)", cmd, cmd->cmd_type);
595595
switch (cmd->request_type) {
596596
case CMD_BASIC:
597597
cmd->slot = cmd_get_slot(data);
@@ -798,9 +798,7 @@ void cmd_mark(struct command *cmd, int fail)
798798
}
799799
}
800800

801-
if (root != NULL && event_reregister(&cmd->ctx->loop,
802-
root->client, E_WRITABLE) == CORVUS_ERR)
803-
{
801+
if (root != NULL && conn_register(root->client) == CORVUS_ERR) {
804802
LOG(ERROR, "fail to reregister client %d", root->client->fd);
805803
client_eof(root->client);
806804
}

src/connection.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ void conn_info_init(struct conn_info *info)
9696
reader_init(&info->reader);
9797

9898
info->last_active = -1;
99+
info->sndbuf = 0;
99100

100101
STAILQ_INIT(&info->cmd_queue);
101102
STAILQ_INIT(&info->ready_queue);

src/connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct conn_info {
4444
struct reader reader;
4545

4646
int64_t last_active;
47+
int sndbuf;
4748

4849
struct cmd_tqh cmd_queue;
4950
struct cmd_tqh ready_queue;

src/mbuf.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "corvus.h"
44
#include "logging.h"
55

6+
#define RECYCLE_LENGTH 8192 // 128mb
7+
68
static struct mbuf *_mbuf_get(struct context *ctx)
79
{
810
struct mbuf *mbuf;
@@ -69,6 +71,11 @@ void mbuf_recycle(struct context *ctx, struct mbuf *mbuf)
6971
{
7072
ATOMIC_DEC(ctx->mstats.buffers, 1);
7173

74+
if (ATOMIC_GET(ctx->mstats.free_buffers) > RECYCLE_LENGTH) {
75+
mbuf_free(ctx, mbuf);
76+
return;
77+
}
78+
7279
TAILQ_NEXT(mbuf, next) = NULL;
7380
TAILQ_INSERT_HEAD(&ctx->free_mbufq, mbuf, next);
7481

src/server.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ int server_write(struct connection *server)
8585
ATOMIC_INC(info->send_bytes, status);
8686

8787
if (info->iov.cursor >= info->iov.len) {
88-
cmd_iov_reset(&info->iov);
88+
cmd_iov_free(&info->iov);
8989
} else if (conn_register(server) == CORVUS_ERR) {
9090
LOG(ERROR, "server_write: fail to reregister server %d", server->fd);
9191
return CORVUS_ERR;
@@ -320,7 +320,7 @@ void server_eof(struct connection *server, const char *reason)
320320
event_deregister(&server->ctx->loop, server);
321321

322322
// drop all unsent requests
323-
cmd_iov_reset(&server->info->iov);
323+
cmd_iov_free(&server->info->iov);
324324
conn_free(server);
325325
slot_create_job(SLOT_UPDATE);
326326
}

src/socket.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <netinet/tcp.h>
44
#include <sys/uio.h>
55
#include <sys/eventfd.h>
6+
#include <sys/ioctl.h>
67
#include <netdb.h>
78
#include <stdio.h>
89
#include <stdlib.h>
@@ -400,3 +401,25 @@ int socket_trigger_event(int evfd)
400401
}
401402
return CORVUS_OK;
402403
}
404+
405+
int socket_get_sndbuf(int fd)
406+
{
407+
int n;
408+
socklen_t len = sizeof(n);
409+
410+
if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &n, &len) == -1) {
411+
LOG(WARN, "getsockopt SO_SNDBUF: %s", strerror(errno));
412+
return CORVUS_ERR;
413+
}
414+
return n;
415+
}
416+
417+
int socket_sndbuf_size(int fd)
418+
{
419+
int size = 0;
420+
if (ioctl(fd, TIOCOUTQ, &size) == -1) {
421+
LOG(WARN, "ioctl TIOCOUTQ: %s", strerror(errno));
422+
return CORVUS_ERR;
423+
}
424+
return size;
425+
}

src/socket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <limits.h>
66
#include "mbuf.h"
77

8+
#define DEFAULT_SNDBUF 32768
89
#define IP_LEN 45
910
#define DSN_LEN (IP_LEN + 8)
1011

@@ -33,5 +34,7 @@ int socket_parse_ip(char *addr, struct address *address);
3334
void socket_get_key(struct address *addr, char *dst);
3435
int socket_create_eventfd();
3536
int socket_trigger_event(int evfd);
37+
int socket_sndbuf_size(int fd);
38+
int socket_get_sndbuf(int fd);
3639

3740
#endif /* end of include guard: __SOCKET_H */

tests/test_corvus.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3126,3 +3126,13 @@ def test_delete_node(delete_keys):
31263126
cluster.wait()
31273127

31283128
assert r.get("hello") == "123"
3129+
3130+
3131+
def test_writable(delete_keys):
3132+
delete_keys.keys("hello")
3133+
3134+
with r.pipeline(transaction=False) as p:
3135+
for u in range(20):
3136+
p.set('hello', 'x' * 1024 * 1024 * 17)
3137+
p.get('hello')
3138+
p.execute()

0 commit comments

Comments
 (0)