Skip to content

Commit ae86260

Browse files
authored
Merge pull request #717 from Netflix/dev
Merge dev into 0.6
2 parents f5fe291 + f2a4f23 commit ae86260

File tree

11 files changed

+238
-46
lines changed

11 files changed

+238
-46
lines changed

src/dyn_connection.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,13 @@ rstatus_t conn_listen(struct context *ctx, struct conn *p) {
234234
return DN_ERROR;
235235
}
236236

237+
status = dn_set_keepalive(p->sd, 1);
238+
if (status != DN_OK) {
239+
log_error("set keepalive on p %d on addr '%.*s' failed: %s", p->sd,
240+
p->pname.len, p->pname.data, strerror(errno));
241+
// Continue since this is not catastrophic
242+
}
243+
237244
status = conn_event_add_conn(p);
238245
if (status < 0) {
239246
log_error("event add conn p %d on addr '%.*s' failed: %s", p->sd,

src/dyn_core.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ static void core_print_peer_status(void *arg1) {
5050
struct rack *rack = array_get(&dc->racks, rack_index);
5151
uint8_t i = 0;
5252
for (i = 0; i < rack->ncontinuum; i++) {
53-
struct continuum *c = &rack->continuum[i];
53+
struct continuum *c = (struct continuum*) array_get(&rack->continuums, i);
54+
ASSERT(c != NULL);
5455
uint32_t peer_index = c->index;
5556
struct node *peer = *(struct node **)array_get(&sp->peers, peer_index);
5657
if (!peer) log_panic("peer is null. Topology not inited proerly");

src/dyn_core.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ struct instance {
126126

127127
struct continuum {
128128
uint32_t index; /* dyn_peer index */
129-
uint32_t value; /* hash value, used by ketama */
129+
uint32_t value; /* hash value, used ONLY by ketama */
130130
struct dyn_token *token; /* used in vnode/dyn_token situations */
131131
};
132132

@@ -136,7 +136,7 @@ struct rack {
136136
uint32_t ncontinuum; /* # continuum points */
137137
uint32_t
138138
nserver_continuum; /* # servers - live and dead on continuum (const) */
139-
struct continuum *continuum; /* continuum */
139+
struct array continuums;
140140
};
141141

142142
struct datacenter {

src/dyn_dnode_peer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,7 @@ uint32_t dnode_peer_idx_for_key_on_rack(struct server_pool *pool,
785785
uint32_t keylen) {
786786
struct dyn_token token;
787787
pool->key_hash(key, keylen, &token);
788-
return vnode_dispatch(rack->continuum, rack->ncontinuum, &token);
788+
return vnode_dispatch(&rack->continuums, rack->ncontinuum, &token);
789789
}
790790

791791
static struct node *dnode_peer_for_key_on_rack(struct server_pool *pool,

src/dyn_message.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,11 @@ void msg_put(struct msg *msg) {
632632
msg->keys = NULL;
633633
}
634634

635+
if (msg->args) {
636+
array_destroy(msg->args);
637+
msg->args = NULL;
638+
}
639+
635640
if (msg->orig_msg) {
636641
msg_put(msg->orig_msg);
637642
msg->orig_msg = NULL;
@@ -881,8 +886,10 @@ static rstatus_t msg_repair(struct context *ctx, struct conn *conn,
881886
return DN_ENOMEM;
882887
}
883888

884-
mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
885-
mbuf_remove(&msg->mhdr, mbuf);
889+
// This was added to handle a specific case which doesn't seem reproducible
890+
// now. Revisit if things seem off.
891+
//mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
892+
//mbuf_remove(&msg->mhdr, mbuf);
886893
mbuf_insert(&msg->mhdr, nbuf);
887894
msg->pos = nbuf->pos;
888895

src/dyn_server.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,9 @@ dictType dc_string_dict_type = {
508508
};
509509

510510
static rstatus_t rack_init(struct rack *rack) {
511-
rack->continuum = dn_alloc(sizeof(struct continuum));
511+
512+
// TODO: Initialize the array to the size of the ring instead of to 1.
513+
THROW_STATUS(array_init(&rack->continuums, 1, sizeof(struct continuum)));
512514
rack->ncontinuum = 0;
513515
rack->nserver_continuum = 0;
514516
rack->name = dn_alloc(sizeof(struct string));
@@ -521,9 +523,7 @@ static rstatus_t rack_init(struct rack *rack) {
521523
}
522524

523525
static rstatus_t rack_deinit(struct rack *rack) {
524-
if (rack->continuum != NULL) {
525-
dn_free(rack->continuum);
526-
}
526+
array_deinit(&rack->continuums);
527527

528528
return DN_OK;
529529
}

src/dyn_vnode.c

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,26 @@
2929
#include <dyn_server.h>
3030
#include <dyn_vnode.h>
3131

32+
33+
// Similar to strcmp() but compares 2 'dyn_token' structs instead.
3234
static int vnode_item_cmp(const void *t1, const void *t2) {
3335
const struct continuum *ct1 = t1, *ct2 = t2;
3436

3537
return cmp_dyn_token(ct1->token, ct2->token);
3638
}
3739

40+
// Sorts the continuum for a rack based on their tokens.
3841
static rstatus_t vnode_rack_verify_continuum(void *elem) {
3942
struct rack *rack = elem;
40-
qsort(rack->continuum, rack->ncontinuum, sizeof(*rack->continuum),
43+
qsort(rack->continuums.elem, rack->ncontinuum, sizeof(struct continuum),
4144
vnode_item_cmp);
4245

4346
log_debug(LOG_VERB, "**** printing continuums for rack '%.*s'",
4447
rack->name->len, rack->name->data);
4548
uint32_t i;
4649
for (i = 0; i < rack->ncontinuum; i++) {
47-
struct continuum *c = &rack->continuum[i];
50+
struct continuum *c = (struct continuum*) array_get(&rack->continuums, i);
51+
ASSERT(c != NULL);
4852
log_debug(LOG_VERB, "next c[%d]: idx = %u, token->mag = %u", i, c->index,
4953
c->token->mag[0]);
5054
}
@@ -88,21 +92,18 @@ rstatus_t vnode_update(struct server_pool *sp) {
8892
uint32_t orig_cnt = rack->nserver_continuum;
8993
uint32_t new_cnt = orig_cnt + token_cnt;
9094

91-
if (new_cnt > 1) {
92-
struct continuum *continuum =
93-
dn_realloc(rack->continuum, sizeof(struct continuum) * new_cnt);
94-
if (continuum == NULL) {
95-
log_debug(LOG_ERR, "Are we failing? Why???? This is a serious issue");
96-
return DN_ENOMEM;
97-
}
98-
99-
rack->continuum = continuum;
95+
struct continuum *continuum = array_push(&rack->continuums);
96+
if (continuum == NULL) {
97+
log_error("Could not allocate memory to expand the continuum.");
98+
return DN_ENOMEM;
10099
}
101100
rack->nserver_continuum = new_cnt;
102101

103102
uint32_t j;
104103
for (j = 0; j < token_cnt; j++) {
105-
struct continuum *c = &rack->continuum[orig_cnt + j];
104+
struct continuum *c = (struct continuum*) array_get(
105+
&rack->continuums, orig_cnt + j);
106+
ASSERT(c != NULL);
106107
c->index = i;
107108
c->value = 0; /* set this to an empty value, only used by ketama */
108109
c->token = array_get(&peer->tokens, j);
@@ -120,16 +121,15 @@ rstatus_t vnode_update(struct server_pool *sp) {
120121
return DN_OK;
121122
}
122123

123-
// if token falls into interval (a,b], we return b.
124-
uint32_t vnode_dispatch(struct continuum *continuum, uint32_t ncontinuum,
124+
uint32_t vnode_dispatch(struct array *continuums, uint32_t ncontinuum,
125125
struct dyn_token *token) {
126126
struct continuum *left, *right, *middle;
127127

128-
ASSERT(continuum != NULL);
128+
ASSERT(continuums != NULL);
129129
ASSERT(ncontinuum != 0);
130130

131-
left = continuum;
132-
right = continuum + ncontinuum - 1;
131+
left = (struct continuum*) array_get(continuums, 0);
132+
right = (struct continuum*) array_get(continuums, ncontinuum - 1);
133133

134134
if (cmp_dyn_token(right->token, token) < 0 ||
135135
cmp_dyn_token(left->token, token) >= 0)

src/dyn_vnode.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1+
/*
2+
* Dynomite - A thin, distributed replication layer for multi non-distributed
3+
* storages. Copyright (C) 2019 Netflix, Inc.
4+
*/
5+
16
#pragma once
27
#include <dyn_types.h>
38

9+
// Initializes (on first call) and updates (on subsequent calls) the per rack continuums
10+
// and makes sure the tokens managed by the continuums are ascending.
411
rstatus_t vnode_update(struct server_pool *pool);
5-
uint32_t vnode_dispatch(struct continuum *continuum, uint32_t ncontinuum,
12+
13+
// Returns the index of the continuum from 'continuums' where 'token' falls.
14+
// If 'token' falls into interval (a,b], we return b.
15+
uint32_t vnode_dispatch(struct array *continuums, uint32_t ncontinuum,
616
struct dyn_token *token);

src/proto/dyn_proto_repair.h

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
* compile time constants.
7878
*
7979
* This is still in the beta stage and has some limitations:
80-
* - Large values may cause overflows ( > 512 bytes)
8180
* - Limited command support due to parser limitations:
8281
* - Sorted sets and lists need optional command parsing support.
8382
* - Repairing on the read path vs. the background has perf implications. In the future,
@@ -290,6 +289,71 @@
290289
"local value = redis.call(orig_cmd, key, field)\n"\
291290
"return {status_field, ts, value}\n\r\n"
292291

292+
#define ZADD_SCRIPT "$4\r\nEVAL\r\n$2022\r\n"\
293+
"local key = KEYS[1]\n"\
294+
"local top_level_add_set = KEYS[2]\n"\
295+
"local top_level_rem_set = KEYS[3]\n"\
296+
"local add_set = top_level_add_set .. '_' .. key\n"\
297+
"local rem_set = top_level_rem_set .. '_' .. key\n"\
298+
"local orig_cmd = ARGV[1]\n"\
299+
"local num_opts = ARGV[2]\n"\
300+
"local num_fields = ARGV[3]\n"\
301+
"local cur_ts = ARGV[4]\n"\
302+
"local start_loop = 5 + num_opts\n"\
303+
"local end_loop = (num_fields * 2) + 4 + num_opts\n"\
304+
"local top_level_rem_set_ts = redis.call('ZSCORE', top_level_rem_set, key)\n"\
305+
"if (top_level_rem_set_ts) then\n"\
306+
" if (tonumber(cur_ts) < tonumber(top_level_rem_set_ts)) then\n"\
307+
" return 0\n"\
308+
" end\n"\
309+
" redis.call('ZREM', top_level_rem_set, key)\n"\
310+
"end\n"\
311+
"local top_level_add_set_ts = redis.call('ZSCORE', top_level_add_set, key)\n"\
312+
"if (top_level_add_set_ts) then\n"\
313+
" if (tonumber(cur_ts) > tonumber(top_level_add_set_ts)) then\n"\
314+
" redis.call('ZADD', top_level_add_set, cur_ts, key)\n"\
315+
" end\n"\
316+
"else\n"\
317+
" redis.call('ZADD', top_level_add_set, cur_ts, key)\n"\
318+
"end\n"\
319+
"local skiploop\n"\
320+
"local ret\n"\
321+
"for i=start_loop,end_loop,2\n"\
322+
"do\n"\
323+
" skiploop = false\n"\
324+
" local field = ARGV[i]\n"\
325+
" local value = ARGV[i+1]\n"\
326+
" local last_seen_ts_in_add = redis.call('ZSCORE', add_set, field)\n"\
327+
" local last_seen_ts_in_rem = redis.call('ZSCORE', rem_set, field)\n"\
328+
" if (last_seen_ts_in_rem) then\n"\
329+
" if (tonumber(cur_ts) < tonumber(last_seen_ts_in_rem)) then\n"\
330+
" skiploop = true\n"\
331+
" end\n"\
332+
" redis.call('ZREM', rem_set, field)\n"\
333+
" elseif (last_seen_ts_in_add) then\n"\
334+
" if (tonumber(cur_ts) < tonumber(last_seen_ts_in_add)) then\n"\
335+
" skiploop = true\n"\
336+
" end\n"\
337+
" end\n"\
338+
" if (skiploop == false) then\n"\
339+
" if (num_opts == '0') then\n"\
340+
" ret = redis.call(orig_cmd, key, value, field)\n"\
341+
" elseif (num_opts == '1') then\n"\
342+
" ret = redis.call(orig_cmd, key, ARGV[5], value, field)\n"\
343+
" elseif (num_opts == '2') then\n"\
344+
" ret = redis.call(orig_cmd, key, ARGV[5], ARGV[6], value, field)\n"\
345+
" elseif (num_opts == '3') then\n"\
346+
" ret = redis.call(orig_cmd, key, ARGV[5], ARGV[6], ARGV[7], value, field)\n"\
347+
" else\n"\
348+
" ret = false\n"\
349+
" end\n"\
350+
" if (type(ret) ~= 'boolean') then\n"\
351+
" redis.call('ZADD', add_set, cur_ts, field)\n"\
352+
" end\n"\
353+
" end\n"\
354+
"end\n"\
355+
"return ret\n\r\n"
356+
293357
#define SADD_SCRIPT "$4\r\nEVAL\r\n$1526\r\n"\
294358
"local key = KEYS[1]\n"\
295359
"local top_level_add_set = KEYS[2]\n"\

src/proto/dyn_redis.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,7 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
10651065
// This is not to be confused with 'EXISTS'. This is the second half of the
10661066
// command 'SCRIPT EXISTS'.
10671067
r->type = MSG_REQ_REDIS_SCRIPT_EXISTS;
1068+
r->msg_routing = ROUTING_ALL_NODES_ALL_RACKS_ALL_DCS;
10681069
r->is_read = 1;
10691070
break;
10701071
}
@@ -1829,7 +1830,8 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
18291830

18301831
m = next_mbuf->pos + new_mbuf_offset;
18311832
b = next_mbuf;
1832-
++r->latest_parsed_mbuf_idx;
1833+
1834+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
18331835
}
18341836
if (arg1_across_mbufs == false) {
18351837
rstatus_t argstatus = record_arg(p , m , r->args);
@@ -1846,6 +1848,8 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
18461848
r->rlen -= (uint32_t)(b->last - p);
18471849
m = b->last - 1;
18481850
p = m;
1851+
1852+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
18491853
break;
18501854
}
18511855

@@ -1967,7 +1971,7 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
19671971

19681972
m = next_mbuf->pos + new_mbuf_offset;
19691973
b = next_mbuf;
1970-
++r->latest_parsed_mbuf_idx;
1974+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
19711975
}
19721976
if (arg2_across_mbufs == false) {
19731977
// TODO: Verify if this is the correct behavior for EVAL/EVALSHA
@@ -1985,6 +1989,8 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
19851989
r->rlen -= (uint32_t)(b->last - p);
19861990
m = b->last - 1;
19871991
p = m;
1992+
1993+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
19881994
break;
19891995
}
19901996

@@ -2120,7 +2126,7 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
21202126

21212127
m = next_mbuf->pos + new_mbuf_offset;
21222128
b = next_mbuf;
2123-
++r->latest_parsed_mbuf_idx;
2129+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
21242130
}
21252131
if (arg3_across_mbufs == false) {
21262132
rstatus_t argstatus = record_arg(p , m , r->args);
@@ -2137,6 +2143,8 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
21372143
r->rlen -= (uint32_t)(b->last - p);
21382144
m = b->last - 1;
21392145
p = m;
2146+
2147+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
21402148
break;
21412149
}
21422150

@@ -2227,7 +2235,7 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
22272235

22282236
m = next_mbuf->pos + new_mbuf_offset;
22292237
b = next_mbuf;
2230-
++r->latest_parsed_mbuf_idx;
2238+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
22312239
}
22322240
if (argn_across_mbufs == false) {
22332241
rstatus_t argstatus = record_arg(p , m , r->args);
@@ -2244,6 +2252,8 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
22442252
r->rlen -= (uint32_t)(b->last - p);
22452253
m = b->last - 1;
22462254
p = m;
2255+
2256+
if (mbuf_full(b)) ++r->latest_parsed_mbuf_idx;
22472257
break;
22482258
}
22492259

@@ -2288,15 +2298,13 @@ void redis_parse_req(struct msg *r, struct context *ctx) {
22882298
r->pos = p;
22892299
r->state = state;
22902300

2291-
// We reached here since we finished parsing the current 'mbuf' in 'r->mhdr'.
2292-
++r->latest_parsed_mbuf_idx;
2293-
22942301
// If we have to parse again, we won't be able to write with the timestamp.
22952302
r->rewrite_with_ts_possible = false;
22962303
if (b->last == b->end && r->token != NULL) {
22972304
r->pos = r->token;
22982305
r->token = NULL;
22992306
r->result = MSG_PARSE_REPAIR;
2307+
23002308
} else {
23012309
r->result = MSG_PARSE_AGAIN;
23022310
}

0 commit comments

Comments
 (0)