Skip to content

Commit c0d8099

Browse files
committed
in_tail: move all database queries to prepared statements (#2030)
A problem reported in #2030 lead to find solutions that avoids using fixed size buffers to compose SQL queries. Using the heap for database queries in our case is expensive. The solution proposed is to migrate all SQL queries to prepared SQL statements, so we avoid having local buffers and pass the variables directly to SQLite. Signed-off-by: Eduardo Silva <[email protected]>
1 parent d88af61 commit c0d8099

File tree

4 files changed

+172
-54
lines changed

4 files changed

+172
-54
lines changed

plugins/in_tail/tail_config.c

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,43 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
239239

240240
/* Prepare Statement */
241241
if (ctx->db) {
242+
/* SQL_GET_FILE */
243+
ret = sqlite3_prepare_v2(ctx->db->handler,
244+
SQL_GET_FILE,
245+
-1,
246+
&ctx->stmt_get_file,
247+
0);
248+
if (ret != SQLITE_OK) {
249+
flb_plg_error(ctx->ins, "error preparing database SQL statement");
250+
flb_tail_config_destroy(ctx);
251+
return NULL;
252+
}
253+
254+
/* SQL_INSERT_FILE */
255+
ret = sqlite3_prepare_v2(ctx->db->handler,
256+
SQL_INSERT_FILE,
257+
-1,
258+
&ctx->stmt_insert_file,
259+
0);
260+
if (ret != SQLITE_OK) {
261+
flb_plg_error(ctx->ins, "error preparing database SQL statement");
262+
flb_tail_config_destroy(ctx);
263+
return NULL;
264+
}
265+
266+
/* SQL_ROTATE_FILE */
267+
ret = sqlite3_prepare_v2(ctx->db->handler,
268+
SQL_ROTATE_FILE,
269+
-1,
270+
&ctx->stmt_rotate_file,
271+
0);
272+
if (ret != SQLITE_OK) {
273+
flb_plg_error(ctx->ins, "error preparing database SQL statement");
274+
flb_tail_config_destroy(ctx);
275+
return NULL;
276+
}
277+
278+
/* SQL_UPDATE_OFFSET */
242279
ret = sqlite3_prepare_v2(ctx->db->handler,
243280
SQL_UPDATE_OFFSET,
244281
-1,
@@ -249,6 +286,19 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
249286
flb_tail_config_destroy(ctx);
250287
return NULL;
251288
}
289+
290+
/* SQL_DELETE_FILE */
291+
ret = sqlite3_prepare_v2(ctx->db->handler,
292+
SQL_DELETE_FILE,
293+
-1,
294+
&ctx->stmt_delete_file,
295+
0);
296+
if (ret != SQLITE_OK) {
297+
flb_plg_error(ctx->ins, "error preparing database SQL statement");
298+
flb_tail_config_destroy(ctx);
299+
return NULL;
300+
}
301+
252302
}
253303
#endif
254304

@@ -285,6 +335,10 @@ int flb_tail_config_destroy(struct flb_tail_config *config)
285335

286336
#ifdef FLB_HAVE_SQLDB
287337
if (config->db != NULL) {
338+
sqlite3_finalize(config->stmt_get_file);
339+
sqlite3_finalize(config->stmt_insert_file);
340+
sqlite3_finalize(config->stmt_delete_file);
341+
sqlite3_finalize(config->stmt_rotate_file);
288342
sqlite3_finalize(config->stmt_offset);
289343
flb_tail_db_close(config->db);
290344
}

plugins/in_tail/tail_config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ struct flb_tail_config {
8787
#ifdef FLB_HAVE_SQLDB
8888
struct flb_sqldb *db;
8989
int db_sync;
90+
sqlite3_stmt *stmt_get_file;
91+
sqlite3_stmt *stmt_insert_file;
92+
sqlite3_stmt *stmt_delete_file;
93+
sqlite3_stmt *stmt_rotate_file;
9094
sqlite3_stmt *stmt_offset;
9195
#endif
9296

plugins/in_tail/tail_db.c

Lines changed: 108 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -84,55 +84,99 @@ int flb_tail_db_close(struct flb_sqldb *db)
8484
return 0;
8585
}
8686

87+
/*
88+
* Check if an file inode exists in the database. Return FLB_TRUE or
89+
* FLB_FALSE
90+
*/
91+
static int db_file_exists(struct flb_tail_file *file,
92+
struct flb_tail_config *ctx,
93+
uint64_t *id, uint64_t *inode, off_t *offset)
94+
{
95+
int ret;
96+
int exists = FLB_FALSE;
97+
98+
/* Bind parameters */
99+
sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode);
100+
ret = sqlite3_step(ctx->stmt_get_file);
101+
102+
if (ret == SQLITE_ROW) {
103+
exists = FLB_TRUE;
104+
105+
/* id: column 0 */
106+
*id = sqlite3_column_int64(ctx->stmt_get_file, 0);
107+
108+
/* offset: column 2 */
109+
*offset = sqlite3_column_int64(ctx->stmt_get_file, 2);
110+
111+
/* inode: column 3 */
112+
*inode = sqlite3_column_int64(ctx->stmt_get_file, 3);
113+
}
114+
else if (ret == SQLITE_DONE) {
115+
/* all good */
116+
}
117+
else {
118+
exists = -1;
119+
}
120+
121+
sqlite3_clear_bindings(ctx->stmt_get_file);
122+
sqlite3_reset(ctx->stmt_get_file);
123+
124+
return exists;
125+
126+
}
127+
128+
static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ctx)
87129

