Skip to content

Commit c3f8f82

Browse files
authored
Merge pull request #2721 from hxy7yx/v2.14-1
[v2.14]datalayers:switch to asynchronous connection
2 parents 35af524 + a2bc2ad commit c3f8f82

File tree

2 files changed

+89
-39
lines changed

2 files changed

+89
-39
lines changed

plugins/datalayers/datalayers_plugin.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ struct neu_plugin {
7474
pthread_rwlock_t plugin_mutex;
7575
bool consumer_thread_stop_flag;
7676

77+
uint32_t config_seq;
78+
7779
int (*parse_config)(neu_plugin_t *plugin, const char *setting,
7880
datalayers_config_t *config);
7981
};

plugins/datalayers/datalayers_plugin_intf.c

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
1818
**/
1919

20+
#include <pthread.h>
21+
2022
#include "define.h"
2123
#include "errcodes.h"
2224
#include "flight_sql_client.h"
@@ -49,6 +51,7 @@ neu_plugin_t *datalayers_plugin_open(void)
4951
neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t));
5052
neu_plugin_common_init(&plugin->common);
5153
plugin->parse_config = datalayers_config_parse;
54+
plugin->config_seq = 0;
5255
return plugin;
5356
}
5457

@@ -106,6 +109,7 @@ int datalayers_plugin_init(neu_plugin_t *plugin, bool load)
106109
int datalayers_plugin_uninit(neu_plugin_t *plugin)
107110
{
108111
pthread_rwlock_wrlock(&plugin->plugin_mutex);
112+
plugin->config_seq++;
109113

110114
stop_consumer_thread(&plugin->consumer_thread, plugin);
111115
pthread_mutex_lock(&plugin->queue_mutex);
@@ -129,35 +133,87 @@ int datalayers_plugin_uninit(neu_plugin_t *plugin)
129133
return NEU_ERR_SUCCESS;
130134
}
131135

