Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/datalayers/datalayers_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct neu_plugin {
pthread_rwlock_t plugin_mutex;
bool consumer_thread_stop_flag;

uint32_t config_seq;

int (*parse_config)(neu_plugin_t *plugin, const char *setting,
datalayers_config_t *config);
};
Expand Down
126 changes: 87 additions & 39 deletions plugins/datalayers/datalayers_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

#include <pthread.h>

#include "define.h"
#include "errcodes.h"
#include "flight_sql_client.h"
Expand Down Expand Up @@ -49,6 +51,7 @@ neu_plugin_t *datalayers_plugin_open(void)
neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t));
neu_plugin_common_init(&plugin->common);
plugin->parse_config = datalayers_config_parse;
plugin->config_seq = 0;
return plugin;
}

Expand Down Expand Up @@ -106,6 +109,7 @@ int datalayers_plugin_init(neu_plugin_t *plugin, bool load)
int datalayers_plugin_uninit(neu_plugin_t *plugin)
{
pthread_rwlock_wrlock(&plugin->plugin_mutex);
plugin->config_seq++;

stop_consumer_thread(&plugin->consumer_thread, plugin);
pthread_mutex_lock(&plugin->queue_mutex);
Expand All @@ -129,35 +133,87 @@ int datalayers_plugin_uninit(neu_plugin_t *plugin)
return NEU_ERR_SUCCESS;
}

static int config_datalayers_client(neu_plugin_t * plugin,
const datalayers_config_t *config)
static void *datalayers_connect_routine(void *arg)
{
if (!config->host || !config->port || !config->username ||
!config->password) {
plog_error(
plugin,
"Invalid datalayers config: NULL host/port/username/password");
return NEU_ERR_DATALAYERS_INIT_FAILURE;
neu_plugin_t *plugin = (neu_plugin_t *) arg;
char * host = NULL;
char * user = NULL;
char * pass = NULL;
int port = 0;
uint32_t seq = 0;

pthread_rwlock_wrlock(&plugin->plugin_mutex);
seq = plugin->config_seq;
if (!plugin->config.host || !plugin->config.username ||
!plugin->config.password) {
pthread_rwlock_unlock(&plugin->plugin_mutex);
return NULL;
}
host = strdup(plugin->config.host);
user = strdup(plugin->config.username);
pass = strdup(plugin->config.password);
port = (int) plugin->config.port;
pthread_rwlock_unlock(&plugin->plugin_mutex);

plugin->client = client_create(config->host, config->port, config->username,
config->password);
if (!host || !user || !pass) {
free(host);
free(user);
free(pass);
return NULL;
}

if (NULL == plugin->client) {
plog_error(plugin, "datalayers client_create failed");
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
} else {
neu_datalayers_client *c = client_create(host, port, user, pass);
free(host);
free(user);
free(pass);

pthread_rwlock_wrlock(&plugin->plugin_mutex);
if (seq != plugin->config_seq) {
if (c) {
client_destroy(c);
}
pthread_rwlock_unlock(&plugin->plugin_mutex);
return NULL;
}
if (c) {
if (plugin->client) {
client_destroy(plugin->client);
}
plugin->client = c;
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
} else {
plog_error(plugin, "datalayers async connect failed");
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
}
pthread_rwlock_unlock(&plugin->plugin_mutex);
return NULL;
}

return NEU_ERR_SUCCESS;
static void datalayers_spawn_async_connect(neu_plugin_t *plugin)
{
pthread_t tid;
int st = pthread_create(&tid, NULL, datalayers_connect_routine, plugin);
if (st != 0) {
plog_error(plugin, "datalayers pthread_create connect failed: %d", st);
return;
}
pthread_detach(tid);
}

int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting)
{
int rv = 0;
const char * plugin_name = neu_plugin_module.module_name;
datalayers_config_t config = { 0 };

rv = plugin->parse_config(plugin, setting, &config);
if (0 != rv) {
plog_error(plugin, "neu_datalayers_config_parse fail");
return NEU_ERR_NODE_SETTING_INVALID;
}

plugin->config_seq++;

if (plugin->client != NULL) {
stop_consumer_thread(&plugin->consumer_thread, plugin);
pthread_mutex_lock(&plugin->queue_mutex);
Expand All @@ -168,43 +224,35 @@ int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting)
create_consumer_thread(plugin);
}

rv = plugin->parse_config(plugin, setting, &config);
if (0 != rv) {
plog_error(plugin, "neu_datalayers_config_parse fail");
return NEU_ERR_NODE_SETTING_INVALID;
}

rv = config_datalayers_client(plugin, &config);

datalayers_config_fini(&plugin->config);

memmove(&plugin->config, &config, sizeof(config));

if (rv == 0) {
plog_notice(plugin, "config plugin `%s` success", plugin_name);
} else {
plog_error(plugin, "datalayers client configuration failed");
}
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;

return rv;
plog_notice(plugin, "config plugin `%s` success (connect in background)",
plugin_name);

datalayers_spawn_async_connect(plugin);

return NEU_ERR_SUCCESS;
}

int datalayers_plugin_start(neu_plugin_t *plugin)
{
const char *plugin_name = neu_plugin_module.module_name;

if (NULL == plugin->client) {
plog_error(plugin, "datalayers started failed, reconnect");
int ret = config_datalayers_client(plugin, &plugin->config);
if (ret != 0) {
plog_error(plugin, "datalayers started failed");
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
return NEU_ERR_SUCCESS;
}
plog_notice(plugin, "datalayers start: connect in background");
datalayers_spawn_async_connect(plugin);
}

plog_notice(plugin, "start plugin `%s` success", plugin_name);
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
if (plugin->client) {
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
} else {
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
}
return NEU_ERR_SUCCESS;
}

Expand All @@ -225,11 +273,11 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
switch (head->type) {
case NEU_REQRESP_TRANS_DATA: {
if (plugin->client == NULL) {
plog_notice(plugin, "datalayers client is NULL, reconnect");
plog_notice(plugin, "datalayers client is NULL, reconnect async");
NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_MSG_ERRORS_TOTAL,
1, NULL);

config_datalayers_client(plugin, &plugin->config);
datalayers_spawn_async_connect(plugin);
}

error = handle_trans_data(plugin, data);
Expand Down
Loading