Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 128 additions & 6 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <fluent-bit/flb_plugin.h>
#include <fluent-bit/flb_notification.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_ra_key.h>

#include <msgpack.h>

Expand All @@ -53,6 +55,113 @@ struct worker_info {

FLB_TLS_DEFINE(struct worker_info, worker_info);

static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *data,
uint64_t bytes)
{
struct flb_azure_blob *ctx = out_context;
flb_sds_t out_buf = NULL;
msgpack_unpacked result;
msgpack_object root;
msgpack_object map;
struct flb_record_accessor *ra = NULL;
struct flb_ra_value *rval = NULL;
size_t off = 0;

ra = flb_ra_create(ctx->log_key, FLB_FALSE);
if (!ra) {
flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key);
flb_errno();
return NULL;
}

/* Unpack the data */
msgpack_unpacked_init(&result);
while (1) {
msgpack_unpack_return ret = msgpack_unpack_next(&result, data, bytes, &off);
if (ret == MSGPACK_UNPACK_SUCCESS) {
root = result.data;
if (root.type != MSGPACK_OBJECT_ARRAY) {
continue;
}

if (root.via.array.size < 2) {
flb_plg_debug(ctx->ins, "msgpack array has insufficient elements");
continue;
}

map = root.via.array.ptr[1];

/* Get value using record accessor */
rval = flb_ra_get_value_object(ra, map);
if (!rval) {
flb_plg_error(ctx->ins, "could not find field '%s'", ctx->log_key);
continue;
}

/* Convert value based on its type */
if (rval->type == FLB_RA_STRING) {
out_buf = flb_sds_create_size(rval->o.via.str.size + 1);
if (out_buf) {
flb_sds_copy(out_buf, rval->o.via.str.ptr, rval->o.via.str.size);
flb_sds_cat(out_buf, "\n", 1);
}
}
else if (rval->type == FLB_RA_FLOAT) {
out_buf = flb_sds_create_size(64);
if (out_buf) {
flb_sds_printf(&out_buf, "%f\n", rval->val.f64);
}
}
else if (rval->type == FLB_RA_INT) {
out_buf = flb_sds_create_size(64);
if (out_buf) {
flb_sds_printf(&out_buf, "%" PRId64 "\n", rval->val.i64);
}
}
else {
flb_errno();
flb_plg_error(ctx->ins, "cannot convert given value for field '%s'", ctx->log_key);
flb_ra_key_value_destroy(rval);
rval = NULL;
break;
}

/* Check if buffer allocation succeeded */
if (!out_buf) {
flb_errno();
flb_plg_error(ctx->ins, "could not allocate output buffer");
}

flb_ra_key_value_destroy(rval);
rval = NULL;

/* Successfully found and processed log_key, exit loop */
break;
}
else if (ret == MSGPACK_UNPACK_CONTINUE) {
/* Buffer exhausted or truncated data, stop processing */
flb_plg_debug(ctx->ins, "msgpack unpack needs more data or data truncated");
break;
}
else if (ret == MSGPACK_UNPACK_PARSE_ERROR) {
flb_errno();
flb_plg_error(ctx->ins, "msgpack parse error");
break;
}
else {
flb_errno();
flb_plg_error(ctx->ins, "unexpected msgpack unpack return code %d", ret);
break;
}
}

/* Clean up */
msgpack_unpacked_destroy(&result);
flb_ra_destroy(ra);

return out_buf;
}

static int azure_blob_format(struct flb_config *config,
struct flb_input_instance *ins,
void *plugin_context,
Expand All @@ -65,10 +174,15 @@ static int azure_blob_format(struct flb_config *config,
flb_sds_t out_buf;
struct flb_azure_blob *ctx = plugin_context;

out_buf = flb_pack_msgpack_to_json_format(data, bytes,
FLB_PACK_JSON_FORMAT_LINES,
FLB_PACK_JSON_DATE_ISO8601,
ctx->date_key);
if (ctx->log_key) {
out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes);
}
else {
out_buf = flb_pack_msgpack_to_json_format(data, bytes,
FLB_PACK_JSON_FORMAT_LINES,
FLB_PACK_JSON_DATE_ISO8601,
ctx->date_key);
}
if (!out_buf) {
return -1;
}
Expand Down Expand Up @@ -712,7 +826,7 @@ static int ensure_container(struct flb_azure_blob *ctx)
ctx->container_name);
return FLB_FALSE;
}

flb_plg_error(ctx->ins, "get container request failed, status=%i",
status);

Expand Down Expand Up @@ -1779,6 +1893,14 @@ static struct flb_config_map config_map[] = {
"Set the block type: appendblob or blockblob"
},

{
FLB_CONFIG_MAP_STR, "log_key", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, log_key),
"By default, the whole log record will be sent to blob storage. "
"If you specify a key name with this option, then only the value of "
"that key will be sent"
},

{
FLB_CONFIG_MAP_STR, "compress", NULL,
0, FLB_FALSE, 0,
Expand Down Expand Up @@ -1938,7 +2060,7 @@ static struct flb_config_map config_map[] = {
"Whether to delete the buffered file early after successful blob creation. Default is false"
},

{
{
FLB_CONFIG_MAP_INT, "blob_uri_length", "64",
0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_uri_length),
"Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64"
Expand Down
1 change: 1 addition & 0 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct flb_azure_blob {
flb_sds_t account_name;
flb_sds_t container_name;
flb_sds_t blob_type;
flb_sds_t log_key;
flb_sds_t shared_key;
flb_sds_t endpoint;
flb_sds_t path;
Expand Down