Skip to content

Commit 3234daf

Browse files
Thomas WilkStephen Lee
andcommitted
plugins: out_azure_blob: add support for logging a specific key
Co-authored-by: Stephen Lee <[email protected]> Signed-off-by: Thomas Wilk <[email protected]>
1 parent addf261 commit 3234daf

File tree

2 files changed

+174
-6
lines changed

2 files changed

+174
-6
lines changed

plugins/out_azure_blob/azure_blob.c

Lines changed: 173 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,160 @@ struct worker_info {
5151

5252
FLB_TLS_DEFINE(struct worker_info, worker_info);
5353

54+
static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *data,
55+
uint64_t bytes)
56+
{
57+
int i;
58+
int records = 0;
59+
int map_size;
60+
int check = FLB_FALSE;
61+
int found = FLB_FALSE;
62+
int log_key_missing = 0;
63+
int ret;
64+
int alloc_error = 0;
65+
struct flb_azure_blob *ctx = out_context;
66+
char *val_buf;
67+
char *key_str = NULL;
68+
size_t key_str_size = 0;
69+
size_t msgpack_size = bytes + bytes / 4;
70+
size_t val_offset = 0;
71+
flb_sds_t out_buf;
72+
msgpack_object map;
73+
msgpack_object key;
74+
msgpack_object val;
75+
struct flb_log_event_decoder log_decoder;
76+
struct flb_log_event log_event;
77+
78+
/* Iterate the original buffer and perform adjustments */
79+
records = flb_mp_count(data, bytes);
80+
if (records <= 0) {
81+
return NULL;
82+
}
83+
84+
/* Allocate buffer to store log_key contents */
85+
val_buf = flb_calloc(1, msgpack_size);
86+
if (val_buf == NULL) {
87+
flb_plg_error(ctx->ins, "Could not allocate enough "
88+
"memory to read record");
89+
flb_errno();
90+
return NULL;
91+
}
92+
93+
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
94+
95+
if (ret != FLB_EVENT_DECODER_SUCCESS) {
96+
flb_plg_error(ctx->ins,
97+
"Log event decoder initialization error : %d", ret);
98+
99+
flb_free(val_buf);
100+
101+
return NULL;
102+
}
103+
104+
105+
while (!alloc_error &&
106+
(ret = flb_log_event_decoder_next(
107+
&log_decoder,
108+
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
109+
110+
/* Get the record/map */
111+
map = *log_event.body;
112+
113+
if (map.type != MSGPACK_OBJECT_MAP) {
114+
continue;
115+
}
116+
117+
map_size = map.via.map.size;
118+
119+
/* Reset variables for found log_key and correct type */
120+
found = FLB_FALSE;
121+
check = FLB_FALSE;
122+
123+
/* Extract log_key from record and append to output buffer */
124+
for (i = 0; i < map_size; i++) {
125+
key = map.via.map.ptr[i].key;
126+
val = map.via.map.ptr[i].val;
127+
128+
if (key.type == MSGPACK_OBJECT_BIN) {
129+
key_str = (char *) key.via.bin.ptr;
130+
key_str_size = key.via.bin.size;
131+
check = FLB_TRUE;
132+
}
133+
if (key.type == MSGPACK_OBJECT_STR) {
134+
key_str = (char *) key.via.str.ptr;
135+
key_str_size = key.via.str.size;
136+
check = FLB_TRUE;
137+
}
138+
139+
if (check == FLB_TRUE) {
140+
if (strncmp(ctx->log_key, key_str, key_str_size) == 0) {
141+
found = FLB_TRUE;
142+
143+
/*
144+
* Copy contents of value into buffer. Necessary to copy
145+
* strings because flb_msgpack_to_json does not handle nested
146+
* JSON gracefully and double escapes them.
147+
*/
148+
if (val.type == MSGPACK_OBJECT_BIN) {
149+
memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size);
150+
val_offset += val.via.bin.size;
151+
val_buf[val_offset] = '\n';
152+
val_offset++;
153+
}
154+
else if (val.type == MSGPACK_OBJECT_STR) {
155+
memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size);
156+
val_offset += val.via.str.size;
157+
val_buf[val_offset] = '\n';
158+
val_offset++;
159+
}
160+
else {
161+
ret = flb_msgpack_to_json(val_buf + val_offset,
162+
msgpack_size - val_offset, &val);
163+
if (ret < 0) {
164+
break;
165+
}
166+
val_offset += ret;
167+
val_buf[val_offset] = '\n';
168+
val_offset++;
169+
}
170+
/* Exit early once log_key has been found for current record */
171+
break;
172+
}
173+
}
174+
}
175+
176+
/* If log_key was not found in the current record, mark log key as missing */
177+
if (found == FLB_FALSE) {
178+
log_key_missing++;
179+
}
180+
}
181+
182+
/* Throw error once per chunk if at least one log key was not found */
183+
if (log_key_missing > 0) {
184+
flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
185+
ctx->log_key, log_key_missing);
186+
}
187+
188+
flb_log_event_decoder_destroy(&log_decoder);
189+
190+
/* If nothing was read, destroy buffer */
191+
if (val_offset == 0) {
192+
flb_free(val_buf);
193+
return NULL;
194+
}
195+
val_buf[val_offset] = '\0';
196+
197+
/* Create output buffer to store contents */
198+
out_buf = flb_sds_create(val_buf);
199+
if (out_buf == NULL) {
200+
flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents.");
201+
flb_errno();
202+
}
203+
flb_free(val_buf);
204+
205+
return out_buf;
206+
}
207+
54208
static int azure_blob_format(struct flb_config *config,
55209
struct flb_input_instance *ins,
56210
void *plugin_context,
@@ -63,10 +217,15 @@ static int azure_blob_format(struct flb_config *config,
63217
flb_sds_t out_buf;
64218
struct flb_azure_blob *ctx = plugin_context;
65219

66-
out_buf = flb_pack_msgpack_to_json_format(data, bytes,
67-
FLB_PACK_JSON_FORMAT_LINES,
68-
FLB_PACK_JSON_DATE_ISO8601,
69-
ctx->date_key);
220+
if (ctx->log_key) {
221+
out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes);
222+
}
223+
else {
224+
out_buf = flb_pack_msgpack_to_json_format(data, bytes,
225+
FLB_PACK_JSON_FORMAT_LINES,
226+
FLB_PACK_JSON_DATE_ISO8601,
227+
ctx->date_key);
228+
}
70229
if (!out_buf) {
71230
return -1;
72231
}
@@ -593,13 +752,13 @@ static int ensure_container(struct flb_azure_blob *ctx)
593752
else if (status == 200) {
594753
flb_plg_info(ctx->ins, "container '%s' already exists", ctx->container_name);
595754
return FLB_TRUE;
596-
}
755+
}
597756
else if (status == 403) {
598757
flb_plg_error(ctx->ins, "failed getting container '%s', access denied",
599758
ctx->container_name);
600759
return FLB_FALSE;
601760
}
602-
761+
603762
flb_plg_error(ctx->ins, "get container request failed, status=%i",
604763
status);
605764

@@ -1171,6 +1330,14 @@ static struct flb_config_map config_map[] = {
11711330
"Set the block type: appendblob or blockblob"
11721331
},
11731332

1333+
{
1334+
FLB_CONFIG_MAP_STR, "log_key", NULL,
1335+
0, FLB_TRUE, offsetof(struct flb_azure_blob, log_key),
1336+
"By default, the whole log record will be sent to blob storage. "
1337+
"If you specify a key name with this option, then only the value of "
1338+
"that key will be sent"
1339+
},
1340+
11741341
{
11751342
FLB_CONFIG_MAP_STR, "compress", NULL,
11761343
0, FLB_FALSE, 0,

plugins/out_azure_blob/azure_blob.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct flb_azure_blob {
5353
flb_sds_t account_name;
5454
flb_sds_t container_name;
5555
flb_sds_t blob_type;
56+
flb_sds_t log_key;
5657
flb_sds_t shared_key;
5758
flb_sds_t endpoint;
5859
flb_sds_t path;

0 commit comments

Comments
 (0)