132-
static int config_datalayers_client(neu_plugin_t * plugin,
133-
const datalayers_config_t *config)
136+
static void *datalayers_connect_routine(void *arg)
134137
{
135-
if (!config->host || !config->port || !config->username ||
136-
!config->password) {
137-
plog_error(
138-
plugin,
139-
"Invalid datalayers config: NULL host/port/username/password");
140-
return NEU_ERR_DATALAYERS_INIT_FAILURE;
138+
neu_plugin_t *plugin = (neu_plugin_t *) arg;
139+
char * host = NULL;
140+
char * user = NULL;
141+
char * pass = NULL;
142+
int port = 0;
143+
uint32_t seq = 0;
144+
145+
pthread_rwlock_wrlock(&plugin->plugin_mutex);
146+
seq = plugin->config_seq;
147+
if (!plugin->config.host || !plugin->config.username ||
148+
!plugin->config.password) {
149+
pthread_rwlock_unlock(&plugin->plugin_mutex);
150+
return NULL;
141151
}
152+
host = strdup(plugin->config.host);
153+
user = strdup(plugin->config.username);
154+
pass = strdup(plugin->config.password);
155+
port = (int) plugin->config.port;
156+
pthread_rwlock_unlock(&plugin->plugin_mutex);
142157

143-
plugin->client = client_create(config->host, config->port, config->username,
144-
config->password);
158+
if (!host || !user || !pass) {
159+
free(host);
160+
free(user);
161+
free(pass);
162+
return NULL;
163+
}
145164

146-
if (NULL == plugin->client) {
147-
plog_error(plugin, "datalayers client_create failed");
148-
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
149-
} else {
165+
neu_datalayers_client *c = client_create(host, port, user, pass);
166+
free(host);
167+
free(user);
168+
free(pass);
169+
170+
pthread_rwlock_wrlock(&plugin->plugin_mutex);
171+
if (seq != plugin->config_seq) {
172+
if (c) {
173+
client_destroy(c);
174+
}
175+
pthread_rwlock_unlock(&plugin->plugin_mutex);
176+
return NULL;
177+
}
178+
if (c) {
179+
if (plugin->client) {
180+
client_destroy(plugin->client);
181+
}
182+
plugin->client = c;
150183
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
184+
} else {
185+
plog_error(plugin, "datalayers async connect failed");
186+
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
151187
}
188+
pthread_rwlock_unlock(&plugin->plugin_mutex);
189+
return NULL;
190+
}
152191

153-
return NEU_ERR_SUCCESS;
192+
static void datalayers_spawn_async_connect(neu_plugin_t *plugin)
193+
{
194+
pthread_t tid;
195+
int st = pthread_create(&tid, NULL, datalayers_connect_routine, plugin);
196+
if (st != 0) {
197+
plog_error(plugin, "datalayers pthread_create connect failed: %d", st);
198+
return;
199+
}
200+
pthread_detach(tid);
154201
}
155202

156203
int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting)
157204
{
158205
int rv = 0;
159206
const char * plugin_name = neu_plugin_module.module_name;
160207
datalayers_config_t config = { 0 };
208+
209+
rv = plugin->parse_config(plugin, setting, &config);
210+
if (0 != rv) {
211+
plog_error(plugin, "neu_datalayers_config_parse fail");
212+
return NEU_ERR_NODE_SETTING_INVALID;
213+
}
214+
215+
plugin->config_seq++;
216+
161217
if (plugin->client != NULL) {
162218
stop_consumer_thread(&plugin->consumer_thread, plugin);
163219
pthread_mutex_lock(&plugin->queue_mutex);
@@ -168,43 +224,35 @@ int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting)
168224
create_consumer_thread(plugin);
169225
}
170226

171-
rv = plugin->parse_config(plugin, setting, &config);
172-
if (0 != rv) {
173-
plog_error(plugin, "neu_datalayers_config_parse fail");
174-
return NEU_ERR_NODE_SETTING_INVALID;
175-
}
176-
177-
rv = config_datalayers_client(plugin, &config);
178-
179227
datalayers_config_fini(&plugin->config);
180228

181229
memmove(&plugin->config, &config, sizeof(config));
182230

183-
if (rv == 0) {
184-
plog_notice(plugin, "config plugin `%s` success", plugin_name);
185-
} else {
186-
plog_error(plugin, "datalayers client configuration failed");
187-
}
231+
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
188232

189-
return rv;
233+
plog_notice(plugin, "config plugin `%s` success (connect in background)",
234+
plugin_name);
235+
236+
datalayers_spawn_async_connect(plugin);
237+
238+
return NEU_ERR_SUCCESS;
190239
}
191240

192241
int datalayers_plugin_start(neu_plugin_t *plugin)
193242
{
194243
const char *plugin_name = neu_plugin_module.module_name;
195244

196245
if (NULL == plugin->client) {
197-
plog_error(plugin, "datalayers started failed, reconnect");
198-
int ret = config_datalayers_client(plugin, &plugin->config);
199-
if (ret != 0) {
200-
plog_error(plugin, "datalayers started failed");
201-
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
202-
return NEU_ERR_SUCCESS;
203-
}
246+
plog_notice(plugin, "datalayers start: connect in background");
247+
datalayers_spawn_async_connect(plugin);
204248
}
205249

206250
plog_notice(plugin, "start plugin `%s` success", plugin_name);
207-
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
251+
if (plugin->client) {
252+
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
253+
} else {
254+
plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED;
255+
}
208256
return NEU_ERR_SUCCESS;
209257
}
210258

@@ -225,11 +273,11 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
225273
switch (head->type) {
226274
case NEU_REQRESP_TRANS_DATA: {
227275
if (plugin->client == NULL) {
228-
plog_notice(plugin, "datalayers client is NULL, reconnect");
276+
plog_notice(plugin, "datalayers client is NULL, reconnect async");
229277
NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_MSG_ERRORS_TOTAL,
230278
1, NULL);
231279

232-
config_datalayers_client(plugin, &plugin->config);
280+
datalayers_spawn_async_connect(plugin);
233281
}
234282

235283
error = handle_trans_data(plugin, data);

0 commit comments

Comments
 (0)