Skip to content

Commit 8cd266d

Browse files
committed
pipeline: outputs: azure_blob: add log_key to outputs
1 parent 653f8d5 commit 8cd266d

File tree

1 file changed

+76
-127
lines changed

1 file changed

+76
-127
lines changed

plugins/out_azure_blob/azure_blob.c

Lines changed: 76 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
#include <fluent-bit/flb_log_event_decoder.h>
3131
#include <fluent-bit/flb_plugin.h>
3232
#include <fluent-bit/flb_notification.h>
33+
#include <fluent-bit/flb_record_accessor.h>
34+
#include <fluent-bit/flb_ra_key.h>
3335

3436
#include <msgpack.h>
3537

@@ -54,153 +56,100 @@ FLB_TLS_DEFINE(struct worker_info, worker_info);
5456
static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *data,
5557
uint64_t bytes)
5658
{
57-
int i;
58-
int records = 0;
59-
int map_size;
60-
int check = FLB_FALSE;
61-
int found = FLB_FALSE;
62-
int log_key_missing = 0;
63-
int ret;
64-
int alloc_error = 0;
6559
struct flb_azure_blob *ctx = out_context;
66-
char *val_buf;
67-
char *key_str = NULL;
68-
size_t key_str_size = 0;
69-
size_t msgpack_size = bytes + bytes / 4;
70-
size_t val_offset = 0;
71-
flb_sds_t out_buf;
60+
flb_sds_t out_buf = NULL;
61+
msgpack_unpacked result;
62+
msgpack_object root;
7263
msgpack_object map;
73-
msgpack_object key;
74-
msgpack_object val;
75-
struct flb_log_event_decoder log_decoder;
76-
struct flb_log_event log_event;
64+
struct flb_record_accessor *ra = NULL;
65+
struct flb_ra_value *rval = NULL;
66+
size_t off = 0;
7767

78-
/* Iterate the original buffer and perform adjustments */
79-
records = flb_mp_count(data, bytes);
80-
if (records <= 0) {
81-
return NULL;
82-
}
83-
84-
/* Allocate buffer to store log_key contents */
85-
val_buf = flb_calloc(1, msgpack_size);
86-
if (val_buf == NULL) {
87-
flb_plg_error(ctx->ins, "Could not allocate enough "
88-
"memory to read record");
68+
ra = flb_ra_create(ctx->log_key, FLB_FALSE);
69+
if (!ra) {
70+
flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key);
8971
flb_errno();
9072
return NULL;
9173
}
9274

93-
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
94-
95-
if (ret != FLB_EVENT_DECODER_SUCCESS) {
96-
flb_plg_error(ctx->ins,
97-
"Log event decoder initialization error : %d", ret);
98-
99-
flb_free(val_buf);
100-
101-
return NULL;
102-
}
103-
104-
105-
while (!alloc_error &&
106-
(ret = flb_log_event_decoder_next(
107-
&log_decoder,
108-
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
109-
110-
/* Get the record/map */
111-
map = *log_event.body;
112-
113-
if (map.type != MSGPACK_OBJECT_MAP) {
114-
continue;
115-
}
75+
/* Unpack the data */
76+
msgpack_unpacked_init(&result);
77+
while (1) {
78+
msgpack_unpack_return ret = msgpack_unpack_next(&result, data, bytes, &off);
79+
if (ret == MSGPACK_UNPACK_SUCCESS) {
80+
root = result.data;
81+
if (root.type != MSGPACK_OBJECT_ARRAY) {
82+
continue;
83+
}
11684

117-
map_size = map.via.map.size;
85+
if (root.via.array.size < 2) {
86+
flb_plg_debug(ctx->ins, "msgpack array has insufficient elements");
87+
continue;
88+
}
11889

119-
/* Reset variables for found log_key and correct type */
120-
found = FLB_FALSE;
121-
check = FLB_FALSE;
90+
map = root.via.array.ptr[1];
12291

123-
/* Extract log_key from record and append to output buffer */
124-
for (i = 0; i < map_size; i++) {
125-
key = map.via.map.ptr[i].key;
126-
val = map.via.map.ptr[i].val;
92+
/* Get value using record accessor */
93+
rval = flb_ra_get_value_object(ra, map);
94+
if (!rval) {
95+
flb_plg_error(ctx->ins, "could not find field '%s'", ctx->log_key);
96+
continue;
97+
}
12798

128-
if (key.type == MSGPACK_OBJECT_BIN) {
129-
key_str = (char *) key.via.bin.ptr;
130-
key_str_size = key.via.bin.size;
131-
check = FLB_TRUE;
99+
/* Convert value based on its type */
100+
if (rval->type == FLB_RA_STRING) {
101+
out_buf = flb_sds_create_len(rval->o.via.str.ptr, rval->o.via.str.size);
132102
}
133-
if (key.type == MSGPACK_OBJECT_STR) {
134-
key_str = (char *) key.via.str.ptr;
135-
key_str_size = key.via.str.size;
136-
check = FLB_TRUE;
103+
else if (rval->type == FLB_RA_FLOAT) {
104+
out_buf = flb_sds_create_size(64);
105+
if (out_buf) {
106+
flb_sds_printf(&out_buf, "%f", rval->val.f64);
107+
}
137108
}
138-
139-
if (check == FLB_TRUE) {
140-
if (strncmp(ctx->log_key, key_str, key_str_size) == 0) {
141-
found = FLB_TRUE;
142-
143-
/*
144-
* Copy contents of value into buffer. Necessary to copy
145-
* strings because flb_msgpack_to_json does not handle nested
146-
* JSON gracefully and double escapes them.
147-
*/
148-
if (val.type == MSGPACK_OBJECT_BIN) {
149-
memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size);
150-
val_offset += val.via.bin.size;
151-
val_buf[val_offset] = '\n';
152-
val_offset++;
153-
}
154-
else if (val.type == MSGPACK_OBJECT_STR) {
155-
memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size);
156-
val_offset += val.via.str.size;
157-
val_buf[val_offset] = '\n';
158-
val_offset++;
159-
}
160-
else {
161-
ret = flb_msgpack_to_json(val_buf + val_offset,
162-
msgpack_size - val_offset, &val);
163-
if (ret < 0) {
164-
break;
165-
}
166-
val_offset += ret;
167-
val_buf[val_offset] = '\n';
168-
val_offset++;
169-
}
170-
/* Exit early once log_key has been found for current record */
171-
break;
109+
else if (rval->type == FLB_RA_INT) {
110+
out_buf = flb_sds_create_size(64);
111+
if (out_buf) {
112+
flb_sds_printf(&out_buf, "%" PRId64, rval->val.i64);
172113
}
173114
}
174-
}
175-
176-
/* If log_key was not found in the current record, mark log key as missing */
177-
if (found == FLB_FALSE) {
178-
log_key_missing++;
179-
}
180-
}
115+
else {
116+
flb_errno();
117+
flb_plg_error(ctx->ins, "cannot convert given value for field '%s'", ctx->log_key);
118+
flb_ra_key_value_destroy(rval);
119+
rval = NULL;
120+
break;
121+
}
181122

182-
/* Throw error once per chunk if at least one log key was not found */
183-
if (log_key_missing > 0) {
184-
flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
185-
ctx->log_key, log_key_missing);
186-
}
123+
/* Check if buffer allocation succeeded */
124+
if (!out_buf) {
125+
flb_errno();
126+
flb_plg_error(ctx->ins, "could not allocate output buffer");
127+
}
187128

188-
flb_log_event_decoder_destroy(&log_decoder);
129+
flb_ra_key_value_destroy(rval);
130+
rval = NULL;
189131

190-
/* If nothing was read, destroy buffer */
191-
if (val_offset == 0) {
192-
flb_free(val_buf);
193-
return NULL;
132+
break;
133+
}
134+
else if (ret == MSGPACK_UNPACK_CONTINUE) {
135+
/* Continue unpacking */
136+
continue;
137+
}
138+
else if (ret == MSGPACK_UNPACK_PARSE_ERROR) {
139+
flb_errno();
140+
flb_plg_error(ctx->ins, "msgpack parse error");
141+
break;
142+
}
143+
else {
144+
flb_errno();
145+
flb_plg_error(ctx->ins, "unexpected msgpack unpack return code");
146+
break;
147+
}
194148
}
195-
val_buf[val_offset] = '\0';
196149

197-
/* Create output buffer to store contents */
198-
out_buf = flb_sds_create(val_buf);
199-
if (out_buf == NULL) {
200-
flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents.");
201-
flb_errno();
202-
}
203-
flb_free(val_buf);
150+
/* Clean up */
151+
msgpack_unpacked_destroy(&result);
152+
flb_ra_destroy(ra);
204153

205154
return out_buf;
206155
}

0 commit comments

Comments
 (0)