Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/fluent-bit/flb_aws_credentials.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@
/* 5 second timeout for credential related http requests */
#define FLB_AWS_CREDENTIAL_NET_TIMEOUT 5

/* IoT Credentials Environment Variables */
#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE"
#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE"
#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE"
#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT"
#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME"
#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS"

/*
* A structure that wraps the sensitive data needed to sign an AWS request
*/
Expand Down Expand Up @@ -225,6 +233,11 @@ struct flb_aws_provider *flb_eks_provider_create(struct flb_config *config,
flb_aws_client_generator
*generator);

/*
* IoT Provider
*/
struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config,
struct flb_aws_client_generator *generator);

/*
* STS Assume Role Provider.
Expand Down
1 change: 0 additions & 1 deletion plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ REGISTER_OUT_PLUGIN("out_es")
REGISTER_OUT_PLUGIN("out_exit")
REGISTER_OUT_PLUGIN("out_file")
REGISTER_OUT_PLUGIN("out_logrotate")

REGISTER_OUT_PLUGIN("out_forward")
REGISTER_OUT_PLUGIN("out_http")
REGISTER_OUT_PLUGIN("out_influxdb")
Expand Down
179 changes: 153 additions & 26 deletions plugins/out_logrotate/logrotate.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_hash_table.h>
#include <msgpack.h>

#include <stdio.h>
Expand Down Expand Up @@ -79,7 +80,7 @@ struct flb_logrotate_conf {
size_t max_size; /* Max file size */
int max_files; /* Maximum number of rotated files to keep */
int gzip; /* Whether to gzip rotated files */
size_t current_file_size; /* Current file size in bytes */
struct flb_hash_table *file_sizes; /* Hash table to store file size per filename */
struct flb_output_instance *ins;
};

Expand Down Expand Up @@ -123,10 +124,20 @@ static int cb_logrotate_init(struct flb_output_instance *ins,
ctx->delimiter = NULL;
ctx->label_delimiter = NULL;
ctx->template = NULL;
ctx->current_file_size = 0; /* Initialize file size counter */

/* Initialize hash table to store file sizes per filename */
ctx->file_sizes = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 64, 0);
if (!ctx->file_sizes) {
flb_errno();
flb_free(ctx);
return -1;
}

ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
if (ctx->file_sizes) {
flb_hash_table_destroy(ctx->file_sizes);
}
flb_free(ctx);
return -1;
}
Expand Down Expand Up @@ -162,6 +173,9 @@ static int cb_logrotate_init(struct flb_output_instance *ins,
}
else {
flb_plg_error(ctx->ins, "unknown format %s. abort.", tmp);
if (ctx->file_sizes) {
flb_hash_table_destroy(ctx->file_sizes);
}
flb_free(ctx);
return -1;
}
Expand Down Expand Up @@ -353,13 +367,14 @@ static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj,
return 0;
}

static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size)
static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, int escape_unicode)
{
char *buf;

buf = flb_msgpack_to_json_str(alloc_size, obj);
buf = flb_msgpack_to_json_str(alloc_size, obj, escape_unicode);
if (buf) {
fprintf(fp, "%s" NEWLINE, buf);
fprintf(fp, "%s" NEWLINE,
buf);
flb_free(buf);
}
return 0;
Expand Down Expand Up @@ -513,25 +528,50 @@ static int mkpath(struct flb_output_instance *ins, const char *dir)
#endif
}

