forked from emqx/neuron
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatalayers_plugin.h
More file actions
201 lines (166 loc) · 5.26 KB
/
datalayers_plugin.h
File metadata and controls
201 lines (166 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/**
* NEURON IIoT System for Industry 4.0
* Copyright (C) 2020-2022 EMQ Technologies Co., Ltd All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/
#ifndef NEURON_PLUGIN_DATALAYERS_H
#define NEURON_PLUGIN_DATALAYERS_H
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
#endif
#include "neuron.h"
#include "datalayers/flight_sql_client.h"
#include "datalayers_config.h"
typedef struct {
char driver[NEU_NODE_NAME_LEN];
char group[NEU_GROUP_NAME_LEN];
} route_key_t;
typedef struct {
route_key_t key;
UT_hash_handle hh;
} route_entry_t;
#define MAX_QUEUE_SIZE 1000
typedef struct db_write_task_s {
UT_array * int_tags;
UT_array * float_tags;
UT_array * bool_tags;
UT_array * string_tags;
struct db_write_task_s *next;
bool freed;
} db_write_task_t;
typedef struct {
db_write_task_t *head;
db_write_task_t *tail;
int size;
int max_size;
} task_queue_t;
struct neu_plugin {
neu_plugin_common_t common;
datalayers_config_t config;
route_entry_t * route_tbl;
neu_datalayers_client *client;
task_queue_t task_queue;
pthread_t consumer_thread;
pthread_mutex_t queue_mutex;
pthread_cond_t queue_not_empty;
pthread_rwlock_t plugin_mutex;
bool consumer_thread_stop_flag;
uint32_t config_seq;
int (*parse_config)(neu_plugin_t *plugin, const char *setting,
datalayers_config_t *config);
};
static inline void route_entry_free(route_entry_t *e)
{
free(e);
}
static inline void route_tbl_free(route_entry_t *tbl)
{
route_entry_t *e = NULL, *tmp = NULL;
HASH_ITER(hh, tbl, e, tmp)
{
HASH_DEL(tbl, e);
route_entry_free(e);
}
}
static inline route_entry_t *
route_tbl_get(route_entry_t **tbl, const char *driver, const char *group)
{
route_entry_t *find = NULL;
route_key_t key = { 0 };
strncpy(key.driver, driver, sizeof(key.driver));
strncpy(key.group, group, sizeof(key.group));
HASH_FIND(hh, *tbl, &key, sizeof(key), find);
return find;
}
// NOTE: we take ownership of `topic`
static inline int route_tbl_add_new(route_entry_t **tbl, const char *driver,
const char *group)
{
route_entry_t *find = NULL;
find = route_tbl_get(tbl, driver, group);
if (find) {
return NEU_ERR_GROUP_ALREADY_SUBSCRIBED;
}
find = calloc(1, sizeof(*find));
if (NULL == find) {
return NEU_ERR_EINTERNAL;
}
strncpy(find->key.driver, driver, sizeof(find->key.driver));
strncpy(find->key.group, group, sizeof(find->key.group));
HASH_ADD(hh, *tbl, key, sizeof(find->key), find);
return 0;
}
static inline int route_tbl_update(route_entry_t **tbl, const char *driver,
const char *group)
{
route_entry_t *find = NULL;
find = route_tbl_get(tbl, driver, group);
if (NULL == find) {
return NEU_ERR_GROUP_NOT_SUBSCRIBE;
}
return 0;
}
static inline void route_tbl_update_driver(route_entry_t **tbl,
const char * driver,
const char * new_name)
{
route_entry_t *e = NULL, *tmp = NULL;
HASH_ITER(hh, *tbl, e, tmp)
{
if (0 == strcmp(e->key.driver, driver)) {
HASH_DEL(*tbl, e);
strncpy(e->key.driver, new_name, sizeof(e->key.driver));
HASH_ADD(hh, *tbl, key, sizeof(e->key), e);
}
}
}
static inline void route_tbl_update_group(route_entry_t **tbl,
const char *driver, const char *group,
const char *new_name)
{
route_entry_t *e = route_tbl_get(tbl, driver, group);
if (e) {
HASH_DEL(*tbl, e);
strncpy(e->key.group, new_name, sizeof(e->key.group));
HASH_ADD(hh, *tbl, key, sizeof(e->key), e);
}
}
static inline void route_tbl_del_driver(route_entry_t **tbl, const char *driver)
{
route_entry_t *e = NULL, *tmp = NULL;
HASH_ITER(hh, *tbl, e, tmp)
{
if (0 == strcmp(e->key.driver, driver)) {
HASH_DEL(*tbl, e);
route_entry_free(e);
}
}
}
static inline void route_tbl_del(route_entry_t **tbl, const char *driver,
const char *group)
{
route_entry_t *find = NULL;
find = route_tbl_get(tbl, driver, group);
if (find) {
HASH_DEL(*tbl, find);
route_entry_free(find);
}
}
#ifdef __cplusplus
}
#endif
#endif