Skip to content

Commit e543e82

Browse files
committed
Merge pull request #31 from maralla/atomic-stats
send incremental stats, closes #23
2 parents 082b232 + 1d28cda commit e543e82

File tree

10 files changed

+211
-113
lines changed

10 files changed

+211
-113
lines changed

src/client.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ void client_eof(struct connection *client)
239239
cmd_set_stale(cmd);
240240
}
241241

242-
client->ctx->stats.connected_clients--;
242+
ATOMIC_DEC(client->ctx->stats.connected_clients, 1);
243243

244244
event_deregister(&client->ctx->loop, client);
245245
if (client->ev != NULL && !client->event_triggered) {

src/command.c

Lines changed: 90 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,20 @@
147147
/* misc */ \
148148
HANDLER(AUTH, UNIMPL) \
149149
HANDLER(ECHO, UNIMPL) \
150-
HANDLER(PING, PROXY) \
151-
HANDLER(INFO, PROXY) \
150+
HANDLER(PING, EXTRA) \
151+
HANDLER(INFO, EXTRA) \
152+
HANDLER(PROXY, EXTRA) \
152153
HANDLER(QUIT, UNIMPL) \
153154
HANDLER(SELECT, UNIMPL)
154155

156+
#define CMD_INCREF(cmd) \
157+
do { \
158+
(cmd)->rep_buf[0].buf->refcount++; \
159+
if ((cmd)->rep_buf[1].buf != (cmd)->rep_buf[0].buf) { \
160+
(cmd)->rep_buf[1].buf->refcount++; \
161+
} \
162+
} while (0)
163+
155164
enum {
156165
CMD_DO(CMD_DEFINE)
157166
};
@@ -160,7 +169,7 @@ enum {
160169
CMD_UNIMPL,
161170
CMD_BASIC,
162171
CMD_COMPLEX,
163-
CMD_PROXY,
172+
CMD_EXTRA,
164173
};
165174

166175
struct cmd_item {
@@ -223,13 +232,15 @@ static void cmd_init(struct context *ctx, struct command *cmd)
223232

224233
static void cmd_recycle(struct context *ctx, struct command *cmd)
225234
{
226-
ctx->stats.cmds--;
235+
ATOMIC_DEC(ctx->mstats.cmds, 1);
236+
227237
STAILQ_NEXT(cmd, cmd_next) = NULL;
228238
STAILQ_NEXT(cmd, ready_next) = NULL;
229239
STAILQ_NEXT(cmd, waiting_next) = NULL;
230240
STAILQ_NEXT(cmd, sub_cmd_next) = NULL;
231241
STAILQ_INSERT_HEAD(&ctx->free_cmdq, cmd, cmd_next);
232-
ctx->nfree_cmdq++;
242+
243+
ATOMIC_INC(ctx->mstats.free_cmds, 1);
233244
}
234245

235246
static int cmd_in_queue(struct command *cmd, struct connection *server)
@@ -287,30 +298,14 @@ static int cmd_format_stats(char *dest, size_t n, struct stats *stats, char *lat
287298
"remote_latency:%.6f\r\n"
288299
"total_latency:%.6f\r\n"
289300
"last_command_latency:%s\r\n"
290-
"in_use_buffers:%lld\r\n"
291-
"free_buffers:%lld\r\n"
292-
"in_use_cmds:%lld\r\n"
293-
"free_cmds:%lld\r\n"
294-
"in_use_conns:%lld\r\n"
295-
"free_conns:%lld\r\n"
296-
"in_use_conn_info:%lld\r\n"
297-
"free_conn_info:%lld\r\n"
298301
"remotes:%s\r\n",
299302
config.cluster, VERSION, stats->pid, stats->threads,
300303
stats->used_cpu_sys, stats->used_cpu_user,
301304
stats->basic.connected_clients,
302305
stats->basic.completed_commands,
303306
stats->basic.recv_bytes, stats->basic.send_bytes,
304-
stats->basic.remote_latency,
305-
stats->basic.total_latency, latency,
306-
stats->basic.buffers,
307-
stats->free_buffers,
308-
stats->basic.cmds,
309-
stats->free_cmds,
310-
stats->basic.conns,
311-
stats->free_conns,
312-
stats->basic.conn_info,
313-
stats->free_conn_info,
307+
stats->basic.remote_latency / 1000000.0,
308+
stats->basic.total_latency / 1000000.0, latency,
314309
stats->remote_nodes);
315310
}
316311

@@ -484,11 +479,7 @@ int cmd_ping(struct command *cmd)
484479
{
485480
conn_add_data(cmd->client, (uint8_t*)rep_ping, 7,
486481
&cmd->rep_buf[0], &cmd->rep_buf[1]);
487-
488-
cmd->rep_buf[0].buf->refcount++;
489-
if (cmd->rep_buf[1].buf != cmd->rep_buf[0].buf) {
490-
cmd->rep_buf[1].buf->refcount++;
491-
}
482+
CMD_INCREF(cmd);
492483

493484
cmd_mark_done(cmd);
494485
return CORVUS_OK;
@@ -505,7 +496,7 @@ int cmd_info(struct command *cmd)
505496
memset(latency, 0, sizeof(latency));
506497

507498
for (i = 0; i < stats.threads; i++) {
508-
n = snprintf(latency + size, 16, "%.6f", stats.last_command_latency[i]);
499+
n = snprintf(latency + size, 16, "%.6f", stats.last_command_latency[i] / 1000000.0);
509500
size += n;
510501
if (i < stats.threads - 1) {
511502
latency[size] = ',';
@@ -522,23 +513,75 @@ int cmd_info(struct command *cmd)
522513
snprintf(head, sizeof(head), fmt, n);
523514

524515
conn_add_data(cmd->client, (uint8_t*)head, size, &cmd->rep_buf[0], NULL);
525-
cmd->rep_buf[0].buf->refcount++;
526516
conn_add_data(cmd->client, (uint8_t*)info, n, NULL, NULL);
527517
conn_add_data(cmd->client, (uint8_t*)"\r\n", 2, NULL, &cmd->rep_buf[1]);
528-
if (cmd->rep_buf[1].buf != cmd->rep_buf[0].buf) {
529-
cmd->rep_buf[1].buf->refcount++;
530-
}
518+
CMD_INCREF(cmd);
519+
520+
cmd_mark_done(cmd);
521+
return CORVUS_OK;
522+
}
523+
524+
int cmd_proxy_info(struct command *cmd)
525+
{
526+
struct memory_stats stats;
527+
memset(&stats, 0, sizeof(stats));
528+
stats_get_memory(&stats);
529+
530+
int n = 1024;
531+
char data[n + 1];
532+
snprintf(data, n,
533+
"+"
534+
"in_use_buffers:%lld\n"
535+
"free_buffers:%lld\n"
536+
"in_use_cmds:%lld\n"
537+
"free_cmds:%lld\n"
538+
"in_use_conns:%lld\n"
539+
"free_conns:%lld\n"
540+
"in_use_conn_info:%lld\n"
541+
"free_conn_info:%lld"
542+
"\r\n",
543+
stats.buffers, stats.free_buffers, stats.cmds, stats.free_cmds,
544+
stats.conns, stats.free_conns, stats.conn_info, stats.free_conn_info);
545+
546+
conn_add_data(cmd->client, (uint8_t*)data, strlen(data),
547+
&cmd->rep_buf[0], &cmd->rep_buf[1]);
548+
CMD_INCREF(cmd);
549+
531550
cmd_mark_done(cmd);
532551
return CORVUS_OK;
533552
}
534553

535-
int cmd_proxy(struct command *cmd)
554+
int cmd_proxy(struct command *cmd, struct redis_data *data)
555+
{
556+
ASSERT_TYPE(data, REP_ARRAY);
557+
ASSERT_ELEMENTS(data->elements >= 2, data);
558+
559+
struct redis_data *op = &data->element[1];
560+
ASSERT_TYPE(op, REP_STRING);
561+
562+
char type[op->pos.str_len + 1];
563+
if (pos_to_str(&op->pos, type) == CORVUS_ERR) {
564+
LOG(ERROR, "cmd_proxy: parse error");
565+
return CORVUS_ERR;
566+
}
567+
568+
if (strcasecmp(type, "INFO") == 0) {
569+
return cmd_proxy_info(cmd);
570+
} else {
571+
cmd_mark_fail(cmd, rep_err);
572+
}
573+
return CORVUS_OK;
574+
}
575+
576+
int cmd_extra(struct command *cmd, struct redis_data *data)
536577
{
537578
switch (cmd->cmd_type) {
538579
case CMD_PING:
539580
return cmd_ping(cmd);
540581
case CMD_INFO:
541582
return cmd_info(cmd);
583+
case CMD_PROXY:
584+
return cmd_proxy(cmd, data);
542585
default:
543586
LOG(ERROR, "%s: unknown command type %d", __func__, cmd->cmd_type);
544587
return CORVUS_ERR;
@@ -555,8 +598,8 @@ int cmd_forward(struct command *cmd, struct redis_data *data)
555598
return cmd_forward_basic(cmd);
556599
case CMD_COMPLEX:
557600
return cmd_forward_complex(cmd, data);
558-
case CMD_PROXY:
559-
return cmd_proxy(cmd);
601+
case CMD_EXTRA:
602+
return cmd_extra(cmd, data);
560603
case CMD_UNIMPL:
561604
return CORVUS_ERR;
562605
}
@@ -786,13 +829,13 @@ struct command *cmd_create(struct context *ctx)
786829
LOG(DEBUG, "cmd get cache");
787830
cmd = STAILQ_FIRST(&ctx->free_cmdq);
788831
STAILQ_REMOVE_HEAD(&ctx->free_cmdq, cmd_next);
789-
ctx->nfree_cmdq--;
832+
ATOMIC_DEC(ctx->mstats.free_cmds, 1);
790833
STAILQ_NEXT(cmd, cmd_next) = NULL;
791834
} else {
792835
cmd = malloc(sizeof(struct command));
793836
}
794837
cmd_init(ctx, cmd);
795-
ctx->stats.cmds++;
838+
ATOMIC_INC(ctx->mstats.cmds, 1);
796839
return cmd;
797840
}
798841

@@ -923,22 +966,23 @@ void cmd_stats(struct command *cmd)
923966
{
924967
struct context *ctx = cmd->ctx;
925968
struct command *last, *first;
926-
double latency;
969+
long long latency;
970+
971+
ATOMIC_INC(ctx->stats.completed_commands, 1);
927972

928-
ctx->stats.completed_commands++;
973+
latency = cmd->req_time[1] - cmd->req_time[0];
929974

930-
latency = (cmd->req_time[1] - cmd->req_time[0]) / 1000000.0;
931-
ctx->stats.total_latency += latency;
932-
ctx->last_command_latency = latency;
975+
ATOMIC_INC(ctx->stats.total_latency, latency);
976+
ATOMIC_SET(ctx->last_command_latency, latency);
933977

934978
if (!STAILQ_EMPTY(&cmd->sub_cmds)) {
935979
first = STAILQ_FIRST(&cmd->sub_cmds);
936980
last = STAILQ_LAST(&cmd->sub_cmds, command, sub_cmd_next);
937-
latency = (last->rep_time[1] - first->rep_time[0]) / 1000000.0;
981+
latency = last->rep_time[1] - first->rep_time[0];
938982
} else {
939-
latency = (cmd->rep_time[1] - cmd->rep_time[0]) / 1000000.0;
983+
latency = cmd->rep_time[1] - cmd->rep_time[0];
940984
}
941-
ctx->stats.remote_latency += latency;
985+
ATOMIC_INC(ctx->stats.remote_latency, latency);
942986
}
943987

944988
void cmd_set_stale(struct command *cmd)

src/connection.c

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ void conn_info_init(struct conn_info *info)
103103
TAILQ_INIT(&info->data);
104104
TAILQ_INIT(&info->local_data);
105105

106-
info->send_bytes = 0;
107-
info->recv_bytes = 0;
108-
info->completed_commands = 0;
106+
ATOMIC_SET(info->send_bytes, 0);
107+
ATOMIC_SET(info->recv_bytes, 0);
108+
ATOMIC_SET(info->completed_commands, 0);
109109
info->status = DISCONNECTED;
110110
}
111111

@@ -123,12 +123,12 @@ struct connection *conn_create(struct context *ctx)
123123
if ((conn = TAILQ_FIRST(&ctx->conns)) != NULL && conn->fd == -1) {
124124
LOG(DEBUG, "connection get cache");
125125
TAILQ_REMOVE(&ctx->conns, conn, next);
126-
ctx->nfree_connq--;
126+
ATOMIC_DEC(ctx->mstats.free_conns, 1);
127127
} else {
128128
conn = malloc(sizeof(struct connection));
129129
}
130130
conn_init(conn, ctx);
131-
ctx->stats.conns++;
131+
ATOMIC_INC(ctx->mstats.conns, 1);
132132
return conn;
133133
}
134134

