diff --git a/plugins/out_azure_blob/CMakeLists.txt b/plugins/out_azure_blob/CMakeLists.txt index f9e2a370741..5d5ea721526 100644 --- a/plugins/out_azure_blob/CMakeLists.txt +++ b/plugins/out_azure_blob/CMakeLists.txt @@ -6,6 +6,7 @@ set(src azure_blob_db.c azure_blob_appendblob.c azure_blob_blockblob.c + azure_blob_store.c ) FLB_PLUGIN(out_azure_blob "${src}" "") diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index 8d2b8bcea79..d0ae18d503c 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -40,6 +41,7 @@ #include "azure_blob_appendblob.h" #include "azure_blob_blockblob.h" #include "azure_blob_http.h" +#include "azure_blob_store.h" #define CREATE_BLOB 1337 @@ -76,6 +78,95 @@ static int azure_blob_format(struct flb_config *config, return 0; } +/* + * Either new_data or chunk can be NULL, but not both + */ +static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_data, + struct azure_blob_file *upload_file, + char **out_buf, size_t *out_size) +{ + char *body; + char *tmp; + size_t body_size = 0; + char *buffered_data = NULL; + size_t buffer_size = 0; + int ret; + + if (new_data == NULL && upload_file == NULL) { + flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong" + " both chunk and new_data are NULL"); + return -1; + } + + if (upload_file) { + ret = azure_blob_store_file_upload_read(ctx, upload_file->fsf, &buffered_data, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not read locally buffered data %s", + upload_file->fsf->name); + return -1; + } + + /* + * lock the upload_file from buffer list + */ + azure_blob_store_file_lock(upload_file); + body = buffered_data; + body_size = buffer_size; + } + + flb_plg_debug(ctx->ins, "[construct_request_buffer] size of buffer file read %zu", buffer_size); + + /* + * If new data is arriving, increase the original 'buffered_data' size + * to append the new one. 
+ */ + if (new_data) { + body_size += flb_sds_len(new_data); + flb_plg_debug(ctx->ins, "[construct_request_buffer] size of new_data %zu", body_size); + + tmp = flb_realloc(buffered_data, body_size + 1); + if (!tmp) { + flb_errno(); + flb_free(buffered_data); + if (upload_file) { + azure_blob_store_file_unlock(upload_file); + } + return -1; + } + body = buffered_data = tmp; + memcpy(body + buffer_size, new_data, flb_sds_len(new_data)); + if (ctx->compress_gzip == FLB_FALSE){ + body[body_size] = '\0'; + } + } + + flb_plg_debug(ctx->ins, "[construct_request_buffer] final increased %zu", body_size); + + *out_buf = body; + *out_size = body_size; + + return 0; +} + +void generate_random_string_blob(char *str, size_t length) +{ + const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + const size_t charset_size = sizeof(charset) - 1; + size_t i; + size_t index; + + /* Seed the random number generator with multiple sources of entropy */ + unsigned int seed = (unsigned int)(time(NULL) ^ clock() ^ getpid()); + srand(seed); + + for (i = 0; i < length; ++i) { + index = (size_t)rand() % charset_size; + str[i] = charset[index]; + } + + str[length] = '\0'; +} + static int create_blob(struct flb_azure_blob *ctx, char *name) { int ret; @@ -89,6 +180,11 @@ static int create_blob(struct flb_azure_blob *ctx, char *name) return FLB_RETRY; } + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -243,6 +339,13 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, struct flb_http_client *c; struct flb_connection *u_conn; + flb_plg_debug(ctx->ins, "generated blob uri ::: %s", uri); + + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -358,68 +461,57 @@ static int send_blob(struct flb_config *config, flb_sds_t ref_name = NULL; void *payload_buf = data; size_t payload_size = bytes; + char *generated_random_string; ref_name = flb_sds_create_size(256); if (!ref_name) { return FLB_RETRY; } + /* Allocate memory for the random string dynamically */ + generated_random_string = flb_malloc(ctx->blob_uri_length + 1); + if (!generated_random_string) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot allocate memory for random string"); + flb_sds_destroy(ref_name); + return FLB_RETRY; + } + if (blob_type == AZURE_BLOB_APPENDBLOB) { uri = azb_append_blob_uri(ctx, tag); } else if (blob_type == AZURE_BLOB_BLOCKBLOB) { + generate_random_string_blob(generated_random_string, ctx->blob_uri_length); /* Generate the random string */ if (event_type == FLB_EVENT_TYPE_LOGS) { block_id = azb_block_blob_id_logs(&ms); if (!block_id) { flb_plg_error(ctx->ins, "could not generate block id"); - + flb_free(generated_random_string); cfl_sds_destroy(ref_name); - return FLB_RETRY; } - uri = azb_block_blob_uri(ctx, tag, block_id, ms); + uri = azb_block_blob_uri(ctx, tag, block_id, ms, generated_random_string); ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms); } else if (event_type == FLB_EVENT_TYPE_BLOBS) { block_id = azb_block_blob_id_blob(ctx, name, part_id); - uri = azb_block_blob_uri(ctx, name, block_id, 0); + uri = azb_block_blob_uri(ctx, name, block_id, 0, generated_random_string); ref_name = flb_sds_printf(&ref_name, 
"file=%s:%" PRIu64, name, part_id); } } if (!uri) { + flb_free(generated_random_string); if (block_id != NULL) { flb_free(block_id); } - flb_sds_destroy(ref_name); - return FLB_RETRY; } - /* Logs: Format the data (msgpack -> JSON) */ - if (event_type == FLB_EVENT_TYPE_LOGS) { - ret = azure_blob_format(config, i_ins, - ctx, NULL, - FLB_EVENT_TYPE_LOGS, - tag, tag_len, - data, bytes, - &payload_buf, &payload_size); - if (ret != 0) { - flb_sds_destroy(uri); - - if (block_id != NULL) { - flb_free(block_id); - } - - flb_sds_destroy(ref_name); - return FLB_ERROR; - } - } - else if (event_type == FLB_EVENT_TYPE_BLOBS) { - payload_buf = data; - payload_size = bytes; - } + /* Map buffer */ + payload_buf = data; + payload_size = bytes; ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size); flb_plg_debug(ctx->ins, "http_send_blob()=%i", ret); @@ -427,7 +519,7 @@ static int send_blob(struct flb_config *config, if (ret == FLB_OK) { /* For Logs type, we need to commit the block right away */ if (event_type == FLB_EVENT_TYPE_LOGS) { - ret = azb_block_blob_commit_block(ctx, block_id, tag, ms); + ret = azb_block_blob_commit_block(ctx, block_id, tag, ms, generated_random_string); } } else if (ret == CREATE_BLOB) { @@ -443,6 +535,7 @@ static int send_blob(struct flb_config *config, } flb_sds_destroy(uri); + flb_free(generated_random_string); if (block_id != NULL) { flb_free(block_id); @@ -459,6 +552,11 @@ static int create_container(struct flb_azure_blob *ctx, char *name) struct flb_http_client *c; struct flb_connection *u_conn; + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -552,6 +650,11 @@ static int ensure_container(struct flb_azure_blob *ctx) return FLB_FALSE; } + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -609,7 +712,7 @@ static int ensure_container(struct flb_azure_blob *ctx) ctx->container_name); return FLB_FALSE; } - + flb_plg_error(ctx->ins, "get container request failed, status=%i", status); @@ -631,6 +734,39 @@ static int cb_azure_blob_init(struct flb_output_instance *ins, return -1; } + if (ctx->buffering_enabled == FLB_TRUE) { + ctx->ins = ins; + ctx->retry_time = 0; + + /* Initialize local storage */ + int ret = azure_blob_store_init(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s", + ctx->store_dir); + return -1; + } + + /* validate 'total_file_size' */ + if (ctx->file_size <= 0) { + flb_plg_error(ctx->ins, "Failed to parse upload_file_size"); + return -1; + } + if (ctx->file_size < 1000000) { + flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB"); + return -1; + } + if (ctx->file_size > MAX_FILE_SIZE) { + flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE); + return -1; + } + ctx->has_old_buffers = azure_blob_store_has_data(ctx); + ctx->timer_created = FLB_FALSE; + ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; + flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size); + } + + flb_output_set_context(ins, ctx); + flb_output_set_http_debug_callbacks(ins); return 0; } @@ -1035,6 +1171,303 @@ static int azb_timer_create(struct flb_azure_blob *ctx) return 0; } +/** + * Azure Blob 
Storage ingestion callback function + * This function handles the upload of data chunks to Azure Blob Storage with retry mechanism + * @param config: Fluent Bit configuration + * @param data: Azure Blob context data + */ +static void cb_azure_blob_ingest(struct flb_config *config, void *data) { + /* Initialize context and file handling variables */ + struct flb_azure_blob *ctx = data; + struct azure_blob_file *file = NULL; + struct flb_fstore_file *fsf; + char *buffer = NULL; + size_t buffer_size = 0; + struct mk_list *tmp; + struct mk_list *head; + int ret; + time_t now; + flb_sds_t payload; + flb_sds_t tag_sds; + + /* Retry mechanism configuration */ + int retry_count; + int backoff_time; + const int max_backoff_time = 64; /* Maximum backoff time in seconds */ + + /* Log entry point and container information */ + flb_plg_debug(ctx->ins, "Running upload timer callback (cb_azure_blob_ingest).."); + + /* Initialize jitter for retry mechanism */ + srand(time(NULL)); + now = time(NULL); + + /* Iterate through all chunks in the active stream */ + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + file = fsf->data; + + /* Debug logging for current file processing */ + flb_plg_debug(ctx->ins, "Iterating files inside upload timer callback (cb_azure_blob_ingest).. %s", + file->fsf->name); + + /* Skip if chunk hasn't timed out yet */ + if (now < (file->create_time + ctx->upload_timeout + ctx->retry_time)) { + continue; + } + + /* Skip if file is already being processed */ + flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: Before file locked check %s", file->fsf->name); + if (file->locked == FLB_TRUE) { + continue; + } + + /* Initialize retry mechanism parameters */ + retry_count = 0; + backoff_time = 2; /* Initial backoff time in seconds */ + + /* Retry loop for upload attempts */ + while (retry_count < ctx->scheduler_max_retries) { + /* Construct request buffer for upload */ + flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: Before construct_request_buffer %s", file->fsf->name); + ret = construct_request_buffer(ctx, NULL, file, &buffer, &buffer_size); + + /* Handle request buffer construction failure */ + if (ret < 0) { + flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Could not construct request buffer for %s", + file->fsf->name); + retry_count++; + + /* Implement exponential backoff with jitter */ + int jitter = rand() % backoff_time; + flb_plg_warn(ctx->ins, "cb_azure_blob_ingest :: failure in construct_request_buffer :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s", + backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name); + sleep(backoff_time + jitter); + backoff_time = (backoff_time * 2 < max_backoff_time) ? 
backoff_time * 2 : max_backoff_time; + continue; + } + + /* Create payload and tags for blob upload */ + payload = flb_sds_create_len(buffer, buffer_size); + tag_sds = flb_sds_create(fsf->meta_buf); + flb_plg_debug(ctx->ins, "cb_azure_blob_ingest ::: tag of the file %s", tag_sds); + + /* Attempt to send blob */ + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype , (char *) tag_sds,0, (char *) tag_sds, + flb_sds_len(tag_sds), payload, flb_sds_len(payload)); + + /* Handle blob creation if necessary */ + if (ret == CREATE_BLOB) { + ret = create_blob(ctx, tag_sds); + if (ret == FLB_OK) { + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype, (char *) tag_sds, 0, (char *) tag_sds, + flb_sds_len(tag_sds), payload, flb_sds_len(payload)); + } + } + + /* Handle blob send failure */ + if (ret != FLB_OK) { + /* Clean up resources and update failure count */ + flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Failed to ingest data to Azure Blob Storage (attempt %d of %d)", + retry_count + 1, ctx->scheduler_max_retries); + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + + if (file) { + azure_blob_store_file_unlock(file); + file->failures += 1; + } + + retry_count++; + + /* Implement exponential backoff with jitter for retry */ + int jitter = rand() % backoff_time; + flb_plg_warn(ctx->ins, "cb_azure_blob_ingest :: error sending blob :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s", + backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name); + sleep(backoff_time + jitter); + backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; + continue; + } + + /* Handle successful upload */ + ret = azure_blob_store_file_delete(ctx, file); + if (ret == 0) { + flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: deleted successfully ingested file %s", fsf->name); + } + else { + flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: failed to delete ingested file %s", fsf->name); + if (file) { + azure_blob_store_file_unlock(file); + file->failures += 1; + } + } + + /* Clean up resources */ + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + break; + } + + /* Ensure file is unlocked if max retries reached */ + if (retry_count >= ctx->scheduler_max_retries) { + flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Max retries reached for file :: attempting to delete/marking inactive %s", + file->fsf->name); + if (ctx->delete_on_max_upload_error){ + azure_blob_store_file_delete(ctx, file); + } + else { + azure_blob_store_file_inactive(ctx, file); + } + } + + flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_blob_ingest).."); + } +} + + +static int ingest_all_chunks(struct flb_azure_blob *ctx, struct flb_config *config) +{ + struct azure_blob_file *chunk; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_file *fsf; + struct flb_fstore_stream *fs_stream; + flb_sds_t payload = NULL; + char *buffer = NULL; + size_t buffer_size; + int ret; + flb_sds_t tag_sds; + + mk_list_foreach(head, &ctx->fs->streams) { + /* skip multi upload stream */ + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + chunk = fsf->data; + + /* Locked chunks are being processed, skip */ + if (chunk->locked == FLB_TRUE) { + continue; + } + + if 
(chunk->failures >= ctx->scheduler_max_retries) { + flb_plg_warn(ctx->ins, + "ingest_all_chunks :: Chunk for tag %s failed to send %i times, " + "will not retry", + (char *) fsf->meta_buf, ctx->scheduler_max_retries); + if (ctx->delete_on_max_upload_error){ + azure_blob_store_file_delete(ctx, chunk); + } + else { + azure_blob_store_file_inactive(ctx, chunk); + } + continue; + } + + ret = construct_request_buffer(ctx, NULL, chunk, + &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "ingest_all_chunks :: Could not construct request buffer for %s", + chunk->file_path); + return -1; + } + + payload = flb_sds_create_len(buffer, buffer_size); + tag_sds = flb_sds_create(fsf->meta_buf); + flb_free(buffer); + + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); + + if (ret == CREATE_BLOB) { + ret = create_blob(ctx, tag_sds); + if (ret == FLB_OK) { + ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); + } + } + + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "ingest_all_chunks :: Failed to ingest data to Azure Blob Storage"); + if (chunk){ + azure_blob_store_file_unlock(chunk); + chunk->failures += 1; + } + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + return -1; + } + + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + + /* data was sent successfully- delete the local buffer */ + azure_blob_store_file_cleanup(ctx, chunk); + } + } + + return 0; +} + +static void flush_init(void *out_context, struct flb_config *config) +{ + int ret; + struct flb_azure_blob *ctx = out_context; + struct flb_sched *sched; + + /* clean up any old buffers found on startup */ + if (ctx->has_old_buffers == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Sending locally buffered data from previous " + "executions to azure blob; buffer=%s", + ctx->fs->root_path); + ctx->has_old_buffers = FLB_FALSE; + ret = ingest_all_chunks(ctx, config); + if (ret < 0) { + ctx->has_old_buffers = FLB_TRUE; + flb_plg_error(ctx->ins, + "Failed to send locally buffered data left over " + "from previous executions; will retry. Buffer=%s", + ctx->fs->root_path); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else { + flb_plg_debug(ctx->ins, + "Did not find any local buffered data from previous " + "executions to azure blob; buffer=%s", + ctx->fs->root_path); + } + + /* + * create a timer that will run periodically and check if uploads + * are ready for completion + * this is created once on the first flush + */ + if (ctx->timer_created == FLB_FALSE) { + flb_plg_debug(ctx->ins, + "Creating upload timer with frequency %ds", + ctx->timer_ms / 1000); + + sched = flb_sched_ctx_get(); + + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, cb_azure_blob_ingest, ctx, NULL); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create upload timer"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + ctx->timer_created = FLB_TRUE; + } +} + static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -1045,39 +1478,183 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, struct flb_azure_blob *ctx = out_context; (void) i_ins; (void) config; - - /* - * Azure blob requires a container. The following function validate that the container exists, - * otherwise it will be created. 
Note that that container name is specified by the user - * in the configuration file. - * - * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming - */ - ret = ensure_container(ctx); - if (ret == FLB_FALSE) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } + flb_sds_t json = NULL; + size_t json_size; if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { - ret = send_blob(config, i_ins, ctx, - FLB_EVENT_TYPE_LOGS, - ctx->btype, /* blob type per user configuration */ - (char *) event_chunk->tag, /* use tag as 'name' */ - 0, /* part id */ - (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), - (char *) event_chunk->data, event_chunk->size); - - if (ret == CREATE_BLOB) { - ret = create_blob(ctx, event_chunk->tag); - if (ret == FLB_OK) { - ret = send_blob(config, i_ins, ctx, - FLB_EVENT_TYPE_LOGS, - ctx->btype, /* blob type per user configuration */ - (char *) event_chunk->tag, /* use tag as 'name' */ - 0, /* part id */ - (char *) event_chunk->tag, /* use tag as 'name' */ - flb_sds_len(event_chunk->tag), - (char *) event_chunk->data, event_chunk->size); + if (ctx->buffering_enabled == FLB_TRUE) { + size_t tag_len; + struct azure_blob_file *upload_file = NULL; + int upload_timeout_check = FLB_FALSE; + int total_file_size_check = FLB_FALSE; + + char *final_payload = NULL; + size_t final_payload_size = 0; + flb_sds_t tag_name = NULL; + + flb_plg_trace(ctx->ins, "flushing bytes for event tag %s and size %zu", event_chunk->tag, event_chunk->size); + + if (ctx->unify_tag == FLB_TRUE) { + tag_name = flb_sds_create("fluentbit-buffer-file-unify-tag.log"); + } + else { + tag_name = event_chunk->tag; + } + tag_len = flb_sds_len(tag_name); + + flush_init(ctx, config); + /* Reformat msgpack to JSON payload */ + ret = azure_blob_format(config, i_ins, ctx, NULL, FLB_EVENT_TYPE_LOGS, tag_name, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot reformat data into json"); + goto error; + } + + /* Get a file candidate matching the given 'tag' */ + upload_file = azure_blob_store_file_get(ctx, tag_name, tag_len); + + /* Handle upload timeout or file size limits */ + if (upload_file != NULL) { + if (upload_file->failures >= ctx->scheduler_max_retries) { + flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not retry", event_chunk->tag, ctx->scheduler_max_retries); + if (ctx->delete_on_max_upload_error) { + azure_blob_store_file_delete(ctx, upload_file); + } else { + azure_blob_store_file_inactive(ctx, upload_file); + } + upload_file = NULL; + } else if (time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { + upload_timeout_check = FLB_TRUE; + } else if (upload_file->size + json_size > ctx->file_size) { + total_file_size_check = FLB_TRUE; + } + } + + if (upload_file != NULL && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { + flb_plg_debug(ctx->ins, "uploading file %s with size %zu", upload_file->fsf->name, upload_file->size); + + /* Construct the payload for upload */ + ret = construct_request_buffer(ctx, json, upload_file, &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "error constructing request buffer for %s", event_chunk->tag); + flb_sds_destroy(json); + upload_file->failures += 1; + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* + * Azure blob requires a container. The following function validates that the container exists, + * otherwise it will be created.
Note that the container name is specified by the user + * in the configuration file. + * + * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming + */ + ret = ensure_container(ctx); + if (ret == FLB_FALSE) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Upload the file */ + ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); + + if (ret == CREATE_BLOB) { + ret = create_blob(ctx, upload_file->fsf->name); + if (ret == FLB_OK) { + ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); + } + } + + if (ret == FLB_OK) { + flb_plg_debug(ctx->ins, "uploaded file %s successfully", upload_file->fsf->name); + azure_blob_store_file_delete(ctx, upload_file); + goto cleanup; + } + else { + flb_plg_error(ctx->ins, "error uploading file %s", upload_file->fsf->name); + if (upload_file) { + azure_blob_store_file_unlock(upload_file); + upload_file->failures += 1; + } + goto error; + } + } + else { + /* Buffer current chunk */ + ret = azure_blob_store_buffer_put(ctx, upload_file, tag_name, tag_len, json, json_size); + if (ret == 0) { + flb_plg_debug(ctx->ins, "buffered chunk %s", event_chunk->tag); + goto cleanup; + } + else { + flb_plg_error(ctx->ins, "failed to buffer chunk %s", event_chunk->tag); + goto error; + } + } + + cleanup: + if (json) { + flb_sds_destroy(json); + } + if (tag_name && ctx->unify_tag == FLB_TRUE) { + flb_sds_destroy(tag_name); + } + if (final_payload) { + flb_free(final_payload); + } + FLB_OUTPUT_RETURN(FLB_OK); + + error: + if (json) { + flb_sds_destroy(json); + } + if (tag_name && ctx->unify_tag == FLB_TRUE) { + flb_sds_destroy(tag_name); + } + if (final_payload) { + flb_free(final_payload); + } + FLB_OUTPUT_RETURN(FLB_RETRY); + } + else { + + /* + * Azure blob requires a container. The following function validates that the container exists, + * otherwise it will be created. Note that the container name is specified by the user + * in the configuration file.
* + * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming + */ + ret = ensure_container(ctx); + if (ret == FLB_FALSE) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = azure_blob_format(config, i_ins, ctx, NULL, FLB_EVENT_TYPE_LOGS, (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), (char *) event_chunk->data, event_chunk->size, (void **)&json, &json_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot reformat data into json"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + /* Buffering mode is disabled, proceed with regular flow */ + ret = send_blob(config, i_ins, ctx, + FLB_EVENT_TYPE_LOGS, + ctx->btype, /* blob type per user configuration */ + (char *) event_chunk->tag, /* use tag as 'name' */ + 0, /* part id */ + (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), + json, json_size); + + if (ret == CREATE_BLOB) { + ret = create_blob(ctx, event_chunk->tag); + if (ret == FLB_OK) { + ret = send_blob(config, i_ins, ctx, + FLB_EVENT_TYPE_LOGS, + ctx->btype, /* blob type per user configuration */ + (char *) event_chunk->tag, /* use tag as 'name' */ + 0, /* part id */ + (char *) event_chunk->tag, /* use tag as 'name' */ + flb_sds_len(event_chunk->tag), + json, json_size); + } } } } @@ -1092,6 +1669,10 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, } } + if (json){ + flb_sds_destroy(json); + } + /* FLB_RETRY, FLB_OK, FLB_ERROR */ FLB_OUTPUT_RETURN(ret); } @@ -1099,11 +1680,28 @@ static int cb_azure_blob_exit(void *data, struct flb_config *config) { struct flb_azure_blob *ctx = data; + int ret = -1; if (!ctx) { return 0; } + if (ctx->buffering_enabled == FLB_TRUE){ + if (azure_blob_store_has_data(ctx) == FLB_TRUE) { + flb_plg_info(ctx->ins, "Sending all locally buffered data to Azure Blob"); + ret = ingest_all_chunks(ctx, config); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not send all chunks on exit"); + } + } + azure_blob_store_exit(ctx); + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + ctx->u = NULL; + } + flb_azure_blob_conf_destroy(ctx); return 0; } @@ -1296,6 +1894,79 @@ static struct flb_config_map config_map[] = { "Configuration endpoint bearer token" }, + { + FLB_CONFIG_MAP_BOOL, "buffering_enabled", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, buffering_enabled), + "Enable buffering to disk before ingesting into Azure Blob" + }, + + { + FLB_CONFIG_MAP_STR, "buffer_dir", "/tmp/fluent-bit/azure-blob/", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, buffer_dir), + "Specifies the location of the directory where the buffered data will be stored" + }, + + { + FLB_CONFIG_MAP_TIME, "upload_timeout", "30m", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_timeout), + "Optionally specify a timeout for uploads. " + "Fluent Bit will start ingesting buffer files which were created more than this long ago and haven't reached the upload_file_size limit yet. " + "Default is 30m." + }, + + { + FLB_CONFIG_MAP_SIZE, "upload_file_size", "200M", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, file_size), + "Specifies the size of files to be uploaded in MBs.
Default is 200MB" + }, + + { + FLB_CONFIG_MAP_STR, "azure_blob_buffer_key", "key", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, azure_blob_buffer_key), + "Set the azure blob buffer key which needs to be specified when using multiple instances of the azure blob output plugin with buffering enabled" + }, + + { + FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", "8G", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, store_dir_limit_size), + "Set the max size of the buffer directory. Default is 8GB" + }, + + { + FLB_CONFIG_MAP_BOOL, "buffer_file_delete_early", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, buffer_file_delete_early), + "Whether to delete the buffered file early after successful blob creation. Default is false" + }, + + { + FLB_CONFIG_MAP_INT, "blob_uri_length", "64", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_uri_length), + "Set the length of the generated blob uri before ingesting to Azure Blob Storage. Default is 64" + }, + + { + FLB_CONFIG_MAP_BOOL, "unify_tag", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, unify_tag), + "Whether to create a single buffer file when buffering mode is enabled. Default is false" + }, + + { + FLB_CONFIG_MAP_INT, "scheduler_max_retries", "3", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, scheduler_max_retries), + "Maximum number of retries for the scheduler when sending a blob. Default is 3" + }, + + { + FLB_CONFIG_MAP_BOOL, "delete_on_max_upload_error", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, delete_on_max_upload_error), + "Whether to delete the buffer file on maximum upload errors. Default is false" + }, + + { + FLB_CONFIG_MAP_TIME, "io_timeout", "60s", 0, FLB_TRUE, offsetof(struct flb_azure_blob, io_timeout), + "HTTP IO timeout. Default is 60s" + }, + /* EOF */ {0} }; diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 361e348de79..8699dda54f8 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -39,6 +39,12 @@ /* service endpoint */ #define AZURE_ENDPOINT_PREFIX ".blob.core.windows.net" +/* buffering directory max size */ +#define FLB_AZURE_BLOB_BUFFER_DIR_MAX_SIZE "8G" +#define UPLOAD_TIMER_MAX_WAIT 180000 +#define UPLOAD_TIMER_MIN_WAIT 18000 +#define MAX_FILE_SIZE 4000000000 /* 4GB */ + #define AZURE_BLOB_APPENDBLOB 0 #define AZURE_BLOB_BLOCKBLOB 1 @@ -76,6 +82,32 @@ struct flb_azure_blob { int container_name_overriden_flag; int path_overriden_flag; + int buffering_enabled; + flb_sds_t buffer_dir; + int unify_tag; + + size_t file_size; + time_t upload_timeout; + time_t retry_time; + int timer_created; + int timer_ms; + int io_timeout; + + flb_sds_t azure_blob_buffer_key; + size_t store_dir_limit_size; + int buffer_file_delete_early; + int blob_uri_length; + int delete_on_max_upload_error; + + int has_old_buffers; + int scheduler_max_retries; + /* track the total amount of buffered data */ + size_t current_buffer_size; + char *store_dir; + struct flb_fstore *fs; + struct flb_fstore_stream *stream_active; /* default active stream */ + struct flb_fstore_stream *stream_upload; + /* * Internal use */ diff --git a/plugins/out_azure_blob/azure_blob_blockblob.c b/plugins/out_azure_blob/azure_blob_blockblob.c index 5acb03689b9..860401b2623 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.c +++ b/plugins/out_azure_blob/azure_blob_blockblob.c @@ -56,7 +56,7 @@ flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name) } flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, - char *blockid, uint64_t ms) + char
*blockid, uint64_t ms, char *random_str) { int len; flb_sds_t uri; @@ -84,22 +84,22 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, if (ctx->path) { if (ms > 0) { - flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?blockid=%s&comp=block", - ctx->path, name, ms, ext, encoded_blockid); + flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?blockid=%s&comp=block", + ctx->path, name, random_str, ms, ext, encoded_blockid); } else { - flb_sds_printf(&uri, "/%s/%s%s?blockid=%s&comp=block", - ctx->path, name, ext, encoded_blockid); + flb_sds_printf(&uri, "/%s/%s.%s%s?blockid=%s&comp=block", + ctx->path, name, random_str, ext, encoded_blockid); } } else { if (ms > 0) { - flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?blockid=%s&comp=block", - name, ms, ext, encoded_blockid); + flb_sds_printf(&uri, "/%s.%s.%" PRIu64 "%s?blockid=%s&comp=block", + name, random_str, ms, ext, encoded_blockid); } else { - flb_sds_printf(&uri, "/%s%s?blockid=%s&comp=block", - name, ext, encoded_blockid); + flb_sds_printf(&uri, "/%s.%s%s?blockid=%s&comp=block", + name, random_str, ext, encoded_blockid); } } @@ -112,7 +112,7 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, } flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, - char *tag, uint64_t ms) + char *tag, uint64_t ms, char *str) { char *ext; flb_sds_t uri; @@ -130,11 +130,11 @@ flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, } if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, - ms, ext); + flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str, + ms, ext); } else { - flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?comp=blocklist", tag, ms, ext); + flb_sds_printf(&uri, "/%s.%s.%" PRIu64 "%s?comp=blocklist", tag, str, ms, ext); } if (ctx->atype == AZURE_BLOB_AUTH_SAS && ctx->sas_token) { @@ -258,6 +258,11 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb struct flb_http_client *c; struct flb_connection *u_conn; + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -286,7 +291,7 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb /* Validate HTTP status */ if (ret == -1) { - flb_plg_error(ctx->ins, "error sending append_blob"); + flb_plg_error(ctx->ins, "error sending block_blob"); return FLB_RETRY; } @@ -326,14 +331,14 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb } /* Commit a single block */ -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms) +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str) { int ret; flb_sds_t uri = NULL; flb_sds_t payload; /* Compose commit URI */ - uri = azb_block_blob_uri_commit(ctx, tag, ms); + uri = azb_block_blob_uri_commit(ctx, tag, ms, str); if (!uri) { return FLB_ERROR; } diff --git a/plugins/out_azure_blob/azure_blob_blockblob.h b/plugins/out_azure_blob/azure_blob_blockblob.h index 42d6a970428..6949aefde51 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.h +++ b/plugins/out_azure_blob/azure_blob_blockblob.h @@ -25,11 +25,11 @@ flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name); flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, char *blockid, - uint64_t ms); + uint64_t ms, char 
*random_str); char *azb_block_blob_id_logs(uint64_t *ms); char *azb_block_blob_id_blob(struct flb_azure_blob *ctx, char *path, uint64_t part_id); -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms); +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str); int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id, cfl_sds_t path, cfl_sds_t part_ids); diff --git a/plugins/out_azure_blob/azure_blob_conf.c b/plugins/out_azure_blob/azure_blob_conf.c index d9c6ee68837..ea883a01852 100644 --- a/plugins/out_azure_blob/azure_blob_conf.c +++ b/plugins/out_azure_blob/azure_blob_conf.c @@ -647,6 +655,14 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in } } + /* Check for invalid configuration: buffering enabled with appendblob */ + if (ctx->buffering_enabled == FLB_TRUE && ctx->btype == AZURE_BLOB_APPENDBLOB) { + flb_plg_error(ctx->ins, + "buffering is not supported with 'appendblob' blob_type. " + "Please use 'blockblob' blob_type or disable buffering."); + return NULL; + } + /* Compress (gzip) */ tmp = (char *) flb_output_get_property("compress", ins); ctx->compress_gzip = FLB_FALSE; @@ -708,6 +716,12 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in ctx->u = flb_upstream_create(config, ctx->real_endpoint, port, io_flags, ins->tls); + if (ctx->u != NULL && ctx->buffering_enabled == FLB_TRUE) { + flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); + } + if (!ctx->u) { flb_plg_error(ctx->ins, "cannot create upstream for endpoint '%s'", ctx->real_endpoint); diff --git a/plugins/out_azure_blob/azure_blob_store.c b/plugins/out_azure_blob/azure_blob_store.c new file mode 100644 index 00000000000..26211e89af0 --- /dev/null +++ b/plugins/out_azure_blob/azure_blob_store.c @@ -0,0 +1,453 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "azure_blob_store.h" +#include +#include +#include +#include + +/* + * Simple and fast hashing algorithm to create keys in the local buffer + */ +static flb_sds_t gen_store_filename(const char *tag) +{ + int c; + unsigned long hash = 5381; + unsigned long hash2 = 5381; + flb_sds_t hash_str; + flb_sds_t tmp; + struct flb_time tm; + + /* get current time */ + flb_time_get(&tm); + + /* compose hash */ + while ((c = *tag++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; + + /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ + hash_str = flb_sds_create_size(64); + if (!hash_str) { + return NULL; + } + tmp = flb_sds_printf(&hash_str, "%lu-%lu", hash, hash2); + if (!tmp) { + flb_sds_destroy(hash_str); + return NULL; + } + hash_str = tmp; + + return hash_str; +} + + +/* Retrieve a candidate buffer file using the tag */ +struct azure_blob_file *azure_blob_store_file_get(struct flb_azure_blob *ctx, const char *tag, + int tag_len) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_fstore_file *fsf = NULL; + struct azure_blob_file *azure_blob_file; + int found = 0; + + /* + * Based in the current ctx->stream_name, locate a candidate file to + * store the incoming data using as a lookup pattern the content Tag. + */ + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + + /* skip and warn on partially initialized chunks */ + if (fsf->data == NULL) { + flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name); + flb_fstore_file_delete(ctx->fs, fsf); + } + + if (fsf->meta_size != tag_len) { + fsf = NULL; + continue; + } + + /* skip locked chunks */ + azure_blob_file = fsf->data; + if (azure_blob_file->locked == FLB_TRUE) { + flb_plg_debug(ctx->ins, "File '%s' is being processed by another worker, continuing search", fsf->name); + fsf = NULL; + continue; + } + + + /* compare meta and tag */ + if (strncmp((char *) fsf->meta_buf, tag, tag_len) == 0 ) { + flb_plg_debug(ctx->ins, "Found matching file '%s' for tag '%.*s'", fsf->name, tag_len, tag); + found = 1; + break; + } + } + + if (!found) { + return NULL; + } else { + return fsf->data; + } +} + +/* Append data to a new or existing fstore file */ +int azure_blob_store_buffer_put(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file, + flb_sds_t tag, size_t tag_len, + flb_sds_t data, size_t bytes) { + int ret; + flb_sds_t name; + struct flb_fstore_file *fsf; + size_t space_remaining; + + if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) { + flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, bytes, ctx->store_dir_limit_size); + return -1; + } + + /* If no target file was found, create a new one */ + if (azure_blob_file == NULL) { + name = gen_store_filename(tag); + if (!name) { + flb_plg_error(ctx->ins, "could not generate chunk file name"); + return -1; + } + + flb_plg_debug(ctx->ins, "[azure_blob] new buffer file: %s", name); + + /* Create the file */ + fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes); + if (!fsf) { + flb_plg_error(ctx->ins, "could not create the file '%s' in the store", + name); + flb_sds_destroy(name); + return -1; + } + + /* Write tag as metadata */ + ret = flb_fstore_file_meta_set(ctx->fs, fsf, 
(char *) tag, tag_len); + if (ret == -1) { + flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written"); + flb_fstore_file_delete(ctx->fs, fsf); + flb_sds_destroy(name); + return -1; + } + + /* Allocate local context */ + azure_blob_file = flb_calloc(1, sizeof(struct azure_blob_file)); + if (!azure_blob_file) { + flb_errno(); + flb_plg_warn(ctx->ins, "Deleting buffer file because azure_blob context creation failed"); + flb_fstore_file_delete(ctx->fs, fsf); + flb_sds_destroy(name); + return -1; + } + azure_blob_file->fsf = fsf; + azure_blob_file->create_time = time(NULL); + azure_blob_file->size = 0; // Initialize size to 0 + + /* Use fstore opaque 'data' reference to keep our context */ + fsf->data = azure_blob_file; + flb_sds_destroy(name); + + } + else { + fsf = azure_blob_file->fsf; + } + + /* Append data to the target file */ + ret = flb_fstore_file_append(azure_blob_file->fsf, data, bytes); + + if (ret != 0) { + flb_plg_error(ctx->ins, "error writing data to local azure_blob file"); + return -1; + } + + azure_blob_file->size += bytes; + ctx->current_buffer_size += bytes; + + flb_plg_debug(ctx->ins, "[azure_blob] new file size: %zu", azure_blob_file->size); + flb_plg_debug(ctx->ins, "[azure_blob] current_buffer_size: %zu", ctx->current_buffer_size); + + /* if buffer is 95% full, warn user */ + if (ctx->store_dir_limit_size > 0) { + space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size; + if ((space_remaining * 20) < ctx->store_dir_limit_size) { + flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, ctx->store_dir_limit_size); + } + } + return 0; +} + +static int set_files_context(struct flb_azure_blob *ctx) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct azure_blob_file *azure_blob_file; + + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + + /* skip current stream since it's new */ + if (fs_stream == ctx->stream_active) { + continue; + } + + /* skip multi-upload */ + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data) { + continue; + } + + /* Allocate local context */ + azure_blob_file = flb_calloc(1, sizeof(struct azure_blob_file)); + if (!azure_blob_file) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot allocate azure_blob file context"); + continue; + } + azure_blob_file->fsf = fsf; + azure_blob_file->create_time = time(NULL); + + /* Use fstore opaque 'data' reference to keep our context */ + fsf->data = azure_blob_file; + } + } + + return 0; +} + +/* Initialize filesystem storage for azure_blob plugin */ +int azure_blob_store_init(struct flb_azure_blob *ctx) +{ + int type; + time_t now; + char tmp[64]; + struct tm *tm; + struct flb_fstore *fs; + struct flb_fstore_stream *fs_stream; + + /* Set the storage type */ + type = FLB_FSTORE_FS; + + /* Initialize the storage context */ + if (ctx->buffer_dir[strlen(ctx->buffer_dir) - 1] == '/') { + snprintf(tmp, sizeof(tmp), "%s%s", ctx->buffer_dir, ctx->azure_blob_buffer_key); + } + else { + snprintf(tmp, sizeof(tmp), "%s/%s", ctx->buffer_dir, ctx->azure_blob_buffer_key); + } + + /* Initialize the storage context */ + fs = flb_fstore_create(tmp, type); + if (!fs) { + return -1; + } + ctx->fs = fs; + + /* + * On every start we create a new 
stream. In the file system, this stream + * is a directory named with the date, like '2020-10-03T13:00:02'. So + * all the 'new' data generated by this process is stored there. + * + * Note that previous data in similar directories from previous runs is + * considered backlog data; in the azure_blob plugin we need to differentiate the + * new vs. the older buffered data. + * + * Compose a stream name... + */ + now = time(NULL); + tm = localtime(&now); + +#ifdef FLB_SYSTEM_WINDOWS + /* Windows does not allow ':' in directory names */ + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H-%M-%S", tm); +#else + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H:%M:%S", tm); +#endif + + /* Create the stream */ + fs_stream = flb_fstore_stream_create(ctx->fs, tmp); + if (!fs_stream) { + /* Upon exception abort */ + flb_plg_error(ctx->ins, "could not initialize active stream: %s", tmp); + flb_fstore_destroy(fs); + ctx->fs = NULL; + return -1; + } + ctx->stream_active = fs_stream; + + set_files_context(ctx); + return 0; +} + +int azure_blob_store_exit(struct flb_azure_blob *ctx) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct azure_blob_file *azure_blob_file; + + if (!ctx->fs) { + return 0; + } + + /* release local context on non-multi upload files */ + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data != NULL) { + azure_blob_file = fsf->data; + flb_free(azure_blob_file); + } + } + } + + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + return 0; +} + +/* + * Check if the store has data.
This function is only used on plugin + * initialization + */ +int azure_blob_store_has_data(struct flb_azure_blob *ctx) +{ + struct mk_list *head; + struct flb_fstore_stream *fs_stream; + + if (!ctx->fs) { + return FLB_FALSE; + } + + mk_list_foreach(head, &ctx->fs->streams) { + /* skip multi upload stream */ + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + if (mk_list_size(&fs_stream->files) > 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +int azure_blob_store_has_uploads(struct flb_azure_blob *ctx) +{ + if (!ctx || !ctx->stream_upload) { + return FLB_FALSE; + } + + if (mk_list_size(&ctx->stream_upload->files) > 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +int azure_blob_store_file_inactive(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file) +{ + int ret; + struct flb_fstore_file *fsf; + + fsf = azure_blob_file->fsf; + + flb_free(azure_blob_file); + ret = flb_fstore_file_inactive(ctx->fs, fsf); + + return ret; +} + +int azure_blob_store_file_cleanup(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file) +{ + struct flb_fstore_file *fsf; + + fsf = azure_blob_file->fsf; + + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + flb_free(azure_blob_file); + + return 0; +} + +int azure_blob_store_file_delete(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file) +{ + struct flb_fstore_file *fsf; + + fsf = azure_blob_file->fsf; + ctx->current_buffer_size -= azure_blob_file->size; + + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + flb_free(azure_blob_file); + + return 0; +} + +int azure_blob_store_file_upload_read(struct flb_azure_blob *ctx, struct flb_fstore_file *fsf, + char **out_buf, size_t *out_size) +{ + int ret; + + ret = flb_fstore_file_content_copy(ctx->fs, fsf, + (void **) out_buf, out_size); + return ret; +} + +/* Always set an updated copy of metadata into the fs_store_file entry */ +int azure_blob_store_file_meta_get(struct flb_azure_blob *ctx, struct flb_fstore_file *fsf) +{ + return flb_fstore_file_meta_get(ctx->fs, fsf); +} + +void azure_blob_store_file_lock(struct azure_blob_file *azure_blob_file) +{ + azure_blob_file->locked = FLB_TRUE; +} + +void azure_blob_store_file_unlock(struct azure_blob_file *azure_blob_file) +{ + azure_blob_file->locked = FLB_FALSE; +} \ No newline at end of file diff --git a/plugins/out_azure_blob/azure_blob_store.h b/plugins/out_azure_blob/azure_blob_store.h new file mode 100644 index 00000000000..a4e7db823ce --- /dev/null +++ b/plugins/out_azure_blob/azure_blob_store.h @@ -0,0 +1,61 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_OUT_AZURE_BLOB_STORE_H +#define FLB_OUT_AZURE_BLOB_STORE_H + + +#include +#include +#include "azure_blob.h" + +struct azure_blob_file { + int locked; /* locked chunk is busy, cannot write to it */ + int failures; /* delivery failures */ + size_t size; /* file size */ + time_t create_time; /* creation time */ + flb_sds_t file_path; /* file path */ + int lock_fd; /* file descriptor for locking */ + struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ +}; + +int azure_blob_store_buffer_put(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file, + flb_sds_t tag, size_t tag_len, + flb_sds_t data, size_t bytes); + +int azure_blob_store_init(struct flb_azure_blob *ctx); +int azure_blob_store_exit(struct flb_azure_blob *ctx); + +int azure_blob_store_has_data(struct flb_azure_blob *ctx); +int azure_blob_store_has_uploads(struct flb_azure_blob *ctx); + +int azure_blob_store_file_inactive(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file); +struct azure_blob_file *azure_blob_store_file_get(struct flb_azure_blob *ctx, const char *tag, + int tag_len); +int azure_blob_store_file_cleanup(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file); +int azure_blob_store_file_delete(struct flb_azure_blob *ctx, struct azure_blob_file *azure_blob_file); +int azure_blob_store_file_upload_read(struct flb_azure_blob *ctx, struct flb_fstore_file *fsf, + char **out_buf, size_t *out_size); + +int azure_blob_store_file_meta_get(struct flb_azure_blob *ctx, struct flb_fstore_file *fsf); + +void azure_blob_store_file_lock(struct azure_blob_file *azure_blob_file); + +void azure_blob_store_file_unlock(struct azure_blob_file *azure_blob_file); + +#endif
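Example usage (a minimal sketch, not part of the patch): the options registered in the config_map above can be exercised with a classic-mode output section like the one below. The values shown are illustrative only; account_name, shared_key, container_name and blob_type are pre-existing plugin options that this patch does not change. blob_type must be blockblob, since flb_azure_blob_conf_create() rejects buffering combined with appendblob.

    [OUTPUT]
        name                        azure_blob
        match                       *
        account_name                myaccount
        shared_key                  MY_SHARED_KEY
        container_name              logs
        blob_type                   blockblob
        buffering_enabled           true
        buffer_dir                  /var/lib/fluent-bit/azure-blob
        azure_blob_buffer_key       instance-1
        upload_timeout              10m
        upload_file_size            50M
        store_dir_limit_size        8G
        scheduler_max_retries       3
        delete_on_max_upload_error  false
        io_timeout                  60s

With buffering_enabled set to true, cb_azure_blob_init() creates the local store under buffer_dir/azure_blob_buffer_key and switches the upstream to synchronous IO (FLB_IO_ASYNC cleared, io_timeout applied). cb_azure_blob_flush() then stages each formatted chunk on disk, and the upload timer created on the first flush (flush_init) fires roughly every upload_timeout/6 to send files that have exceeded upload_timeout or upload_file_size, retrying with exponential backoff up to scheduler_max_retries times.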