1717 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
1818 **/
1919
20+ #include <pthread.h>
21+
2022#include "define.h"
2123#include "errcodes.h"
2224#include "flight_sql_client.h"
@@ -49,6 +51,7 @@ neu_plugin_t *datalayers_plugin_open(void)
4951 neu_plugin_t * plugin = (neu_plugin_t * ) calloc (1 , sizeof (neu_plugin_t ));
5052 neu_plugin_common_init (& plugin -> common );
5153 plugin -> parse_config = datalayers_config_parse ;
54+ plugin -> config_seq = 0 ;
5255 return plugin ;
5356}
5457
@@ -106,6 +109,7 @@ int datalayers_plugin_init(neu_plugin_t *plugin, bool load)
106109int datalayers_plugin_uninit (neu_plugin_t * plugin )
107110{
108111 pthread_rwlock_wrlock (& plugin -> plugin_mutex );
112+ plugin -> config_seq ++ ;
109113
110114 stop_consumer_thread (& plugin -> consumer_thread , plugin );
111115 pthread_mutex_lock (& plugin -> queue_mutex );
@@ -129,35 +133,87 @@ int datalayers_plugin_uninit(neu_plugin_t *plugin)
129133 return NEU_ERR_SUCCESS ;
130134}
131135
132- static int config_datalayers_client (neu_plugin_t * plugin ,
133- const datalayers_config_t * config )
136+ static void * datalayers_connect_routine (void * arg )
134137{
135- if (!config -> host || !config -> port || !config -> username ||
136- !config -> password ) {
137- plog_error (
138- plugin ,
139- "Invalid datalayers config: NULL host/port/username/password" );
140- return NEU_ERR_DATALAYERS_INIT_FAILURE ;
138+ neu_plugin_t * plugin = (neu_plugin_t * ) arg ;
139+ char * host = NULL ;
140+ char * user = NULL ;
141+ char * pass = NULL ;
142+ int port = 0 ;
143+ uint32_t seq = 0 ;
144+
145+ pthread_rwlock_wrlock (& plugin -> plugin_mutex );
146+ seq = plugin -> config_seq ;
147+ if (!plugin -> config .host || !plugin -> config .username ||
148+ !plugin -> config .password ) {
149+ pthread_rwlock_unlock (& plugin -> plugin_mutex );
150+ return NULL ;
141151 }
152+ host = strdup (plugin -> config .host );
153+ user = strdup (plugin -> config .username );
154+ pass = strdup (plugin -> config .password );
155+ port = (int ) plugin -> config .port ;
156+ pthread_rwlock_unlock (& plugin -> plugin_mutex );
142157
143- plugin -> client = client_create (config -> host , config -> port , config -> username ,
144- config -> password );
158+ if (!host || !user || !pass ) {
159+ free (host );
160+ free (user );
161+ free (pass );
162+ return NULL ;
163+ }
145164
146- if (NULL == plugin -> client ) {
147- plog_error (plugin , "datalayers client_create failed" );
148- plugin -> common .link_state = NEU_NODE_LINK_STATE_DISCONNECTED ;
149- } else {
165+ neu_datalayers_client * c = client_create (host , port , user , pass );
166+ free (host );
167+ free (user );
168+ free (pass );
169+
170+ pthread_rwlock_wrlock (& plugin -> plugin_mutex );
171+ if (seq != plugin -> config_seq ) {
172+ if (c ) {
173+ client_destroy (c );
174+ }
175+ pthread_rwlock_unlock (& plugin -> plugin_mutex );
176+ return NULL ;
177+ }
178+ if (c ) {
179+ if (plugin -> client ) {
180+ client_destroy (plugin -> client );
181+ }
182+ plugin -> client = c ;
150183 plugin -> common .link_state = NEU_NODE_LINK_STATE_CONNECTED ;
184+ } else {
185+ plog_error (plugin , "datalayers async connect failed" );
186+ plugin -> common .link_state = NEU_NODE_LINK_STATE_DISCONNECTED ;
151187 }
188+ pthread_rwlock_unlock (& plugin -> plugin_mutex );
189+ return NULL ;
190+ }
152191
153- return NEU_ERR_SUCCESS ;
192+ static void datalayers_spawn_async_connect (neu_plugin_t * plugin )
193+ {
194+ pthread_t tid ;
195+ int st = pthread_create (& tid , NULL , datalayers_connect_routine , plugin );
196+ if (st != 0 ) {
197+ plog_error (plugin , "datalayers pthread_create connect failed: %d" , st );
198+ return ;
199+ }
200+ pthread_detach (tid );
154201}
155202
156203int datalayers_plugin_config (neu_plugin_t * plugin , const char * setting )
157204{
158205 int rv = 0 ;
159206 const char * plugin_name = neu_plugin_module .module_name ;
160207 datalayers_config_t config = { 0 };
208+
209+ rv = plugin -> parse_config (plugin , setting , & config );
210+ if (0 != rv ) {
211+ plog_error (plugin , "neu_datalayers_config_parse fail" );
212+ return NEU_ERR_NODE_SETTING_INVALID ;
213+ }
214+
215+ plugin -> config_seq ++ ;
216+
161217 if (plugin -> client != NULL ) {
162218 stop_consumer_thread (& plugin -> consumer_thread , plugin );
163219 pthread_mutex_lock (& plugin -> queue_mutex );
@@ -168,43 +224,35 @@ int datalayers_plugin_config(neu_plugin_t *plugin, const char *setting)
168224 create_consumer_thread (plugin );
169225 }
170226
171- rv = plugin -> parse_config (plugin , setting , & config );
172- if (0 != rv ) {
173- plog_error (plugin , "neu_datalayers_config_parse fail" );
174- return NEU_ERR_NODE_SETTING_INVALID ;
175- }
176-
177- rv = config_datalayers_client (plugin , & config );
178-
179227 datalayers_config_fini (& plugin -> config );
180228
181229 memmove (& plugin -> config , & config , sizeof (config ));
182230
183- if (rv == 0 ) {
184- plog_notice (plugin , "config plugin `%s` success" , plugin_name );
185- } else {
186- plog_error (plugin , "datalayers client configuration failed" );
187- }
231+ plugin -> common .link_state = NEU_NODE_LINK_STATE_DISCONNECTED ;
188232
189- return rv ;
233+ plog_notice (plugin , "config plugin `%s` success (connect in background)" ,
234+ plugin_name );
235+
236+ datalayers_spawn_async_connect (plugin );
237+
238+ return NEU_ERR_SUCCESS ;
190239}
191240
192241int datalayers_plugin_start (neu_plugin_t * plugin )
193242{
194243 const char * plugin_name = neu_plugin_module .module_name ;
195244
196245 if (NULL == plugin -> client ) {
197- plog_error (plugin , "datalayers started failed, reconnect" );
198- int ret = config_datalayers_client (plugin , & plugin -> config );
199- if (ret != 0 ) {
200- plog_error (plugin , "datalayers started failed" );
201- plugin -> common .link_state = NEU_NODE_LINK_STATE_DISCONNECTED ;
202- return NEU_ERR_SUCCESS ;
203- }
246+ plog_notice (plugin , "datalayers start: connect in background" );
247+ datalayers_spawn_async_connect (plugin );
204248 }
205249
206250 plog_notice (plugin , "start plugin `%s` success" , plugin_name );
207- plugin -> common .link_state = NEU_NODE_LINK_STATE_CONNECTED ;
251+ if (plugin -> client ) {
252+ plugin -> common .link_state = NEU_NODE_LINK_STATE_CONNECTED ;
253+ } else {
254+ plugin -> common .link_state = NEU_NODE_LINK_STATE_DISCONNECTED ;
255+ }
208256 return NEU_ERR_SUCCESS ;
209257}
210258
@@ -225,11 +273,11 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
225273 switch (head -> type ) {
226274 case NEU_REQRESP_TRANS_DATA : {
227275 if (plugin -> client == NULL ) {
228- plog_notice (plugin , "datalayers client is NULL, reconnect" );
276+ plog_notice (plugin , "datalayers client is NULL, reconnect async " );
229277 NEU_PLUGIN_UPDATE_METRIC (plugin , NEU_METRIC_SEND_MSG_ERRORS_TOTAL ,
230278 1 , NULL );
231279
232- config_datalayers_client (plugin , & plugin -> config );
280+ datalayers_spawn_async_connect (plugin );
233281 }
234282
235283 error = handle_trans_data (plugin , data );
0 commit comments