@@ -138,14 +138,14 @@ struct conn_info *conn_info_create(struct context *ctx)
138138
if (!STAILQ_EMPTY(&ctx->free_conn_infoq)) {
139139
info = STAILQ_FIRST(&ctx->free_conn_infoq);
140140
STAILQ_REMOVE_HEAD(&ctx->free_conn_infoq, next);
141-
ctx->nfree_conn_infoq--;
141+
ATOMIC_DEC(ctx->mstats.free_conn_info, 1);
142142
} else {
143143
info = malloc(sizeof(struct conn_info));
144144
// init iov here
145145
memset(&info->iov, 0, sizeof(info->iov));
146146
}
147147
conn_info_init(info);
148-
ctx->stats.conn_info++;
148+
ATOMIC_INC(ctx->mstats.conn_info, 1);
149149
return info;
150150
}
151151

@@ -216,23 +216,27 @@ void conn_buf_free(struct connection *conn)
216216
void conn_recycle(struct context *ctx, struct connection *conn)
217217
{
218218
if (conn->info != NULL) {
219-
ctx->stats.conn_info--;
219+
ATOMIC_DEC(ctx->mstats.conn_info, 1);
220+
220221
struct conn_info *info = conn->info;
221222
if (!TAILQ_EMPTY(&info->data)) {
222223
LOG(WARN, "connection recycle, data buffer not empty");
223224
}
224225
STAILQ_INSERT_TAIL(&ctx->free_conn_infoq, info, next);
225-
ctx->nfree_conn_infoq++;
226+
227+
ATOMIC_INC(ctx->mstats.free_conn_info, 1);
226228
conn->info = NULL;
227229
}
228230

229-
ctx->stats.conns--;
231+
ATOMIC_DEC(ctx->mstats.conns, 1);
232+
230233
if (conn->next.tqe_next != NULL || conn->next.tqe_prev != NULL) {
231234
TAILQ_REMOVE(&ctx->conns, conn, next);
232235
TAILQ_RESET(conn, next);
233236
}
234237
TAILQ_INSERT_HEAD(&ctx->conns, conn, next);
235-
ctx->nfree_connq++;
238+
239+
ATOMIC_INC(ctx->mstats.free_conns, 1);
236240
}
237241

