diff --git a/src/ngx_http_upsync_module.c b/src/ngx_http_upsync_module.c index 239a6c5..66804b1 100644 --- a/src/ngx_http_upsync_module.c +++ b/src/ngx_http_upsync_module.c @@ -36,6 +36,7 @@ typedef struct { #define NGX_HTTP_UPSYNC_CONSUL 0x0001 #define NGX_HTTP_UPSYNC_ETCD 0x0002 +#define NGX_HTTP_UPSYNC_ZK 0x0003 typedef ngx_int_t (*ngx_http_upsync_packet_init_pt) @@ -186,6 +187,7 @@ static void ngx_http_upsync_timeout_handler(ngx_event_t *event); static void ngx_http_upsync_clean_event(void *upsync_server); static ngx_int_t ngx_http_upsync_etcd_parse_init(void *upsync_server); static ngx_int_t ngx_http_upsync_consul_parse_init(void *upsync_server); +static ngx_int_t ngx_http_upsync_zk_parse_init(void *upsync_server); static ngx_int_t ngx_http_upsync_dump_server( ngx_http_upsync_server_t *upsync_server); static ngx_int_t ngx_http_upsync_init_server(ngx_event_t *event); @@ -218,6 +220,7 @@ static ngx_int_t ngx_http_upsync_check_index( ngx_http_upsync_server_t *upsync_server); static ngx_int_t ngx_http_upsync_consul_parse_json(void *upsync_server); static ngx_int_t ngx_http_upsync_etcd_parse_json(void *upsync_server); +static ngx_int_t ngx_http_upsync_zk_parse_json(void *upsync_server); static ngx_int_t ngx_http_upsync_check_key(u_char *key); static void *ngx_http_upsync_servers(ngx_cycle_t *cycle, ngx_http_upsync_server_t *upsync_server, ngx_flag_t flag); @@ -347,6 +350,14 @@ static ngx_upsync_conf_t ngx_upsync_types[] = { ngx_http_upsync_etcd_parse_init, ngx_http_upsync_etcd_parse_json, ngx_http_upsync_clean_event }, + + { ngx_string("zk"), + NGX_HTTP_UPSYNC_ZK, + ngx_http_upsync_send_handler, + ngx_http_upsync_recv_handler, + ngx_http_upsync_zk_parse_init, + ngx_http_upsync_zk_parse_json, + ngx_http_upsync_clean_event }, { ngx_null_string, 0, @@ -708,6 +719,8 @@ ngx_http_upsync_check_index(ngx_http_upsync_server_t *upsync_server) ngx_upsync_conf_t *upsync_type_conf; upsync_type_conf = upsync_server->upscf->upsync_type_conf; + + //todo do we need to check the zk index? if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_CONSUL) { for (i = 0; i < state.num_headers; i++) { @@ -1587,6 +1600,226 @@ ngx_http_upsync_etcd_parse_json(void *data) return NGX_OK; } +static ngx_int_t +ngx_http_upsync_zk_parse_json(void *data) +{ + u_char *p; + ngx_buf_t *buf; + ngx_int_t max_fails=2, backup=0, down=0; + ngx_str_t src, dst; + ngx_http_upsync_ctx_t *ctx; + ngx_http_upsync_conf_t *upstream_conf = NULL; + ngx_http_upsync_server_t *upsync_server = data; + + ctx = &upsync_server->ctx; + buf = &ctx->body; + + src.len = 0, src.data = NULL; + dst.len = 0, dst.data = NULL; + + cJSON *root = cJSON_Parse((char *)buf->pos); + if (root == NULL) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: root error"); + return NGX_ERROR; + } + + if (ngx_array_init(&ctx->upstream_conf, ctx->pool, 16, + sizeof(*upstream_conf)) != NGX_OK) + { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: array init error"); + cJSON_Delete(root); + return NGX_ERROR; + } + + cJSON *children = cJSON_GetObjectItem(root, "children"); + if (children == NULL) { + cJSON_Delete(root); + return NGX_ERROR; + } + + cJSON *server_next; + cJSON *node = cJSON_GetObjectItem(children, "path"); + + + server_next = node == NULL ? children->child : children; + for (; server_next != NULL; + server_next = server_next->next) + { + cJSON *temp1 = cJSON_GetObjectItem(server_next, "path"); + if (temp1 != NULL && temp1->valuestring != NULL) { + p = (u_char *)ngx_strrchr(temp1->valuestring, '/'); + if (p == NULL) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: %s key format is illegal, " + "contains no slash ('/')", temp1->valuestring); + continue; + } else if (ngx_http_upsync_check_key(p) != NGX_OK) { + continue; + } + + upstream_conf = ngx_array_push(&ctx->upstream_conf); + ngx_memzero(upstream_conf, sizeof(*upstream_conf)); + ngx_sprintf(upstream_conf->sockaddr, "%*s", ngx_strlen(p + 1), p + 1); + } + temp1 = NULL; + + temp1 = cJSON_GetObjectItem(server_next, "data64"); + if (temp1 != NULL && temp1->valuestring != NULL) { + + src.data = (u_char *)temp1->valuestring; + src.len = ngx_strlen(temp1->valuestring); + + if (dst.data == NULL) { + dst.data = ngx_pcalloc(ctx->pool, 1024); + + } else { + ngx_memzero(dst.data, 1024); + } + dst.len = 0; + + ngx_decode_base64(&dst, &src); + } + temp1 = NULL; + + /* default value, server attribute */ + upstream_conf->weight = 1; + upstream_conf->max_fails = 2; + upstream_conf->fail_timeout = 10; + + upstream_conf->down = 0; + upstream_conf->backup = 0; + + p = NULL; + + if (dst.data != NULL && dst.len != 0) { + + p = dst.data; + cJSON *sub_root = cJSON_Parse((char *)p); + if (sub_root == NULL) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: parse attribute json failed," + "setting server attribute to default value"); + continue; + } + + cJSON *sub_attribute = sub_root; + cJSON *temp1 = cJSON_GetObjectItem(sub_attribute, "weight"); + if (temp1 != NULL) { + + if (temp1->valuestring != NULL) { + upstream_conf->weight = ngx_atoi((u_char *)temp1->valuestring, + (size_t)ngx_strlen(temp1->valuestring)); + + } else if (temp1->valueint >= 0) { + upstream_conf->weight = temp1->valueint; + } + } + temp1 = NULL; + + temp1 = cJSON_GetObjectItem(sub_attribute, "max_fails"); + if (temp1 != NULL) { + + if (temp1->valuestring != NULL) { + max_fails = ngx_atoi((u_char *)temp1->valuestring, + (size_t)ngx_strlen(temp1->valuestring)); + + } else if (temp1->valueint >= 0) { + max_fails = temp1->valueint; + } + } + temp1 = NULL; + + temp1 = cJSON_GetObjectItem(sub_attribute, "fail_timeout"); + if (temp1 != NULL){ + + if (temp1->valuestring != NULL) { + + upstream_conf->fail_timeout = ngx_atoi((u_char *)temp1->valuestring, + (size_t)ngx_strlen(temp1->valuestring)); + + } else if (temp1->valueint >= 0) { + upstream_conf->fail_timeout = temp1->valueint; + } + } + temp1 = NULL; + + temp1 = cJSON_GetObjectItem(sub_attribute, "down"); + if (temp1 != NULL) { + + if (temp1->valueint != 0) { + down = temp1->valueint; + + } else if (temp1->valuestring != NULL) { + down = ngx_atoi((u_char *)temp1->valuestring, + (size_t)ngx_strlen(temp1->valuestring)); + } + } + temp1 = NULL; + + temp1 = cJSON_GetObjectItem(sub_attribute, "backup"); + if (temp1 != NULL) { + + if (temp1->valueint != 0) { + backup = temp1->valueint; + + } else if (temp1->valuestring != NULL) { + backup = ngx_atoi((u_char *)temp1->valuestring, + (size_t)ngx_strlen(temp1->valuestring)); + } + } + temp1 = NULL; + + dst.len = 0; + cJSON_Delete(sub_root); + } + + if (upstream_conf->weight <= 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"weight\" value is invalid" + ", setting default value 1"); + upstream_conf->weight = 1; + } + + if (max_fails < 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"max_fails\" value is invalid" + ", setting default value 2"); + } else { + upstream_conf->max_fails = (ngx_uint_t)max_fails; + } + + if (upstream_conf->fail_timeout < 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"fail_timeout\" value is invalid" + ", setting default value 10"); + upstream_conf->fail_timeout = 10; + } + + if (down != 1 && down != 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"down\" value is invalid" + ", setting default value 0"); + } else { + upstream_conf->down = (ngx_uint_t)down; + } + + if (backup != 1 && backup != 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"backup\" value is invalid" + ", setting default value 0"); + } else { + upstream_conf->backup = (ngx_uint_t)backup; + } + + max_fails=2, backup=0, down=0; + } + cJSON_Delete(root); + + return NGX_OK; +} + static ngx_int_t ngx_http_upsync_check_key(u_char *key) @@ -2582,6 +2815,12 @@ ngx_http_upsync_send_handler(ngx_event_t *event) } } + + if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK) { + ngx_sprintf(request, "GET %V?view=children&recursive=true" + " HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n", + &upscf->upsync_send, &upscf->conf_server.name); + } ctx->send.pos = request; ctx->send.last = ctx->send.pos + ngx_strlen(request); @@ -2837,6 +3076,56 @@ ngx_http_upsync_etcd_parse_init(void *data) } +static ngx_int_t +ngx_http_upsync_zk_parse_init(void *data) +{ + char *buf; + size_t parsed; + ngx_http_upsync_ctx_t *ctx; + ngx_http_upsync_server_t *upsync_server = data; + + ctx = &upsync_server->ctx; + + if (ngx_http_parser_init() == NGX_ERROR) { + return NGX_ERROR; + } + + buf = (char *)ctx->recv.pos; + + ctx->body.pos = ctx->body.last = NULL; + + parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf)); + if (parsed != ngx_strlen(buf)) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_consul_parse_init: parsed body size is wrong"); + return NGX_ERROR; + } + + if (ngx_strncmp(state.status, "OK", 2) == 0) { + + if (ngx_strlen(state.http_body) != 0) { + ctx->body.pos = state.http_body; + ctx->body.last = state.http_body + ngx_strlen(state.http_body); + + } + } + + if (parser != NULL) { + ngx_free(parser); + parser = NULL; + } + + if (ctx->body.pos != ctx->body.last) { + *(ctx->body.last + 1) = '\0'; + + } else { + return NGX_ERROR; + } + + return NGX_OK; +} + + static ngx_int_t ngx_http_upsync_dump_server(ngx_http_upsync_server_t *upsync_server) { @@ -3411,7 +3700,8 @@ ngx_http_upsync_clean_event(void *data) } if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_CONSUL - || upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ETCD) + || upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ETCD + || upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK) { if (parser != NULL) { ngx_free(parser); @@ -3652,6 +3942,13 @@ ngx_http_client_send(ngx_http_conf_client *client, "Accept: */*\r\n\r\n", &upscf->upsync_send, &upscf->conf_server.name); } + + if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK) { + ngx_sprintf(request, "GET %V?view=children&recursive=true" + " HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n", + &upscf->upsync_send, &upscf->conf_server.name); + + } size = ngx_strlen(request); while(send_num < size) { @@ -3819,12 +4116,12 @@ ngx_http_upsync_show(ngx_http_request_t *r) goto end; } - - for (i = 0; i < umcf->upstreams.nelts; i++) { + + for (i = 0; i < umcf->upstreams.nelts; i++) { ngx_http_upsync_show_upstream(uscfp[i], b); b->last = ngx_snprintf(b->last, b->end - b->last, "\n"); } - + goto end; } @@ -3854,3 +4151,4 @@ ngx_http_upsync_show(ngx_http_request_t *r) return ret; } +