88-
static int cb_file_check(void *data, int argc, char **argv, char **cols)
89130
{
90-
struct query_status *qs = data;
131+
int ret;
132+
time_t created;
91133

92-
qs->id = atoi(argv[0]); /* id column */
93-
qs->offset = atoll(argv[2]); /* offset column */
134+
/* Register the file */
135+
created = time(NULL);
94136

95-
qs->rows++;
96-
return 0;
137+
/* Bind parameters */
138+
sqlite3_bind_text(ctx->stmt_insert_file, 1, file->name, -1, 0);
139+
sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
140+
sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
141+
sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
142+
143+
/* Run the insert */
144+
ret = sqlite3_step(ctx->stmt_insert_file);
145+
if (ret != SQLITE_DONE) {
146+
flb_plg_error(ctx->ins, "cannot execute insert file %s inode=%lu",
147+
file->name, file->inode);
148+
return -1;
149+
}
150+
151+
/* Get the database ID for this file */
152+
return flb_sqldb_last_id(ctx->db);
97153
}
98154

99155
int flb_tail_db_file_set(struct flb_tail_file *file,
100156
struct flb_tail_config *ctx)
101157
{
102158
int ret;
103-
char query[PATH_MAX];
104-
struct query_status qs = {0};
105-
uint64_t created;
159+
uint64_t id = 0;
160+
off_t offset = 0;
161+
uint64_t inode = 0;
106162

107163
/* Check if the file exists */
108-
snprintf(query, sizeof(query) - 1, SQL_GET_FILE, (uint64_t) file->inode);
109-
memset(&qs, '\0', sizeof(qs));
110-
ret = flb_sqldb_query(ctx->db,
111-
query, cb_file_check, &qs);
112-
if (ret == FLB_ERROR) {
113-
flb_plg_error(ctx->ins, "cannot execute SQL: %s", query);
164+
ret = db_file_exists(file, ctx, &id, &inode, &offset);
165+
if (ret == -1) {
166+
flb_plg_error(ctx->ins, "cannot execute query to check inode: %lu",
167+
file->inode);
114168
return -1;
115169
}
116170

117-
if (qs.rows == 0) {
118-
/* Register the file */
119-
created = time(NULL);
120-
snprintf(query, sizeof(query) - 1,
121-
SQL_INSERT_FILE,
122-
file->name, (uint64_t) 0, (uint64_t) file->inode, created);
123-
ret = flb_sqldb_query(ctx->db, query, NULL, NULL);
124-
if (ret == FLB_ERROR) {
125-
flb_plg_error(ctx->ins, "cannot execute SQL: %s", query);
126-
return -1;
127-
}
128-
171+
if (ret == FLB_FALSE) {
129172
/* Get the database ID for this file */
130-
file->db_id = flb_sqldb_last_id(ctx->db);
131-
return 0;
173+
file->db_id = db_file_insert(file, ctx);
174+
}
175+
else {
176+
file->db_id = id;
177+
file->offset = offset;
132178
}
133179

134-
file->db_id = qs.id;
135-
file->offset = qs.offset;
136180
return 0;
137181
}
138182

@@ -148,32 +192,45 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
148192

149193
ret = sqlite3_step(ctx->stmt_offset);
150194

151-
sqlite3_clear_bindings(ctx->stmt_offset);
152-
sqlite3_reset(ctx->stmt_offset);
153-
154195
if (ret != SQLITE_DONE) {
196+
sqlite3_clear_bindings(ctx->stmt_offset);
197+
sqlite3_reset(ctx->stmt_offset);
155198
return -1;
156199
}
157200

201+
/* Verify number of updated rows */
202+
ret = sqlite3_changes(ctx->db->handler);
203+
if (ret == 0) {
204+
/*
205+
* 'someone' like you 'the reader' or another user has deleted the database
206+
* entry, just restore it.
207+
*/
208+
file->db_id = db_file_insert(file, ctx);
209+
}
210+
211+
sqlite3_clear_bindings(ctx->stmt_offset);
212+
sqlite3_reset(ctx->stmt_offset);
213+
158214
return 0;
159215
}
160216

161-
/* Mark a file as rotated */
217+
/* Mark a file as rotated v2 */
162218
int flb_tail_db_file_rotate(const char *new_name,
163219
struct flb_tail_file *file,
164220
struct flb_tail_config *ctx)
165221
{
166222
int ret;
167-
char query[PATH_MAX];
168-
struct query_status qs = {0};
169223

170-
snprintf(query, sizeof(query) - 1,
171-
SQL_ROTATE_FILE,
172-
new_name, file->db_id);
224+
/* Bind parameters */
225+
sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0);
226+
sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id);
173227

