diff --git a/plugins/datalayers/datalayers_plugin.h b/plugins/datalayers/datalayers_plugin.h index 854b49881..a64a618f5 100644 --- a/plugins/datalayers/datalayers_plugin.h +++ b/plugins/datalayers/datalayers_plugin.h @@ -74,6 +74,8 @@ struct neu_plugin { pthread_rwlock_t plugin_mutex; bool consumer_thread_stop_flag; + uint32_t config_seq; + int (*parse_config)(neu_plugin_t *plugin, const char *setting, datalayers_config_t *config); }; diff --git a/plugins/datalayers/datalayers_plugin_intf.c b/plugins/datalayers/datalayers_plugin_intf.c index 8331edc9f..c94aad611 100644 --- a/plugins/datalayers/datalayers_plugin_intf.c +++ b/plugins/datalayers/datalayers_plugin_intf.c @@ -17,6 +17,8 @@ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ +#include + #include "define.h" #include "errcodes.h" #include "flight_sql_client.h" @@ -49,6 +51,7 @@ neu_plugin_t *datalayers_plugin_open(void) neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t)); neu_plugin_common_init(&plugin->common); plugin->parse_config = datalayers_config_parse; + plugin->config_seq = 0; return plugin; } @@ -106,6 +109,7 @@ int datalayers_plugin_init(neu_plugin_t *plugin, bool load) int datalayers_plugin_uninit(neu_plugin_t *plugin) { pthread_rwlock_wrlock(&plugin->plugin_mutex); + plugin->config_seq++; stop_consumer_thread(&plugin->consumer_thread, plugin); pthread_mutex_lock(&plugin->queue_mutex); @@ -129,28 +133,71 @@ int datalayers_plugin_uninit(neu_plugin_t *plugin) return NEU_ERR_SUCCESS; } -static int config_datalayers_client(neu_plugin_t * plugin, - const datalayers_config_t *config) +static void *datalayers_connect_routine(void *arg) { - if (!config->host || !config->port || !config->username || - !config->password) { - plog_error( - plugin, - "Invalid datalayers config: NULL host/port/username/password"); - return NEU_ERR_DATALAYERS_INIT_FAILURE; + neu_plugin_t *plugin = (neu_plugin_t *) arg; + char * host = NULL; + char * user = NULL; + char * pass = NULL; + int port = 0; + uint32_t seq = 0; + + pthread_rwlock_wrlock(&plugin->plugin_mutex); + seq = plugin->config_seq; + if (!plugin->config.host || !plugin->config.username || + !plugin->config.password) { + pthread_rwlock_unlock(&plugin->plugin_mutex); + return NULL; } + host = strdup(plugin->config.host); + user = strdup(plugin->config.username); + pass = strdup(plugin->config.password); + port = (int) plugin->config.port; + pthread_rwlock_unlock(&plugin->plugin_mutex); - plugin->client = client_create(config->host, config->port, config->username, - config->password); + if (!host || !user || !pass) { + free(host); + free(user); + free(pass); + return NULL; + } - if (NULL == plugin->client) { - plog_error(plugin, "datalayers client_create failed"); - plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; - } else { + neu_datalayers_client *c = client_create(host, port, user, pass); + free(host); + free(user); + free(pass); + + pthread_rwlock_wrlock(&plugin->plugin_mutex); + if (seq != plugin->config_seq) { + if (c) { + client_destroy(c); + } + pthread_rwlock_unlock(&plugin->plugin_mutex); + return NULL; + } + if (c) { + if (plugin->client) { + client_destroy(plugin->client); + } + plugin->client = c; plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + } else { + plog_error(plugin, "datalayers async connect failed"); + plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; } + pthread_rwlock_unlock(&plugin->plugin_mutex); + return NULL; +} - return NEU_ERR_SUCCESS; +static void datalayers_spawn_async_connect(neu_plugin_t *plugin) +{ + pthread_t tid; + int st = pthread_create(&tid, NULL, datalayers_connect_routine, plugin); + if (st != 0) { + plog_error(plugin, "datalayers pthread_create connect failed: %d", st); + return; + } + pthread_detach(tid); } int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting) @@ -158,6 +205,15 @@ int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting) int rv = 0; const char * plugin_name = neu_plugin_module.module_name; datalayers_config_t config = { 0 }; + + rv = plugin->parse_config(plugin, setting, &config); + if (0 != rv) { + plog_error(plugin, "neu_datalayers_config_parse fail"); + return NEU_ERR_NODE_SETTING_INVALID; + } + + plugin->config_seq++; + if (plugin->client != NULL) { stop_consumer_thread(&plugin->consumer_thread, plugin); pthread_mutex_lock(&plugin->queue_mutex); @@ -168,25 +224,18 @@ int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting) create_consumer_thread(plugin); } - rv = plugin->parse_config(plugin, setting, &config); - if (0 != rv) { - plog_error(plugin, "neu_datalayers_config_parse fail"); - return NEU_ERR_NODE_SETTING_INVALID; - } - - rv = config_datalayers_client(plugin, &config); - datalayers_config_fini(&plugin->config); memmove(&plugin->config, &config, sizeof(config)); - if (rv == 0) { - plog_notice(plugin, "config plugin `%s` success", plugin_name); - } else { - plog_error(plugin, "datalayers client configuration failed"); - } + plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; - return rv; + plog_notice(plugin, "config plugin `%s` success (connect in background)", + plugin_name); + + datalayers_spawn_async_connect(plugin); + + return NEU_ERR_SUCCESS; } int datalayers_plugin_start(neu_plugin_t *plugin) @@ -194,17 +243,16 @@ int datalayers_plugin_start(neu_plugin_t *plugin) const char *plugin_name = neu_plugin_module.module_name; if (NULL == plugin->client) { - plog_error(plugin, "datalayers started failed, reconnect"); - int ret = config_datalayers_client(plugin, &plugin->config); - if (ret != 0) { - plog_error(plugin, "datalayers started failed"); - plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; - return NEU_ERR_SUCCESS; - } + plog_notice(plugin, "datalayers start: connect in background"); + datalayers_spawn_async_connect(plugin); } plog_notice(plugin, "start plugin `%s` success", plugin_name); - plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + if (plugin->client) { + plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + } else { + plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; + } return NEU_ERR_SUCCESS; } @@ -225,11 +273,11 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, switch (head->type) { case NEU_REQRESP_TRANS_DATA: { if (plugin->client == NULL) { - plog_notice(plugin, "datalayers client is NULL, reconnect"); + plog_notice(plugin, "datalayers client is NULL, reconnect async"); NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_MSG_ERRORS_TOTAL, 1, NULL); - config_datalayers_client(plugin, &plugin->config); + datalayers_spawn_async_connect(plugin); } error = handle_trans_data(plugin, data);