/* Function to check if file size exceeds max size in MB */
static int should_rotate_file(struct flb_logrotate_conf *ctx)
/* Function to check if file size exceeds max size for a specific file */
static int should_rotate_file(struct flb_logrotate_conf *ctx, const char *filename)
{
if (ctx->current_file_size >= ctx->max_size) {
flb_plg_info(ctx->ins, "going to rotate file: current size=%zu max size=%zu", ctx->current_file_size, ctx->max_size);
size_t file_size = 0;
void *out_buf;
size_t out_size;
int ret;

/* Get file size from hash table */
ret = flb_hash_table_get(ctx->file_sizes, filename, strlen(filename),
&out_buf, &out_size);
if (ret == 0 && out_size == sizeof(size_t)) {
file_size = *(size_t *)out_buf;
}

if (file_size >= ctx->max_size) {
flb_plg_info(ctx->ins, "going to rotate file %s: current size=%zu max size=%zu",
filename, file_size, ctx->max_size);
return 1;
}
else {
flb_plg_debug(ctx->ins, "file should not be rotated: current size=%zu max size=%zu", ctx->current_file_size, ctx->max_size);
flb_plg_debug(ctx->ins, "file %s should not be rotated: current size=%zu max size=%zu",
filename, file_size, ctx->max_size);
return 0;
}
}

/* Function to update file size counter using current file position */
static void update_file_size_counter(struct flb_logrotate_conf *ctx, FILE *fp)
/* Function to update file size counter for a specific file */
static void update_file_size_counter(struct flb_logrotate_conf *ctx,
const char *filename, FILE *fp)
{
struct stat st;
size_t file_size;
int ret;

if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) {
ctx->current_file_size = (size_t) st.st_size;
file_size = (size_t) st.st_size;

/* Store or update file size in hash table */
ret = flb_hash_table_add(ctx->file_sizes, filename, strlen(filename),
&file_size, sizeof(size_t));
if (ret == -1) {
flb_plg_warn(ctx->ins, "failed to update file size for %s", filename);
}
}
}

Expand Down Expand Up @@ -710,11 +750,21 @@ static int rotate_file(struct flb_logrotate_conf *ctx, const char *filename)
char timestamp[32];
char rotated_filename[PATH_MAX];
char gzip_filename[PATH_MAX];
size_t file_size = 0;
void *out_buf;
size_t out_size;
int ret = 0;

/* Get file size from hash table for logging */
ret = flb_hash_table_get(ctx->file_sizes, filename, strlen(filename),
&out_buf, &out_size);
if (ret == 0 && out_size == sizeof(size_t)) {
file_size = *(size_t *)out_buf;
}

/* Log rotation event */
flb_plg_info(ctx->ins, "rotating file: %s (current size: %zu bytes)",
filename, ctx->current_file_size);
filename, file_size);

/* Generate timestamp */
generate_timestamp(timestamp, sizeof(timestamp));
Expand Down Expand Up @@ -750,6 +800,62 @@ static int rotate_file(struct flb_logrotate_conf *ctx, const char *filename)
return 0;
}

/* Function to validate if a filename matches the rotation pattern format
* Valid formats:
* - base_filename.YYYYMMDD_HHMMSS (15 chars after pattern)
* - base_filename.YYYYMMDD_HHMMSS.gz (18 chars after pattern)
*/
static int is_valid_rotation_filename(const char *filename, const char *pattern)
{
size_t pattern_len = strlen(pattern);
size_t filename_len = strlen(filename);
const char *suffix;
size_t suffix_len;
int i;

/* Check that filename starts with pattern */
if (strncmp(filename, pattern, pattern_len) != 0) {
return 0;
}

/* Get the suffix after the pattern */
suffix = filename + pattern_len;
suffix_len = filename_len - pattern_len;

/* Must be exactly 15 or 18 characters */
if (suffix_len != 15 && suffix_len != 18) {
return 0;
}

/* For 18 characters, must end with .gz */
if (suffix_len == 18) {
if (strcmp(suffix + 15, ".gz") != 0) {
return 0;
}
}

/* Validate timestamp format: YYYYMMDD_HHMMSS
* - 8 digits (YYYYMMDD)
* - underscore at position 8
* - 6 digits (HHMMSS)
*/
for (i = 0; i < 8; i++) {
if (suffix[i] < '0' || suffix[i] > '9') {
return 0;
}
}
if (suffix[8] != '_') {
return 0;
}
for (i = 9; i < 15; i++) {
if (suffix[i] < '0' || suffix[i] > '9') {
return 0;
}
}

return 1;
}

