Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion plugins/in_tail/tail_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "tail_sql.h"
#include "tail_file.h"

#include <inttypes.h>

struct query_status {
int id;
int rows;
Expand Down Expand Up @@ -253,6 +255,132 @@ static int stmt_add_param_concat(struct flb_tail_config *ctx,
return 0;
}

/*
* Scalable stale inode cleanup: use a temp table to avoid SQLite variable limits.
*
* The legacy implementation builds:
* DELETE ... WHERE inode NOT IN (?,?,?,...);
* which requires one bound parameter per inode and fails when the number of
* monitored files exceeds SQLITE_LIMIT_VARIABLE_NUMBER (commonly 32766 in our
* bundled SQLite, but can vary).
*/
static int flb_tail_db_stale_file_delete_temp_table(struct flb_tail_config *ctx,
uint64_t file_count)
{
int ret;
int changes;
int txn_started = FLB_FALSE;
sqlite3_stmt *stmt_insert_inode = NULL;
struct mk_list *head;
struct mk_list *tmp;
struct flb_tail_file *file;

/* If there are no monitored files, delete everything from the DB table. */
if (file_count == 0) {
ret = flb_sqldb_query(ctx->db, "DELETE FROM in_tail_files;", NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot delete all stale inodes (no monitored files)");
return -1;
}

changes = sqlite3_changes(ctx->db->handler);
flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the database: count=%d",
changes);
return 0;
}

/* Create/clear temp table holding current monitored inodes. */
ret = flb_sqldb_query(ctx->db,
"CREATE TEMP TABLE IF NOT EXISTS in_tail_current_inodes ("
" inode INTEGER PRIMARY KEY"
");",
NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot create temp table for inode cleanup");
return -1;
}

ret = flb_sqldb_query(ctx->db, "DELETE FROM in_tail_current_inodes;", NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot clear temp inode table");
return -1;
}

/* Use a transaction for faster bulk inserts. */
ret = flb_sqldb_query(ctx->db, "BEGIN;", NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot begin transaction for temp inode inserts");
return -1;
}
txn_started = FLB_TRUE;

ret = sqlite3_prepare_v2(ctx->db->handler,
"INSERT OR IGNORE INTO in_tail_current_inodes(inode) VALUES (?);",
-1, &stmt_insert_inode, 0);
if (ret != SQLITE_OK) {
flb_plg_error(ctx->ins, "db: cannot prepare temp inode insert statement, ret=%d", ret);
goto error;
}

mk_list_foreach_safe(head, tmp, &ctx->files_static) {
file = mk_list_entry(head, struct flb_tail_file, _head);

ret = sqlite3_bind_int64(stmt_insert_inode, 1, (sqlite3_int64) file->inode);
if (ret != SQLITE_OK) {
flb_plg_error(ctx->ins, "db: error binding temp inode insert: inode=%" PRIu64 ", ret=%d",
file->inode, ret);
goto error;
}

ret = sqlite3_step(stmt_insert_inode);
if (ret != SQLITE_DONE) {
flb_plg_error(ctx->ins, "db: error inserting inode into temp table: inode=%" PRIu64 ", ret=%d",
file->inode, ret);
goto error;
}

sqlite3_clear_bindings(stmt_insert_inode);
sqlite3_reset(stmt_insert_inode);
}

sqlite3_finalize(stmt_insert_inode);
stmt_insert_inode = NULL;

/* Delete any inode that is not in the current monitored set. */
ret = flb_sqldb_query(ctx->db,
"DELETE FROM in_tail_files "
"WHERE inode NOT IN (SELECT inode FROM in_tail_current_inodes);",
NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot delete stale inodes using temp table");
goto error;
}

ret = flb_sqldb_query(ctx->db, "COMMIT;", NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: cannot commit transaction for temp inode inserts");
goto error;
}
txn_started = FLB_FALSE;

changes = sqlite3_changes(ctx->db->handler);
flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the database: count=%d",
changes);
return 0;

error:
if (stmt_insert_inode) {
sqlite3_finalize(stmt_insert_inode);
}

if (txn_started == FLB_TRUE) {
/* Best-effort rollback */
flb_sqldb_query(ctx->db, "ROLLBACK;", NULL, NULL);
}

return -1;
}

int flb_tail_db_file_set(struct flb_tail_file *file,
struct flb_tail_config *ctx)
{
Expand Down Expand Up @@ -377,6 +505,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
size_t sql_size;
uint64_t idx;
uint64_t file_count = ctx->files_static_count;
int max_vars = -1;
flb_sds_t stale_delete_sql;
flb_sds_t sds_tmp;
sqlite3_stmt *stmt_delete_inodes = NULL;
Expand All @@ -388,14 +517,37 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
return 0;
}

/*
* Avoid SQLite variable limits for large monitored file sets.
*
* sqlite3_limit(..., SQLITE_LIMIT_VARIABLE_NUMBER, -1) returns the current
* runtime limit (compile-time hard limit may be higher). If our monitored
* file count exceeds this, the legacy NOT IN (?,?,...) statement will fail
* at prepare-time.
*/
max_vars = sqlite3_limit(ctx->db->handler, SQLITE_LIMIT_VARIABLE_NUMBER, -1);
if (max_vars > 0 && file_count > (uint64_t) max_vars) {
flb_plg_warn(ctx->ins,
"db: large file set detected (%" PRIu64 " files) exceeds SQLite variable limit (%d); "
"using temp-table cleanup for stale inode deletion",
file_count, max_vars);
return flb_tail_db_stale_file_delete_temp_table(ctx, file_count);
}

/* Create a stmt sql buffer */
sql_size = SQL_DELETE_STALE_FILE_START_LEN;
sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN;
sql_size += SQL_STMT_START_PARAM_LEN;
sql_size += SQL_STMT_PARAM_END_LEN;
sql_size += SQL_STMT_END_LEN;
if (file_count > 0) {
sql_size += (SQL_STMT_ADD_PARAM_LEN * file_count);
/*
* We already account for the first '?' via SQL_STMT_START_PARAM_LEN.
* Additional parameters are count-1 occurrences of ",?".
*/
if (file_count > 1) {
sql_size += (SQL_STMT_ADD_PARAM_LEN * (file_count - 1));
}
}

stale_delete_sql = flb_sds_create_size(sql_size + 1);
Expand Down
Loading