174-
ret = flb_sqldb_query(ctx->db,
175-
query, cb_file_check, &qs);
176-
if (ret != FLB_OK) {
228+
ret = sqlite3_step(ctx->stmt_rotate_file);
229+
230+
sqlite3_clear_bindings(ctx->stmt_rotate_file);
231+
sqlite3_reset(ctx->stmt_rotate_file);
232+
233+
if (ret != SQLITE_DONE) {
177234
return -1;
178235
}
179236

@@ -185,12 +242,15 @@ int flb_tail_db_file_delete(struct flb_tail_file *file,
185242
struct flb_tail_config *ctx)
186243
{
187244
int ret;
188-
char query[PATH_MAX];
189245

190-
/* Check if the file exists */
191-
snprintf(query, sizeof(query) - 1, SQL_DELETE_FILE, file->db_id);
192-
ret = flb_sqldb_query(ctx->db, query, NULL, NULL);
193-
if (ret != FLB_OK) {
246+
/* Bind parameters */
247+
sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id);
248+
ret = sqlite3_step(ctx->stmt_delete_file);
249+
250+
sqlite3_clear_bindings(ctx->stmt_delete_file);
251+
sqlite3_reset(ctx->stmt_delete_file);
252+
253+
if (ret != SQLITE_DONE) {
194254
flb_plg_error(ctx->ins, "db: error deleting entry from database: %s",
195255
file->name);
196256
return -1;

plugins/in_tail/tail_sql.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,20 @@
3838
" rotated INTEGER DEFAULT 0" \
3939
");"
4040

41-
#define SQL_GET_FILE "SELECT * from in_tail_files WHERE inode=%"PRIu64";"
41+
#define SQL_GET_FILE "SELECT * from in_tail_files WHERE inode=@inode;"
4242

4343
#define SQL_INSERT_FILE \
4444
"INSERT INTO in_tail_files (name, offset, inode, created)" \
45-
" VALUES ('%s', %"PRIu64", %"PRIu64", %"PRIu64");"
45+
" VALUES (@name, @offset, @inode, @created);"
46+
47+
#define SQL_ROTATE_FILE \
48+
"UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;"
4649

4750
#define SQL_UPDATE_OFFSET \
4851
"UPDATE in_tail_files set offset=@offset WHERE id=@id;"
4952

50-
#define SQL_ROTATE_FILE \
51-
"UPDATE in_tail_files set name='%s',rotated=1 WHERE id=%"PRId64";"
52-
5353
#define SQL_DELETE_FILE \
54-
"DELETE FROM in_tail_files WHERE id=%"PRId64";"
54+
"DELETE FROM in_tail_files WHERE id=@id;"
5555

5656
#define SQL_PRAGMA_SYNC \
5757
"PRAGMA synchronous=%i;"

0 commit comments

Comments
 (0)