238242
int conn_create_fd()
@@ -368,7 +372,7 @@ int conn_write(struct connection *conn, int clear)
368372
status = socket_write(conn->fd, vec, n);
369373
if (status == CORVUS_AGAIN || status == CORVUS_ERR) return status;
370374

371-
conn->ctx->stats.send_bytes += status;
375+
ATOMIC_INC(conn->ctx->stats.send_bytes, status);
372376

373377
if (status < bytes) {
374378
for (i = 0; i < n; i++) {
@@ -398,8 +402,8 @@ int conn_read(struct connection *conn, struct mbuf *buf)
398402
if (n == 0) return CORVUS_EOF;
399403
if (n == CORVUS_ERR) return CORVUS_ERR;
400404
if (n == CORVUS_AGAIN) return CORVUS_AGAIN;
401-
conn->ctx->stats.recv_bytes += n;
402-
conn->info->recv_bytes += n;
405+
ATOMIC_INC(conn->ctx->stats.recv_bytes, n);
406+
ATOMIC_INC(conn->info->recv_bytes, n);
403407
return CORVUS_OK;
404408
}
405409

src/corvus.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ void context_free(struct context *ctx)
265265
cmd = STAILQ_FIRST(&ctx->free_cmdq);
266266
STAILQ_REMOVE_HEAD(&ctx->free_cmdq, cmd_next);
267267
free(cmd);
268-
ctx->nfree_cmdq--;
268+
ATOMIC_DEC(ctx->mstats.free_cmds, 1);
269269
}
270270

271271
/* connection queue */
@@ -291,7 +291,8 @@ void context_free(struct context *ctx)
291291
STAILQ_REMOVE_HEAD(&ctx->free_conn_infoq, next);
292292
cmd_iov_free(&info->iov);
293293
free(info);
294-
ctx->nfree_mbufq--;
294+
295+
ATOMIC_DEC(ctx->mstats.free_buffers, 1);
295296
}
296297

297298
/* event loop */

0 commit comments

Comments
 (0)