/* Function to clean up old rotated files */
static int cleanup_old_files(struct flb_logrotate_conf *ctx, const char *directory, const char *base_filename)
{
Expand All @@ -772,7 +878,7 @@ static int cleanup_old_files(struct flb_logrotate_conf *ctx, const char *directo

/* Count matching files */
while ((entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, pattern, strlen(pattern)) == 0) {
if (is_valid_rotation_filename(entry->d_name, pattern)) {
file_count++;
}
}
Expand All @@ -793,7 +899,7 @@ static int cleanup_old_files(struct flb_logrotate_conf *ctx, const char *directo
rewinddir(dir);
i = 0;
while ((entry = readdir(dir)) != NULL && i < file_count) {
if (strncmp(entry->d_name, pattern, strlen(pattern)) == 0) {
if (is_valid_rotation_filename(entry->d_name, pattern)) {
snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s",
directory, entry->d_name);
files[i] = flb_strdup(full_path);
Expand Down Expand Up @@ -853,6 +959,7 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
char out_file[PATH_MAX];
char *buf;
long file_pos;
bool have_directory;

char *out_file_copy;
char directory[PATH_MAX];
Expand Down Expand Up @@ -884,7 +991,9 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
}

/* Check if file needs rotation based on current size counter */
if (should_rotate_file(ctx)) {
if (should_rotate_file(ctx, out_file)) {
have_directory = false;
directory[0] = '\0';
/* Extract directory and base filename for cleanup */
out_file_copy = flb_strdup(out_file);
if (out_file_copy) {
Expand All @@ -897,6 +1006,7 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
directory[PATH_MAX - 1] = '\0';
#endif
flb_free(out_file_copy);
have_directory = true;
}

/* Get base filename for cleanup */
Expand All @@ -910,10 +1020,12 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,

/* Rotate the file */
if (rotate_file(ctx, out_file) == 0) {
/* Reset file size counter after rotation */
ctx->current_file_size = 0;
/* Remove file size entry from hash table after rotation */
flb_hash_table_del(ctx->file_sizes, out_file);
/* Clean up old rotated files */
cleanup_old_files(ctx, directory, base_filename);
if (have_directory) {
cleanup_old_files(ctx, directory, base_filename);
}
}
}

Expand Down Expand Up @@ -941,8 +1053,15 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
}

/* Initialize file size counter if this is a new file */
if (ctx->current_file_size == 0) {
update_file_size_counter(ctx, fp);
{
void *out_buf;
size_t out_size;
int ret = flb_hash_table_get(ctx->file_sizes, out_file, strlen(out_file),
&out_buf, &out_size);
if (ret != 0) {
/* File not in hash table, initialize it */
update_file_size_counter(ctx, out_file, fp);
}
}

/*
Expand All @@ -955,6 +1074,8 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
if (event_chunk->type == FLB_INPUT_METRICS) {
print_metrics_text(ctx->ins, fp,
event_chunk->data, event_chunk->size);
/* Update file size counter */
update_file_size_counter(ctx, out_file, fp);
fclose(fp);
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -979,7 +1100,7 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
} while (total < event_chunk->size);

/* Update file size counter */
update_file_size_counter(ctx, fp);
update_file_size_counter(ctx, out_file, fp);
fclose(fp);
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand Down Expand Up @@ -1008,7 +1129,8 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,

switch (ctx->format){
case FLB_OUT_LOGROTATE_FMT_JSON:
buf = flb_msgpack_to_json_str(alloc_size, log_event.body);
buf = flb_msgpack_to_json_str(alloc_size, log_event.body,
config->json_escape_unicode);
if (buf) {
fprintf(fp, "%s: [%"PRIu64".%09lu, %s]" NEWLINE,
event_chunk->tag,
Expand Down Expand Up @@ -1040,7 +1162,7 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
log_event.body, ctx);
break;
case FLB_OUT_LOGROTATE_FMT_PLAIN:
plain_output(fp, log_event.body, alloc_size);
plain_output(fp, log_event.body, alloc_size, config->json_escape_unicode);
break;
case FLB_OUT_LOGROTATE_FMT_TEMPLATE:
template_output(fp,
Expand All @@ -1053,7 +1175,7 @@ static void cb_logrotate_flush(struct flb_event_chunk *event_chunk,
flb_log_event_decoder_destroy(&log_decoder);

/* Update file size counter */
update_file_size_counter(ctx, fp);
update_file_size_counter(ctx, out_file, fp);
fclose(fp);

FLB_OUTPUT_RETURN(FLB_OK);
Expand All @@ -1067,6 +1189,11 @@ static int cb_logrotate_exit(void *data, struct flb_config *config)
return 0;
}

/* Destroy hash table */
if (ctx->file_sizes) {
flb_hash_table_destroy(ctx->file_sizes);
}

flb_free(ctx);
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/aws/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ set(src
"flb_aws_imds.c"
"flb_aws_credentials_http.c"
"flb_aws_credentials_profile.c"
"flb_aws_credentials_iot.c"
)

message(STATUS "=== AWS Credentials ===")
Expand Down
Loading