diff --git a/plugins/out_file/file.c b/plugins/out_file/file.c index 89a2e0176af..549344729b1 100644 --- a/plugins/out_file/file.c +++ b/plugins/out_file/file.c @@ -17,33 +17,42 @@ * limitations under the License. */ -#include +#include +#include +#include #include +#include +#include #include -#include +#include #include -#include -#include +#include #include #include - +#include +#include +#include #include #include -#include #include -#include +#include +#include #ifdef FLB_SYSTEM_WINDOWS #include #include +#else +#include +#include +#include #endif #include "file.h" #ifdef FLB_SYSTEM_WINDOWS #define NEWLINE "\r\n" -#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) +#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) #else #define NEWLINE "\n" #endif @@ -54,6 +63,18 @@ #define FLB_PATH_SEPARATOR "/" #endif +/* Constants for streaming gzip compression */ +#define GZIP_CHUNK_SIZE (16 * 1024) +#define GZIP_HEADER_SIZE 10 +#define GZIP_FOOTER_SIZE 8 + +struct file_file_size { + flb_sds_t filename; + size_t size; + flb_lock_t lock; + struct mk_list _head; +}; + struct flb_file_conf { const char *out_path; const char *out_file; @@ -63,6 +84,13 @@ struct flb_file_conf { int format; int csv_column_names; int mkdir; + /* Rotation-related fields */ + int files_rotation; + size_t max_size; + int max_files; + int gzip; + struct mk_list file_sizes; + flb_lock_t list_lock; struct flb_output_instance *ins; }; @@ -139,7 +167,8 @@ static int sanitize_tag_name(const char *tag, char *buf, size_t size) } if ((component_len == 1 && component_start[0] == '.') || - (component_len == 2 && component_start[0] == '.' && component_start[1] == '.')) { + (component_len == 2 && component_start[0] == '.' && + component_start[1] == '.')) { if (out_len >= size - 1) { break; } @@ -151,8 +180,9 @@ static int sanitize_tag_name(const char *tag, char *buf, size_t size) for (i = 0; i < component_len; i++) { sanitized_char = component_start[i]; - if (!isalnum((unsigned char) sanitized_char) && sanitized_char != '-' && - sanitized_char != '_' && sanitized_char != '.') { + if (!isalnum((unsigned char)sanitized_char) && + sanitized_char != '-' && sanitized_char != '_' && + sanitized_char != '.') { sanitized_char = '_'; } @@ -175,16 +205,14 @@ static int sanitize_tag_name(const char *tag, char *buf, size_t size) return 0; } - static int cb_file_init(struct flb_output_instance *ins, - struct flb_config *config, - void *data) + struct flb_config *config, void *data) { int ret; const char *tmp; char *ret_str; - (void) config; - (void) data; + (void)config; + (void)data; struct flb_file_conf *ctx; ctx = flb_calloc(1, sizeof(struct flb_file_conf)); @@ -198,36 +226,48 @@ static int cb_file_init(struct flb_output_instance *ins, ctx->label_delimiter = NULL; ctx->template = NULL; - ret = flb_output_config_map_set(ins, (void *) ctx); + /* Initialize rotation-related structures */ + mk_list_init(&ctx->file_sizes); + flb_lock_init(&ctx->list_lock); + + ret = flb_output_config_map_set(ins, (void *)ctx); if (ret == -1) { flb_free(ctx); return -1; } + /* Validate max_files only if rotation is enabled */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_files <= 0) { + flb_plg_error(ctx->ins, "invalid max_files=%d; must be >= 1", + ctx->max_files); + flb_free(ctx); + return -1; + } + /* Optional, file format */ tmp = flb_output_get_property("Format", ins); if (tmp) { if (!strcasecmp(tmp, "csv")) { - ctx->format = FLB_OUT_FILE_FMT_CSV; + ctx->format = FLB_OUT_FILE_FMT_CSV; ctx->delimiter = ","; } else if (!strcasecmp(tmp, "ltsv")) { - ctx->format = FLB_OUT_FILE_FMT_LTSV; + ctx->format = FLB_OUT_FILE_FMT_LTSV; ctx->delimiter = "\t"; ctx->label_delimiter = ":"; } else if (!strcasecmp(tmp, "plain")) { - ctx->format = FLB_OUT_FILE_FMT_PLAIN; + ctx->format = FLB_OUT_FILE_FMT_PLAIN; ctx->delimiter = NULL; ctx->label_delimiter = NULL; } else if (!strcasecmp(tmp, "msgpack")) { - ctx->format = FLB_OUT_FILE_FMT_MSGPACK; + ctx->format = FLB_OUT_FILE_FMT_MSGPACK; ctx->delimiter = NULL; ctx->label_delimiter = NULL; } else if (!strcasecmp(tmp, "template")) { - ctx->format = FLB_OUT_FILE_FMT_TEMPLATE; + ctx->format = FLB_OUT_FILE_FMT_TEMPLATE; } else if (!strcasecmp(tmp, "out_file")) { /* for explicit setting */ @@ -252,15 +292,24 @@ static int cb_file_init(struct flb_output_instance *ins, ctx->label_delimiter = ret_str; } + /* Log configuration if rotation is enabled */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0) { + flb_plg_info(ctx->ins, + "file rotation enabled: max_size=%zu, max_files=%d, " + "gzip=%s, path=%s", + ctx->max_size, ctx->max_files, + ctx->gzip == FLB_TRUE ? "true" : "false", + ctx->out_path ? ctx->out_path : "not set"); + } + /* Set the context */ flb_output_set_context(ins, ctx); return 0; } -static int csv_output(FILE *fp, int column_names, - struct flb_time *tm, msgpack_object *obj, - struct flb_file_conf *ctx) +static int csv_output(FILE *fp, int column_names, struct flb_time *tm, + msgpack_object *obj, struct flb_file_conf *ctx) { int i; int map_size; @@ -273,7 +322,7 @@ static int csv_output(FILE *fp, int column_names, if (column_names == FLB_TRUE) { fprintf(fp, "timestamp%s", ctx->delimiter); for (i = 0; i < map_size; i++) { - msgpack_object_print(fp, (kv+i)->key); + msgpack_object_print(fp, (kv + i)->key); if (i + 1 < map_size) { fprintf(fp, "%s", ctx->delimiter); } @@ -281,15 +330,15 @@ static int csv_output(FILE *fp, int column_names, fprintf(fp, NEWLINE); } - fprintf(fp, "%lld.%.09ld%s", - (long long) tm->tm.tv_sec, tm->tm.tv_nsec, ctx->delimiter); + fprintf(fp, "%lld.%.09ld%s", (long long)tm->tm.tv_sec, tm->tm.tv_nsec, + ctx->delimiter); for (i = 0; i < map_size - 1; i++) { - msgpack_object_print(fp, (kv+i)->val); + msgpack_object_print(fp, (kv + i)->val); fprintf(fp, "%s", ctx->delimiter); } - msgpack_object_print(fp, (kv+(map_size-1))->val); + msgpack_object_print(fp, (kv + (map_size - 1))->val); fprintf(fp, NEWLINE); } return 0; @@ -305,28 +354,26 @@ static int ltsv_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { kv = obj->via.map.ptr; map_size = obj->via.map.size; - fprintf(fp, "\"time\"%s%f%s", - ctx->label_delimiter, - flb_time_to_double(tm), - ctx->delimiter); + fprintf(fp, "\"time\"%s%f%s", ctx->label_delimiter, + flb_time_to_double(tm), ctx->delimiter); for (i = 0; i < map_size - 1; i++) { - msgpack_object_print(fp, (kv+i)->key); + msgpack_object_print(fp, (kv + i)->key); fprintf(fp, "%s", ctx->label_delimiter); - msgpack_object_print(fp, (kv+i)->val); + msgpack_object_print(fp, (kv + i)->val); fprintf(fp, "%s", ctx->delimiter); } - msgpack_object_print(fp, (kv+(map_size-1))->key); + msgpack_object_print(fp, (kv + (map_size - 1))->key); fprintf(fp, "%s", ctx->label_delimiter); - msgpack_object_print(fp, (kv+(map_size-1))->val); + msgpack_object_print(fp, (kv + (map_size - 1))->val); fprintf(fp, NEWLINE); } return 0; } -static int template_output_write(struct flb_file_conf *ctx, - FILE *fp, struct flb_time *tm, msgpack_object *obj, +static int template_output_write(struct flb_file_conf *ctx, FILE *fp, + struct flb_time *tm, msgpack_object *obj, const char *key, int size) { int i; @@ -382,7 +429,7 @@ static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, int keysize; const char *key; const char *pos; - const char *inbrace = NULL; /* points to the last open brace */ + const char *inbrace = NULL; /* points to the last open brace */ for (i = 0; i < len; i++) { pos = ctx->template + i; @@ -420,22 +467,20 @@ static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, return 0; } - -static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, int escape_unicode) +static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, + int escape_unicode) { char *buf; buf = flb_msgpack_to_json_str(alloc_size, obj, escape_unicode); if (buf) { - fprintf(fp, "%s" NEWLINE, - buf); + fprintf(fp, "%s" NEWLINE, buf); flb_free(buf); } return 0; } -static void print_metrics_text(struct flb_output_instance *ins, - FILE *fp, +static void print_metrics_text(struct flb_output_instance *ins, FILE *fp, const void *data, size_t bytes) { int ret; @@ -444,7 +489,7 @@ static void print_metrics_text(struct flb_output_instance *ins, struct cmt *cmt = NULL; /* get cmetrics context */ - ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); + ret = cmt_decode_msgpack_create(&cmt, (char *)data, bytes, &off); if (ret != 0) { flb_plg_error(ins, "could not process metrics payload"); return; @@ -486,7 +531,7 @@ static int mkpath(struct flb_output_instance *ins, const char *dir) } if (stat(dir, &st) == 0) { - if (S_ISDIR (st.st_mode)) { + if (S_ISDIR(st.st_mode)) { return 0; } flb_plg_error(ins, "%s is not a directory", dir); @@ -550,7 +595,7 @@ static int mkpath(struct flb_output_instance *ins, const char *dir) */ parent_dir = dirname(dup_dir); if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { - if (S_ISDIR (st.st_mode)) { + if (S_ISDIR(st.st_mode)) { flb_plg_debug(ins, "creating directory %s", dup_dir); ret = mkdir(dup_dir, 0755); free(dup_dir); @@ -582,35 +627,667 @@ static int mkpath(struct flb_output_instance *ins, const char *dir) #endif } +/* Helper function to find a file size entry by filename */ +static struct file_file_size *find_file_size_entry(struct flb_file_conf *ctx, + const char *filename) +{ + struct mk_list *head; + struct file_file_size *entry; + + /* Caller must hold ctx->list_lock */ + mk_list_foreach(head, &ctx->file_sizes) + { + entry = mk_list_entry(head, struct file_file_size, _head); + if (entry->filename && strcmp(entry->filename, filename) == 0) { + return entry; + } + } + return NULL; +} + +/* Helper function to create file size entry */ +static struct file_file_size *create_file_size_entry(struct flb_file_conf *ctx, + const char *filename, + size_t size) +{ + struct file_file_size *entry; + flb_sds_t filename_copy; + + /* Caller must hold ctx->list_lock */ + + /* Create new entry */ + entry = flb_calloc(1, sizeof(struct file_file_size)); + if (!entry) { + flb_errno(); + return NULL; + } + + filename_copy = flb_sds_create(filename); + if (!filename_copy) { + flb_free(entry); + flb_errno(); + return NULL; + } + + entry->filename = filename_copy; + entry->size = size; + + /* Initialize mutex for this file entry */ + if (flb_lock_init(&entry->lock) != 0) { + flb_plg_error(ctx->ins, "failed to initialize mutex for file %s", + filename); + flb_sds_destroy(filename_copy); + flb_free(entry); + return NULL; + } + + mk_list_add(&entry->_head, &ctx->file_sizes); + + return entry; +} + +/* Function to generate timestamp for rotated file */ +static void generate_timestamp(char *timestamp, size_t size) +{ + time_t now = time(NULL); + struct tm tm_info; + localtime_r(&now, &tm_info); + strftime(timestamp, size, "%Y%m%d_%H%M%S", &tm_info); +} + +/* Helper function to write gzip header (based on flb_gzip.c) */ +static void write_gzip_header(FILE *fp) +{ + uint8_t header[GZIP_HEADER_SIZE] = { + 0x1F, 0x8B, /* Magic bytes */ + 0x08, /* Compression method (deflate) */ + 0x00, /* Flags */ + 0x00, 0x00, 0x00, 0x00, /* Timestamp */ + 0x00, /* Compression flags */ + 0xFF /* OS (unknown) */ + }; + fwrite(header, 1, GZIP_HEADER_SIZE, fp); +} + +/* Helper function to write gzip footer */ +static void write_gzip_footer(FILE *fp, mz_ulong crc, size_t original_size) +{ + uint8_t footer[GZIP_FOOTER_SIZE]; + + /* Write CRC32 */ + footer[0] = crc & 0xFF; + footer[1] = (crc >> 8) & 0xFF; + footer[2] = (crc >> 16) & 0xFF; + footer[3] = (crc >> 24) & 0xFF; + + /* Write original size */ + footer[4] = original_size & 0xFF; + footer[5] = (original_size >> 8) & 0xFF; + footer[6] = (original_size >> 16) & 0xFF; + footer[7] = (original_size >> 24) & 0xFF; + + fwrite(footer, 1, GZIP_FOOTER_SIZE, fp); +} + +/* Function to compress a file using streaming gzip (memory-safe for large + * files) */ +static int gzip_compress_file(const char *input_filename, + const char *output_filename, + struct flb_output_instance *ins) +{ + FILE *src_fp = NULL, *dst_fp = NULL; + char *input_buffer = NULL, *output_buffer = NULL; + size_t bytes_read, output_buffer_size; + size_t total_input_size = 0; + mz_ulong crc = MZ_CRC32_INIT; + z_stream strm; + int ret = 0, flush, status; + int deflate_initialized = 0; + + /* Open source file */ + src_fp = fopen(input_filename, "rb"); + if (!src_fp) { + flb_plg_error(ins, "failed to open source file for gzip: %s", + input_filename); + return -1; + } + + /* Open destination file */ + dst_fp = fopen(output_filename, "wb"); + if (!dst_fp) { + flb_plg_error(ins, "failed to create gzip file: %s", output_filename); + fclose(src_fp); + return -1; + } + + /* Allocate input and output buffers */ + input_buffer = flb_malloc(GZIP_CHUNK_SIZE); + /* Use mz_compressBound to ensure sufficient buffer size for miniz */ + output_buffer_size = mz_compressBound(GZIP_CHUNK_SIZE); + output_buffer = flb_malloc(output_buffer_size); + + if (!input_buffer || !output_buffer) { + flb_plg_error(ins, "failed to allocate compression buffers"); + ret = -1; + goto cleanup; + } + + /* Write gzip header */ + write_gzip_header(dst_fp); + + /* Initialize deflate stream (raw deflate without gzip wrapper) */ + memset(&strm, 0, sizeof(strm)); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + status = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -Z_DEFAULT_WINDOW_BITS, 9, Z_DEFAULT_STRATEGY); + if (status != Z_OK) { + flb_plg_error(ins, "failed to initialize deflate stream"); + ret = -1; + goto cleanup; + } + deflate_initialized = 1; + + /* Process file in chunks */ + do { + bytes_read = fread(input_buffer, 1, GZIP_CHUNK_SIZE, src_fp); + if (bytes_read > 0) { + /* Update CRC and total size */ + crc = + mz_crc32(crc, (const unsigned char *)input_buffer, bytes_read); + total_input_size += bytes_read; + + /* Set up deflate input */ + strm.next_in = (Bytef *)input_buffer; + strm.avail_in = bytes_read; + + /* Determine flush mode based on EOF after this read */ + flush = feof(src_fp) ? Z_FINISH : Z_NO_FLUSH; + + /* Compress chunk */ + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, flush); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, + "deflate stream error during compression"); + ret = -1; + goto deflate_cleanup; + } + + /* Write compressed data */ + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (strm.avail_out == 0); + + /* Verify all input was consumed */ + if (strm.avail_in != 0) { + flb_plg_error(ins, "deflate did not consume all input data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (bytes_read > 0 && status != Z_STREAM_END); + + /* + * If the file size is a multiple of GZIP_CHUNK_SIZE, the loop above + * finishes because bytes_read == 0, but Z_FINISH was never called (flush + * was Z_NO_FLUSH). We must ensure the stream is finished. + */ + if (status != Z_STREAM_END) { + strm.next_in = Z_NULL; + strm.avail_in = 0; + + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, Z_FINISH); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, "deflate stream error during final flush"); + ret = -1; + goto deflate_cleanup; + } + + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error( + ins, "failed to write compressed data (final flush)"); + ret = -1; + goto deflate_cleanup; + } + } + } while (status != Z_STREAM_END); + } + + /* Verify compression completed successfully */ + if (status != Z_STREAM_END) { + flb_plg_error(ins, "compression did not complete properly"); + ret = -1; + } + else { + /* Write gzip footer (CRC32 + original size) */ + write_gzip_footer(dst_fp, crc, total_input_size); + } + +deflate_cleanup: + if (deflate_initialized) { + deflateEnd(&strm); + deflate_initialized = 0; + } + +cleanup: + if (input_buffer) { + flb_free(input_buffer); + input_buffer = NULL; + } + if (output_buffer) { + flb_free(output_buffer); + output_buffer = NULL; + } + if (src_fp) { + fclose(src_fp); + src_fp = NULL; + } + if (dst_fp) { + fclose(dst_fp); + dst_fp = NULL; + } + + return ret; +} + +/* Function to rotate file */ +static int rotate_file(struct flb_file_conf *ctx, const char *filename, + struct file_file_size *entry) +{ + char timestamp[32]; + char *rotated_filename = NULL; + char *gzip_filename = NULL; + size_t file_size = 0; + int ret = 0; + + /* Caller must hold entry->lock */ + + rotated_filename = flb_malloc(PATH_MAX); + if (!rotated_filename) { + flb_errno(); + return -1; + } + + if (ctx->gzip == FLB_TRUE) { + gzip_filename = flb_malloc(PATH_MAX); + if (!gzip_filename) { + flb_free(rotated_filename); + flb_errno(); + return -1; + } + } + + file_size = entry->size; + + /* Log rotation event */ + flb_plg_info(ctx->ins, "rotating file: %s (current size: %zu bytes)", + filename, file_size); + + /* Generate timestamp */ + generate_timestamp(timestamp, sizeof(timestamp)); + + /* Create rotated filename with timestamp */ + snprintf(rotated_filename, PATH_MAX - 1, "%s.%s", filename, timestamp); + + /* Rename current file to rotated filename */ + if (rename(filename, rotated_filename) != 0) { + flb_plg_error(ctx->ins, "failed to rename file from %s to %s", filename, + rotated_filename); + ret = -1; + goto cleanup; + } + + /* If gzip is enabled, compress the rotated file */ + if (ctx->gzip == FLB_TRUE) { + snprintf(gzip_filename, PATH_MAX - 1, "%s.gz", rotated_filename); + flb_plg_debug(ctx->ins, "compressing file: %s to %s", rotated_filename, + gzip_filename); + ret = gzip_compress_file(rotated_filename, gzip_filename, ctx->ins); + if (ret == 0) { + /* Remove the uncompressed file */ +#ifdef FLB_SYSTEM_WINDOWS + DeleteFileA(rotated_filename); +#else + unlink(rotated_filename); +#endif + flb_plg_debug(ctx->ins, "rotated and compressed file: %s", + gzip_filename); + } + else { + /* Remove the failed gzip file */ +#ifdef FLB_SYSTEM_WINDOWS + DeleteFileA(gzip_filename); +#else + unlink(gzip_filename); +#endif + ret = -1; + goto cleanup; + } + } + else { + flb_plg_debug(ctx->ins, "rotated file: %s (no compression)", + rotated_filename); + } + + /* Reset file size in the entry since we rotated */ + entry->size = 0; + +cleanup: + if (rotated_filename) { + flb_free(rotated_filename); + } + if (gzip_filename) { + flb_free(gzip_filename); + } + + return ret; +} + +/* + * Function to validate if a filename matches the rotation pattern format + * Valid formats: + * - base_filename.YYYYMMDD_HHMMSS (15 chars after pattern) + * - base_filename.YYYYMMDD_HHMMSS.gz (18 chars after pattern) + */ +static int is_valid_rotation_filename(const char *filename, const char *pattern) +{ + size_t pattern_len = strlen(pattern); + size_t filename_len = strlen(filename); + const char *suffix; + size_t suffix_len; + int i; + + /* Check that filename starts with pattern */ + if (strncmp(filename, pattern, pattern_len) != 0) { + return 0; + } + + /* Get the suffix after the pattern */ + suffix = filename + pattern_len; + suffix_len = filename_len - pattern_len; + + /* Must be exactly 15 or 18 characters */ + if (suffix_len != 15 && suffix_len != 18) { + return 0; + } + + /* For 18 characters, must end with .gz */ + if (suffix_len == 18) { + if (strcmp(suffix + 15, ".gz") != 0) { + return 0; + } + } + + /* Validate timestamp format: YYYYMMDD_HHMMSS + * 8 digits (YYYYMMDD) + * underscore at position 8 + * 6 digits (HHMMSS) + */ + for (i = 0; i < 8; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + if (suffix[8] != '_') { + return 0; + } + for (i = 9; i < 15; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + + return 1; +} + +/* Function to clean up old rotated files */ +static int cleanup_old_files(struct flb_file_conf *ctx, const char *directory, + const char *base_filename) +{ + char pattern[PATH_MAX]; + char full_path[PATH_MAX]; +#ifdef FLB_SYSTEM_WINDOWS + char search_path[PATH_MAX]; +#endif + char **files = NULL; + int file_count = 0; + int max_files = ctx->max_files; + int i, j; + + /* Create pattern to match rotated files */ + snprintf(pattern, PATH_MAX - 1, "%s.", base_filename); + +#ifdef FLB_SYSTEM_WINDOWS + HANDLE hFind; + WIN32_FIND_DATA findData; + + /* Create search path: directory\* */ + snprintf(search_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "*", directory); + + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern)) { + file_count++; + } + } while (FindNextFileA(hFind, &findData) != 0); + + if (file_count <= max_files) { + FindClose(hFind); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + FindClose(hFind); + return -1; + } + + /* Collect file names - restart search */ + FindClose(hFind); + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + flb_free(files); + return -1; + } + + i = 0; + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern) && + i < file_count) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + directory, findData.cFileName); + files[i] = flb_strdup(full_path); + i++; + } + } while (FindNextFileA(hFind, &findData) != 0 && i < file_count); + + FindClose(hFind); +#else + DIR *dir; + struct dirent *entry; + + dir = opendir(directory); + if (!dir) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + while ((entry = readdir(dir)) != NULL) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + file_count++; + } + } + + if (file_count <= max_files) { + closedir(dir); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + closedir(dir); + return -1; + } + + /* Collect file names */ + rewinddir(dir); + i = 0; + while ((entry = readdir(dir)) != NULL && i < file_count) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + directory, entry->d_name); + files[i] = flb_strdup(full_path); + i++; + } + } + closedir(dir); +#endif + + /* Sort files by modification time (oldest first) */ + for (i = 0; i < file_count - 1; i++) { + for (j = i + 1; j < file_count; j++) { + struct stat st1; + struct stat st2; + + if (!files[i] || !files[j]) { + continue; + } + + if (stat(files[i], &st1) == 0 && stat(files[j], &st2) == 0) { + if (st1.st_mtime > st2.st_mtime) { + char *temp = files[i]; + files[i] = files[j]; + files[j] = temp; + } + } + } + } + + /* Remove oldest files */ + if (file_count > max_files) { + flb_plg_info( + ctx->ins, + "cleaning up old rotated files: removing %d files (keeping %d)", + file_count - max_files, max_files); + } + for (i = 0; i < file_count - max_files; i++) { + if (files[i]) { +#ifdef FLB_SYSTEM_WINDOWS + if (DeleteFileA(files[i]) != 0) { +#else + if (unlink(files[i]) == 0) { +#endif + flb_plg_debug(ctx->ins, "removed old rotated file: %s", + files[i]); + } + flb_free(files[i]); + } + } + + /* Free remaining file names */ + for (i = file_count - max_files; i < file_count; i++) { + if (files[i]) { + flb_free(files[i]); + } + } + + flb_free(files); + return 0; +} + static void cb_file_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, - struct flb_input_instance *ins, - void *out_context, + struct flb_input_instance *ins, void *out_context, struct flb_config *config) { int ret; + int ret_val = FLB_OK; int column_names; - FILE * fp; + FILE *fp; size_t off = 0; size_t last_off = 0; size_t alloc_size = 0; size_t total; - char out_file[PATH_MAX * 2]; - char sanitized_tag[PATH_MAX]; + size_t file_size = 0; + char *out_file = NULL; + char *sanitized_tag = NULL; char *buf; + char *out_file_copy; + char *directory = NULL; + char *base_filename = NULL; long file_pos; + bool have_directory; struct flb_file_conf *ctx = out_context; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; - char* out_file_copy; + struct file_file_size *entry = NULL; + struct stat st; + bool entry_just_created = false; + + (void)config; + + /* Allocate buffers on heap to avoid stack overflow with sanitizers */ + out_file = flb_malloc(PATH_MAX * 2); + if (!out_file) { + flb_errno(); + FLB_OUTPUT_RETURN(FLB_ERROR); + } - (void) config; + sanitized_tag = flb_malloc(PATH_MAX); + if (!sanitized_tag) { + flb_errno(); + flb_free(out_file); + FLB_OUTPUT_RETURN(FLB_ERROR); + } /* Set the right output file */ if (ctx->out_file == NULL) { - ret = sanitize_tag_name(event_chunk->tag, - sanitized_tag, - sizeof(sanitized_tag)); + ret = sanitize_tag_name(event_chunk->tag, sanitized_tag, PATH_MAX); if (ret != 0) { flb_plg_error(ctx->ins, "failed to sanitize tag for output file"); @@ -620,11 +1297,11 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, if (ctx->out_path) { if (ctx->out_file) { - snprintf(out_file, sizeof(out_file) , "%s" FLB_PATH_SEPARATOR "%s", + snprintf(out_file, PATH_MAX * 2, "%s" FLB_PATH_SEPARATOR "%s", ctx->out_path, ctx->out_file); } else { - snprintf(out_file, sizeof(out_file), "%s" FLB_PATH_SEPARATOR "%s", + snprintf(out_file, PATH_MAX * 2, "%s" FLB_PATH_SEPARATOR "%s", ctx->out_path, sanitized_tag); } } @@ -637,10 +1314,121 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, } } + /* Rotation logic - only if files_rotation enabled and max_size > 0 */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0) { + /* Find or create file size entry and acquire lock (Hand-Over-Hand) */ + if (flb_lock_acquire(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + ret_val = FLB_ERROR; + goto cleanup; + } + + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Entry doesn't exist yet, create it with initial size 0 */ + entry = create_file_size_entry(ctx, out_file, 0); + if (entry == NULL) { + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + entry_just_created = true; + } + + /* Acquire lock before any file operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", + out_file); + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Release list lock now that we hold the entry lock */ + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + + /* If entry was just created, seed size from on-disk file to handle + * pre-existing files that may already exceed max_size on startup. + * This ensures rotation decision is correct on first flush. + */ + if (entry_just_created && entry->size == 0) { + if (stat(out_file, &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + + /* Check if file needs rotation based on current size counter */ + file_size = entry->size; + + if (file_size >= ctx->max_size) { + have_directory = false; + + directory = flb_malloc(PATH_MAX); + if (!directory) { + flb_errno(); + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + directory[0] = '\0'; + + base_filename = flb_malloc(PATH_MAX); + if (!base_filename) { + flb_errno(); + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Extract directory and base filename for cleanup */ + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + strncpy(directory, out_file_copy, PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#else + strncpy(directory, dirname(out_file_copy), PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#endif + flb_free(out_file_copy); + have_directory = true; + } + + /* Get base filename for cleanup */ + { + char *last_sep = strrchr(out_file, FLB_PATH_SEPARATOR[0]); + if (last_sep) { + strncpy(base_filename, last_sep + 1, PATH_MAX - 1); + } + else { + strncpy(base_filename, out_file, PATH_MAX - 1); + } + base_filename[PATH_MAX - 1] = '\0'; + } + + /* Rotate the file - passing entry, with lock held */ + if (rotate_file(ctx, out_file, entry) == 0) { + /* Clean up old rotated files */ + if (have_directory) { + cleanup_old_files(ctx, directory, base_filename); + } + } + } + } + /* Open output file with default name as the Tag */ - fp = fopen(out_file, "ab+"); + /* Use "a" mode for thread-safe append operations - automatically seeks to + * end */ + fp = fopen(out_file, "ab"); if (ctx->mkdir == FLB_TRUE && fp == NULL && errno == ENOENT) { - out_file_copy = strdup(out_file); + out_file_copy = flb_strdup(out_file); if (out_file_copy) { #ifdef FLB_SYSTEM_WINDOWS PathRemoveFileSpecA(out_file_copy); @@ -648,30 +1436,69 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, #else ret = mkpath(ctx->ins, dirname(out_file_copy)); #endif - free(out_file_copy); + flb_free(out_file_copy); if (ret == 0) { - fp = fopen(out_file, "ab+"); + fp = fopen(out_file, "ab"); } } } if (fp == NULL) { flb_errno(); flb_plg_error(ctx->ins, "error opening: %s", out_file); - FLB_OUTPUT_RETURN(FLB_ERROR); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Initialize file size counter if this is a new file (for rotation) */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0 && entry != NULL) { + if (entry->size == 0) { + /* We already have the entry and the lock, update size directly. */ + /* Flush first to ensure all previous writes are visible */ + fflush(fp); + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } } /* * Get current file stream position, we gather this in case 'csv' format * needs to write the column names. + * With "a" mode, ftell() returns the current position (end of file). */ file_pos = ftell(fp); /* Check if the event type is metrics, handle the payload differently */ if (event_chunk->type == FLB_INPUT_METRICS) { - print_metrics_text(ctx->ins, fp, - event_chunk->data, event_chunk->size); + print_metrics_text(ctx->ins, fp, event_chunk->data, event_chunk->size); + /* Flush all buffered data before updating size counter */ + fflush(fp); +#ifndef FLB_SYSTEM_WINDOWS + /* Only fsync when rotation is enabled - needed for accurate size + * tracking */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0) { + fsync(fileno(fp)); + } +#endif + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } fclose(fp); - FLB_OUTPUT_RETURN(FLB_OK); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_OK; + goto cleanup; } /* @@ -683,57 +1510,110 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, total = 0; do { - ret = fwrite((char *) event_chunk->data + off, 1, + ret = fwrite((char *)event_chunk->data + off, 1, event_chunk->size - off, fp); if (ret < 0) { flb_errno(); fclose(fp); - FLB_OUTPUT_RETURN(FLB_RETRY); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_RETRY; + goto cleanup; } total += ret; } while (total < event_chunk->size); + /* Flush all buffered data before updating size counter */ + fflush(fp); +#ifndef FLB_SYSTEM_WINDOWS + /* Only fsync when rotation is enabled - needed for accurate size + * tracking */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0) { + fsync(fileno(fp)); + } +#endif + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } fclose(fp); - FLB_OUTPUT_RETURN(FLB_OK); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_OK; + goto cleanup; } - ret = flb_log_event_decoder_init(&log_decoder, - (char *) event_chunk->data, + ret = flb_log_event_decoder_init(&log_decoder, (char *)event_chunk->data, event_chunk->size); if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", + ret); + + /* Flush any buffered data before updating size counter */ + fflush(fp); + /* Update file size counter before closing - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } fclose(fp); - FLB_OUTPUT_RETURN(FLB_ERROR); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_ERROR; + goto cleanup; } /* * Upon flush, for each array, lookup the time and the first field * of the map to use as a data point. */ - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == + FLB_EVENT_DECODER_SUCCESS) { alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ last_off = off; - switch (ctx->format){ + switch (ctx->format) { case FLB_OUT_FILE_FMT_JSON: buf = flb_msgpack_to_json_str(alloc_size, log_event.body, config->json_escape_unicode); if (buf) { - fprintf(fp, "%s: [%"PRIu64".%09lu, %s]" NEWLINE, + fprintf(fp, "%s: [%" PRIu64 ".%09lu, %s]" NEWLINE, event_chunk->tag, - (uint64_t) log_event.timestamp.tm.tv_sec, log_event.timestamp.tm.tv_nsec, - buf); + (uint64_t)log_event.timestamp.tm.tv_sec, + log_event.timestamp.tm.tv_nsec, buf); flb_free(buf); } else { + /* Flush any buffered data before updating size counter */ + fflush(fp); + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } flb_log_event_decoder_destroy(&log_decoder); fclose(fp); - FLB_OUTPUT_RETURN(FLB_RETRY); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_RETRY; + goto cleanup; } break; case FLB_OUT_FILE_FMT_CSV: @@ -744,23 +1624,19 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, else { column_names = FLB_FALSE; } - csv_output(fp, column_names, - &log_event.timestamp, - log_event.body, ctx); + csv_output(fp, column_names, &log_event.timestamp, log_event.body, + ctx); break; case FLB_OUT_FILE_FMT_LTSV: - ltsv_output(fp, - &log_event.timestamp, - log_event.body, ctx); + ltsv_output(fp, &log_event.timestamp, log_event.body, ctx); break; case FLB_OUT_FILE_FMT_PLAIN: - plain_output(fp, log_event.body, alloc_size, config->json_escape_unicode); + plain_output(fp, log_event.body, alloc_size, + config->json_escape_unicode); break; case FLB_OUT_FILE_FMT_TEMPLATE: - template_output(fp, - &log_event.timestamp, - log_event.body, ctx); + template_output(fp, &log_event.timestamp, log_event.body, ctx); break; } @@ -768,88 +1644,143 @@ static void cb_file_flush(struct flb_event_chunk *event_chunk, flb_log_event_decoder_destroy(&log_decoder); + /* Flush all buffered data before updating size counter */ + fflush(fp); +#ifndef FLB_SYSTEM_WINDOWS + /* Only fsync when rotation is enabled - needed for accurate size tracking + */ + if (ctx->files_rotation == FLB_TRUE && ctx->max_size > 0) { + fsync(fileno(fp)); + } +#endif + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } fclose(fp); - FLB_OUTPUT_RETURN(FLB_OK); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + ret_val = FLB_OK; + +cleanup: + if (out_file) { + flb_free(out_file); + } + if (sanitized_tag) { + flb_free(sanitized_tag); + } + if (directory) { + flb_free(directory); + } + if (base_filename) { + flb_free(base_filename); + } + FLB_OUTPUT_RETURN(ret_val); } static int cb_file_exit(void *data, struct flb_config *config) { struct flb_file_conf *ctx = data; + struct mk_list *head; + struct mk_list *tmp; + struct file_file_size *entry; if (!ctx) { return 0; } + /* Free all file size entries from linked list */ + flb_lock_acquire(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + mk_list_foreach_safe(head, tmp, &ctx->file_sizes) + { + entry = mk_list_entry(head, struct file_file_size, _head); + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + flb_lock_destroy(&ctx->list_lock); + flb_free(ctx); return 0; } /* Configuration properties map */ static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "path", NULL, - 0, FLB_TRUE, offsetof(struct flb_file_conf, out_path), - "Absolute path to store the files. This parameter is optional" - }, + {FLB_CONFIG_MAP_STR, "path", NULL, 0, FLB_TRUE, + offsetof(struct flb_file_conf, out_path), + "Absolute path to store the files. This parameter is optional"}, - { - FLB_CONFIG_MAP_STR, "file", NULL, - 0, FLB_TRUE, offsetof(struct flb_file_conf, out_file), + {FLB_CONFIG_MAP_STR, "file", NULL, 0, FLB_TRUE, + offsetof(struct flb_file_conf, out_file), "Name of the target file to write the records. If 'path' is specified, " - "the value is prefixed" - }, + "the value is prefixed"}, - { - FLB_CONFIG_MAP_STR, "format", NULL, - 0, FLB_FALSE, 0, + {FLB_CONFIG_MAP_STR, "format", NULL, 0, FLB_FALSE, 0, "Specify the output data format, the available options are: plain (json), " - "csv, ltsv and template. If no value is set the outgoing data is formatted " - "using the tag and the record in json" - }, + "csv, ltsv and template. If no value is set the outgoing data is " + "formatted " + "using the tag and the record in json"}, - { - FLB_CONFIG_MAP_STR, "delimiter", NULL, - 0, FLB_FALSE, 0, - "Set a custom delimiter for the records" - }, + {FLB_CONFIG_MAP_STR, "delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom delimiter for the records"}, - { - FLB_CONFIG_MAP_STR, "label_delimiter", NULL, - 0, FLB_FALSE, 0, - "Set a custom label delimiter, to be used with 'ltsv' format" - }, + {FLB_CONFIG_MAP_STR, "label_delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom label delimiter, to be used with 'ltsv' format"}, - { - FLB_CONFIG_MAP_STR, "template", "{time} {message}", - 0, FLB_TRUE, offsetof(struct flb_file_conf, template), - "Set a custom template format for the data" - }, + {FLB_CONFIG_MAP_STR, "template", "{time} {message}", 0, FLB_TRUE, + offsetof(struct flb_file_conf, template), + "Set a custom template format for the data"}, - { - FLB_CONFIG_MAP_BOOL, "csv_column_names", "false", - 0, FLB_TRUE, offsetof(struct flb_file_conf, csv_column_names), - "Add column names (keys) in the first line of the target file" - }, + {FLB_CONFIG_MAP_BOOL, "csv_column_names", "false", 0, FLB_TRUE, + offsetof(struct flb_file_conf, csv_column_names), + "Add column names (keys) in the first line of the target file"}, - { - FLB_CONFIG_MAP_BOOL, "mkdir", "false", - 0, FLB_TRUE, offsetof(struct flb_file_conf, mkdir), - "Recursively create output directory if it does not exist. Permissions set to 0755" - }, + {FLB_CONFIG_MAP_BOOL, "mkdir", "false", 0, FLB_TRUE, + offsetof(struct flb_file_conf, mkdir), + "Recursively create output directory if it does not exist. Permissions " + "set to 0755"}, + + {FLB_CONFIG_MAP_BOOL, "files_rotation", "false", 0, FLB_TRUE, + offsetof(struct flb_file_conf, files_rotation), + "Enable file rotation feature (default: false)"}, + + {FLB_CONFIG_MAP_SIZE, "max_size", "100000000", 0, FLB_TRUE, + offsetof(struct flb_file_conf, max_size), + "Maximum size of file before rotation (default: 100MB)"}, + + {FLB_CONFIG_MAP_INT, "max_files", "7", 0, FLB_TRUE, + offsetof(struct flb_file_conf, max_files), + "Maximum number of rotated files to keep (default: 7)"}, + + {FLB_CONFIG_MAP_BOOL, "gzip", "true", 0, FLB_TRUE, + offsetof(struct flb_file_conf, gzip), + "Whether to gzip rotated files (default: true)"}, /* EOF */ - {0} -}; + {0}}; struct flb_output_plugin out_file_plugin = { - .name = "file", - .description = "Generate log file", - .cb_init = cb_file_init, - .cb_flush = cb_file_flush, - .cb_exit = cb_file_exit, - .flags = 0, - .workers = 1, - .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, - .config_map = config_map, + .name = "file", + .description = "Generate log file", + .cb_init = cb_file_init, + .cb_flush = cb_file_flush, + .cb_exit = cb_file_exit, + .flags = 0, + .workers = 1, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + .config_map = config_map, }; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index dd76c16faee..f63863cdb86 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -234,6 +234,7 @@ if(FLB_IN_LIB) # These plugins work only on Linux if(NOT FLB_SYSTEM_WINDOWS) FLB_RT_TEST(FLB_OUT_FILE "out_file.c") + FLB_RT_TEST(FLB_OUT_FILE "out_file_logrotate.c") endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") diff --git a/tests/runtime/out_file_logrotate.c b/tests/runtime/out_file_logrotate.c new file mode 100644 index 00000000000..e0a82fe503d --- /dev/null +++ b/tests/runtime/out_file_logrotate.c @@ -0,0 +1,1541 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include "flb_tests_runtime.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include +#define TEST_MKDIR(path) mkdir(path, 0755) +#define PATH_SEPARATOR "/" +#else +#include +#include +#define TEST_MKDIR(path) _mkdir(path) +#define PATH_SEPARATOR "\\" +/* Windows S_ISDIR compatibility */ +#ifndef S_ISDIR +#define S_ISDIR(mode) (((mode) & S_IFMT) == S_IFDIR) +#endif +#endif + +/* Test data */ +#include "data/common/json_invalid.h" /* JSON_INVALID */ +#include "data/common/json_long.h" /* JSON_LONG */ +#include "data/common/json_small.h" /* JSON_SMALL */ + +/* Test functions */ +void flb_test_file_logrotate_basic_rotation(void); +void flb_test_file_logrotate_gzip_compression(void); +void flb_test_file_logrotate_gzip_compression_exact_chunk(void); +void flb_test_file_logrotate_max_files_cleanup(void); +void flb_test_file_logrotate_max_files_validation(void); +void flb_test_file_logrotate_format_csv(void); +void flb_test_file_logrotate_format_ltsv(void); +void flb_test_file_logrotate_format_plain(void); +void flb_test_file_logrotate_format_msgpack(void); +void flb_test_file_logrotate_format_template(void); +void flb_test_file_logrotate_path(void); +void flb_test_file_logrotate_mkdir(void); +void flb_test_file_logrotate_delimiter(void); +void flb_test_file_logrotate_label_delimiter(void); +void flb_test_file_logrotate_csv_column_names(void); +void flb_test_file_logrotate_multithreaded(void); + +/* Test list */ +TEST_LIST = { + {"basic_rotation", flb_test_file_logrotate_basic_rotation}, + {"gzip_compression", flb_test_file_logrotate_gzip_compression}, + {"gzip_compression_exact_chunk", + flb_test_file_logrotate_gzip_compression_exact_chunk}, + {"max_files_cleanup", flb_test_file_logrotate_max_files_cleanup}, + {"max_files_validation", flb_test_file_logrotate_max_files_validation}, + {"logrotate_format_csv", flb_test_file_logrotate_format_csv}, + {"logrotate_format_ltsv", flb_test_file_logrotate_format_ltsv}, + {"logrotate_format_plain", flb_test_file_logrotate_format_plain}, + {"logrotate_format_msgpack", flb_test_file_logrotate_format_msgpack}, + {"logrotate_format_template", flb_test_file_logrotate_format_template}, + {"logrotate_path", flb_test_file_logrotate_path}, + {"logrotate_mkdir", flb_test_file_logrotate_mkdir}, + {"logrotate_delimiter", flb_test_file_logrotate_delimiter}, + {"logrotate_label_delimiter", flb_test_file_logrotate_label_delimiter}, + {"logrotate_csv_column_names", flb_test_file_logrotate_csv_column_names}, + {"logrotate_multithreaded", flb_test_file_logrotate_multithreaded}, + {NULL, NULL}}; + +#define TEST_LOGFILE "flb_test_file_logrotate.log" +#define TEST_LOGPATH "out_logrotate" +#define TEST_TIMEOUT 10 + +/* Helper function to recursively delete directory and all its contents */ +static int recursive_delete_directory(const char *dir_path) +{ +#ifdef FLB_SYSTEM_WINDOWS + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + char file_path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Create search path: dir_path\* */ + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + search_path[sizeof(search_path) - 1] = '\0'; + + hFind = FindFirstFileA(search_path, &ffd); + if (hFind == INVALID_HANDLE_VALUE) { + /* Directory doesn't exist or can't be opened, consider it success */ + return 0; + } + + do { + /* Skip . and .. */ + if (strcmp(ffd.cFileName, ".") == 0 || + strcmp(ffd.cFileName, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(file_path, sizeof(file_path), "%s\\%s", dir_path, + ffd.cFileName); + file_path[sizeof(file_path) - 1] = '\0'; + + /* Recursively delete subdirectories */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + if (recursive_delete_directory(file_path) != 0) { + ret = -1; + } + } + else { + /* Delete file - clear read-only if needed */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_READONLY) { + SetFileAttributesA(file_path, ffd.dwFileAttributes & + ~FILE_ATTRIBUTE_READONLY); + } + if (DeleteFileA(file_path) == 0) { + ret = -1; + } + } + } while (FindNextFileA(hFind, &ffd) != 0); + + FindClose(hFind); + + /* Remove the directory itself */ + if (RemoveDirectoryA(dir_path) == 0) { + ret = -1; + } + + return ret; +#else + DIR *dir; + struct dirent *entry; + struct stat statbuf; + char path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Check if directory exists */ + if (stat(dir_path, &statbuf) != 0) { + /* Directory doesn't exist, consider it success */ + return 0; + } + + /* Check if it's actually a directory */ + if (!S_ISDIR(statbuf.st_mode)) { + /* Not a directory, try to remove as file */ + return remove(dir_path); + } + + /* Open directory */ + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + /* Iterate through directory entries */ + while ((entry = readdir(dir)) != NULL) { + /* Skip . and .. */ + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(path, sizeof(path), "%s/%s", dir_path, entry->d_name); + + /* Get file status */ + if (stat(path, &statbuf) != 0) { + continue; + } + + /* Recursively delete subdirectories */ + if (S_ISDIR(statbuf.st_mode)) { + if (recursive_delete_directory(path) != 0) { + ret = -1; + } + } + else { + /* Delete file */ + if (unlink(path) != 0) { + ret = -1; + } + } + } + + closedir(dir); + + /* Remove the directory itself */ + if (rmdir(dir_path) != 0) { + ret = -1; + } + + return ret; +#endif +} + +/* Helper function to count files in directory */ +#ifdef FLB_SYSTEM_WINDOWS +static int count_files_in_directory(const char *dir_path, const char *prefix) +{ + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + int count = 0; + + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + hFind = FindFirstFileA(search_path, &ffd); + if (hFind == INVALID_HANDLE_VALUE) { + return -1; + } + + do { + if (strncmp(ffd.cFileName, prefix, strlen(prefix)) == 0) { + count++; + } + } while (FindNextFileA(hFind, &ffd) != 0); + + FindClose(hFind); + return count; +} +#else +static int count_files_in_directory(const char *dir_path, const char *prefix) +{ + DIR *dir; + struct dirent *entry; + int count = 0; + + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, strlen(prefix)) == 0) { + count++; + } + } + + closedir(dir); + return count; +} +#endif + +/* + * Helper function: Wait for a file matching the pattern "prefix*suffix" to + * appear in dir_path + */ +#ifdef FLB_SYSTEM_WINDOWS +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) +{ + int elapsed_time, found = 0; + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + size_t prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; + elapsed_time++) { + hFind = FindFirstFileA(search_path, &ffd); + if (hFind != INVALID_HANDLE_VALUE) { + do { + if (strncmp(ffd.cFileName, prefix, prefix_len) == 0 && + strlen(ffd.cFileName) > prefix_len + suffix_len && + strcmp(ffd.cFileName + strlen(ffd.cFileName) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } while (FindNextFileA(hFind, &ffd) != 0); + FindClose(hFind); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 0 : -1; +} +#else +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) +{ + int elapsed_time, found = 0; + DIR *dir; + struct dirent *entry; + size_t prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; + elapsed_time++) { + dir = opendir(dir_path); + if (dir) { + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, prefix_len) == 0 && + strlen(entry->d_name) > prefix_len + suffix_len && + strcmp(entry->d_name + strlen(entry->d_name) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } + closedir(dir); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 0 : -1; +} +#endif + +/* Helper function: Wait for a file to exist and have a minimum size */ +static int wait_for_file_size(const char *path, size_t min_size, int time_limit) +{ + int elapsed_time; + struct stat st; + + for (elapsed_time = 0; elapsed_time < time_limit; elapsed_time++) { + if (stat(path, &st) == 0 && st.st_size >= min_size) { + return 0; + } + flb_time_msleep(1000); + } + return -1; +} + +/* Helper function to read file content into buffer */ +static char *read_file_content(const char *filename, size_t *out_size) +{ + FILE *fp; + char *buffer; + struct stat st; + size_t size; + + if (stat(filename, &st) != 0) { + return NULL; + } + + size = st.st_size; + fp = fopen(filename, "rb"); + if (!fp) { + return NULL; + } + + buffer = flb_malloc(size + 1); + if (!buffer) { + fclose(fp); + return NULL; + } + + if (fread(buffer, 1, size, fp) != size) { + flb_free(buffer); + fclose(fp); + return NULL; + } + + buffer[size] = '\0'; + fclose(fp); + *out_size = size; + return buffer; +} + +/* Format Tests */ +void flb_test_file_logrotate_format_csv(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_csv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV format - should contain commas as delimiters */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* CSV should contain commas */ + TEST_CHECK(strstr(content, ",") != NULL); + /* CSV should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_format_ltsv(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_ltsv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify LTSV format - should contain colons (label delimiter) and tabs */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* LTSV should contain colons for label:value pairs */ + TEST_CHECK(strstr(content, ":") != NULL); + /* Should contain "time" label */ + TEST_CHECK(strstr(content, "time") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_format_plain(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_plain.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "plain", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify plain format - should be JSON without tag/timestamp prefix */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Plain format should contain JSON */ + TEST_CHECK(strstr(content, "{") != NULL); + /* Should not contain tag prefix like "test: [" */ + TEST_CHECK(strstr(content, "test: [") == NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_format_msgpack(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + struct stat st; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_msgpack.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "msgpack", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify msgpack format - should be binary data */ + if (stat(logfile, &st) == 0) { + TEST_CHECK(st.st_size > 0); + /* Msgpack files should not be readable as text (no newlines in first + * bytes) + */ + fp = fopen(logfile, "rb"); + if (fp) { + unsigned char first_bytes[10]; + size_t read_bytes = fread(first_bytes, 1, 10, fp); + fclose(fp); + if (read_bytes > 0) { + /* + * Msgpack typically starts with array markers (0x91, 0x92, + * etc.) or map markers. Just verify it's not plain text JSON. + */ + TEST_CHECK(first_bytes[0] != '{' && first_bytes[0] != '['); + } + } + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_format_template(void) +{ + int i; + int ret; + int bytes; + /* Use JSON with specific fields for template testing */ + const char *json_template = "[1448403340, {\"message\": \"test log " + "entry\", \"level\": \"info\"}]"; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_template.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", + "{time} {message}", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, (char *)json_template, + strlen(json_template)); + TEST_CHECK(bytes == strlen(json_template)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify template format - should contain substituted values */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Template should contain the message value */ + TEST_CHECK(strstr(content, "test log entry") != NULL); + /* Should contain timestamp (as float) */ + TEST_CHECK(strstr(content, "1448403340") != NULL || + strstr(content, ".") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Configuration Option Tests */ +void flb_test_file_logrotate_path(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char test_path[PATH_MAX]; + + snprintf(test_path, sizeof(test_path), "%s" PATH_SEPARATOR "path_test", + TEST_LOGPATH); + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); +/* Construct logfile path - test_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "path_test.log", + test_path); +#pragma GCC diagnostic pop + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "path", test_path, + "file", "path_test.log", "mkdir", "true", + "files_rotation", "true", "max_size", "100M", + "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify file was created in the specified path */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_mkdir(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char nested_path[PATH_MAX]; + struct stat st; + + snprintf(nested_path, sizeof(nested_path), + "%s" PATH_SEPARATOR "nested" PATH_SEPARATOR "deep" PATH_SEPARATOR + "path", + TEST_LOGPATH); +/* Construct logfile path - nested_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "test_mkdir.log", + nested_path); +#pragma GCC diagnostic pop + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "mkdir", "true", "files_rotation", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify nested directory was created */ + TEST_CHECK(stat(nested_path, &st) == 0); + TEST_CHECK(S_ISDIR(st.st_mode)); + + /* Verify file was created */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_delimiter(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "delimiter", "tab", + "files_rotation", "true", "max_size", "100M", + "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify tab delimiter is used (should contain tabs, not + * commas) */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tab characters */ + int has_tab = 0; + int j; + for (j = 0; j < content_size; j++) { + if (content[j] == '\t') { + has_tab = 1; + break; + } + } + TEST_CHECK(has_tab); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_label_delimiter(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_label_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "label_delimiter", "comma", + "files_rotation", "true", "max_size", "100M", + "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify custom label delimiter is used */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain "," as label delimiter (comma) */ + TEST_CHECK(strstr(content, ",") != NULL); + /* Should contain "time" label with comma delimiter */ + /* LTSV format prints "time" (with quotes) followed by + * delimiter */ + TEST_CHECK(strstr(content, "\"time\",") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_csv_column_names(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_csv_columns.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "csv_column_names", "true", + "files_rotation", "true", "max_size", "100M", + "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV column names header exists */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* First line should contain "timestamp" */ + TEST_CHECK(strstr(content, "timestamp") != NULL); + /* Should contain key names from JSON */ + TEST_CHECK(strstr(content, "key_0") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Multithreaded Test */ +struct thread_data { + flb_ctx_t *ctx; + int in_ffd; + int thread_id; + int events_per_thread; + char *json_data; + size_t json_len; + int *success; + pthread_mutex_t *mutex; +}; + +static void *thread_worker(void *arg) +{ + struct thread_data *data = (struct thread_data *)arg; + int i; + int bytes; + + for (i = 0; i < data->events_per_thread; i++) { + bytes = flb_lib_push(data->ctx, data->in_ffd, data->json_data, + data->json_len); + if (bytes != (int)data->json_len) { + pthread_mutex_lock(data->mutex); + *data->success = 0; + pthread_mutex_unlock(data->mutex); + return NULL; + } + /* Small delay to allow interleaving */ + flb_time_msleep(10); + } + + return NULL; +} + +void flb_test_file_logrotate_multithreaded(void) +{ + int ret; + int i; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + pthread_t threads[8]; + struct thread_data thread_data[8]; + pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + int success = 1; + int num_threads = 4; + int events_per_thread = 10; + FILE *fp; + char *content; + size_t content_size; + int line_count = 0; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_multithreaded.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "0.5", "Grace", "2", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_size", "1M", + "max_files", "5", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare thread data */ + for (i = 0; i < num_threads; i++) { + thread_data[i].ctx = ctx; + thread_data[i].in_ffd = in_ffd; + thread_data[i].thread_id = i; + thread_data[i].events_per_thread = events_per_thread; + thread_data[i].json_data = p; + thread_data[i].json_len = strlen(p); + thread_data[i].success = &success; + thread_data[i].mutex = &mutex; + } + + /* Create and start threads */ + for (i = 0; i < num_threads; i++) { + ret = pthread_create(&threads[i], NULL, thread_worker, &thread_data[i]); + TEST_CHECK(ret == 0); + } + + /* Wait for all threads to complete */ + for (i = 0; i < num_threads; i++) { + pthread_join(threads[i], NULL); + } + + /* Wait for flush to complete - allow multiple flush cycles */ + flb_time_msleep(3000); + + /* Wait for file to exist and have content before stopping */ + ret = wait_for_file_size(logfile, 100 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify all data was written correctly */ + TEST_CHECK(success == 1); + + /* Verify file exists and has content */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + char line[4096]; + while (fgets(line, sizeof(line), fp) != NULL) { + line_count++; + } + fclose(fp); + } + + /* Should have at least num_threads * events_per_thread records + */ + /* (may be more due to JSON format adding tag prefix) */ + TEST_CHECK(line_count >= num_threads * events_per_thread); + + /* Verify file content is valid - read and check for expected + * data */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tag */ + TEST_CHECK(strstr(content, "test") != NULL); + /* Should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + /* Count occurrences of key_0 to verify records */ + int key_count = 0; + char *pos = content; + while ((pos = strstr(pos, "key_0")) != NULL) { + key_count++; + pos++; + } + TEST_CHECK(key_count >= num_threads * events_per_thread); + flb_free(content); + } + + pthread_mutex_destroy(&mutex); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +}; + +void flb_test_file_logrotate_basic_rotation(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + time_t now = time(NULL); + struct tm tm_info; + char timestamp[32]; + + localtime_r(&now, &tm_info); + strftime(timestamp, sizeof(timestamp), "%Y%m%d_%H%M%S", &tm_info); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_size", "5K", + "max_files", "3", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to fill the file (JSON_SMALL is ~4KB, 4 events = ~16KB) + */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file_size(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Wait a bit more to ensure flush completes and file size is + * updated */ + flb_time_msleep(1500); + + /* Write additional data to trigger rotation (4 more events = + * ~16KB more) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that the original file exists */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp != NULL) { + fclose(fp); + } + + /* Check that at least one rotated file exists: + * flb_test_file_logrotate.log.*" + */ + TEST_CHECK(count_files_in_directory(TEST_LOGPATH, + "flb_test_file_logrotate.log.") >= 1); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +}; + +void flb_test_file_logrotate_gzip_compression(void) +{ + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_size", "5K", + "max_files", "3", "gzip", "true", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data for rotation to happen (JSON_SMALL is + * ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file_size(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger rotation (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists: + * flb_test_file_logrotate.log.*.gz + */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_file_logrotate.log.", + ".gz", TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +}; + +void flb_test_file_logrotate_max_files_cleanup(void) +{ + int i, j; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int file_count; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_size", "5K", + "max_files", "3", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger multiple rotations */ + for (i = 0; i < 5; i++) { /* Write enough data (5 * 4 * ~4KB = ~80KB) to + trigger multiple rotations (max_size=5K) */ + /* Write enough data for rotation to happen (JSON_SMALL is ~4KB) */ + for (j = 0; j < 4; j++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= 4); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that only Max_Files + 1 files exist (current + rotated) + */ + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= + 4); /* Current file + 3 rotated files (max_files=3) */ + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_max_files_validation(void) +{ + flb_ctx_t *ctx; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "off", NULL) == 0); + + /* Test with max_files = 0 */ + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_files", "0", + NULL) == 0); + + /* Start should fail */ + TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Test with max_files = -1 */ + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "off", NULL) == 0); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "files_rotation", "true", "max_files", "-1", + NULL) == 0); + + /* Start should fail */ + TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Clean up directory */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_file_logrotate_gzip_compression_exact_chunk(void) +{ + int ret; + int bytes; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *large_message; + char *json_payload; + size_t msg_size = 64 * 1024; /* 64KB exact chunk size */ + size_t json_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"file", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", "{message}", + "files_rotation", "true", "max_size", "64K", + "max_files", "3", "gzip", "true", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare 64KB message */ + large_message = flb_malloc(msg_size + 1); + TEST_CHECK(large_message != NULL); + memset(large_message, 'A', msg_size); + large_message[msg_size] = '\0'; + + /* Create JSON payload: [timestamp, {"message": "..."}] */ + /* Estimate size: msg_size + overhead */ + json_size = msg_size + 100; + json_payload = flb_malloc(json_size); + TEST_CHECK(json_payload != NULL); + + snprintf(json_payload, json_size, "[%lu, {\"message\": \"%s\"}]", + time(NULL), large_message); + + /* Write exactly 64KB of data (the message content) */ + bytes = flb_lib_push(ctx, in_ffd, json_payload, strlen(json_payload)); + TEST_CHECK(bytes == strlen(json_payload)); + + flb_free(large_message); + flb_free(json_payload); + + /* Wait for flush and file creation */ + flb_time_msleep(1500); + + /* Trigger rotation by writing one more small record */ + char *small_payload = "[1234567890, {\"message\": \"trigger\"}]"; + bytes = flb_lib_push(ctx, in_ffd, small_payload, strlen(small_payload)); + TEST_CHECK(bytes == strlen(small_payload)); + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_file_logrotate.log.", + ".gz", TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +}