Skip to content

Commit c4d6803

Browse files
authored
stackdriver: Support writing to textPayload field of Cloud Logging LogEntry. (#8850)
Write payload to textPayload field of LogEntry if the text_payload_key is string format and the only field after stripping special fields. Signed-off-by: shuaichen <[email protected]>
1 parent bf1a7e5 commit c4d6803

File tree

4 files changed

+345
-35
lines changed

4 files changed

+345
-35
lines changed

plugins/out_stackdriver/stackdriver.c

Lines changed: 68 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
372372
if (time(NULL) >= cached_expiration) {
373373
return output;
374374
} else {
375-
/*
375+
/*
376376
* Cached token is expired. Wait on lock to use up-to-date token
377377
* by either waiting for it to be refreshed or refresh it ourselves.
378378
*/
@@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
10681068
if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) {
10691069
flb_mp_map_header_append(mh);
10701070
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
1071-
msgpack_pack_str_body(mp_pck, label_kv->key,
1071+
msgpack_pack_str_body(mp_pck, label_kv->key,
10721072
flb_sds_len(label_kv->key));
10731073
msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string));
10741074
msgpack_pack_str_body(mp_pck, rval->val.string,
@@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
10821082
} else {
10831083
flb_mp_map_header_append(mh);
10841084
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
1085-
msgpack_pack_str_body(mp_pck, label_kv->key,
1085+
msgpack_pack_str_body(mp_pck, label_kv->key,
10861086
flb_sds_len(label_kv->key));
10871087
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val));
10881088
msgpack_pack_str_body(mp_pck, label_kv->val,
@@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
12841284
return -1;
12851285
}
12861286

1287-
if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
1287+
if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
12881288
&& ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
12891289
ret = gce_metadata_read_zone(ctx);
12901290
if (ret == -1) {
@@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s
14341434
{
14351435
msgpack_object tmp;
14361436
int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN);
1437-
1437+
14381438
if (ret == 0 && tmp.via.boolean == true) {
14391439
*trace_sampled_value = FLB_TRUE;
14401440
return 0;
14411441
} else if (ret == 0 && tmp.via.boolean == false) {
14421442
*trace_sampled_value = FLB_FALSE;
1443-
return 0;
1443+
return 0;
14441444
}
14451445

14461446
return -1;
@@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
14761476
return ret;
14771477
}
14781478

1479-
static int pack_json_payload(int insert_id_extracted,
1480-
int operation_extracted, int operation_extra_size,
1481-
int source_location_extracted,
1482-
int source_location_extra_size,
1483-
int http_request_extracted,
1484-
int http_request_extra_size,
1485-
timestamp_status tms_status,
1486-
msgpack_packer *mp_pck, msgpack_object *obj,
1487-
struct flb_stackdriver *ctx)
1479+
static int pack_payload(int insert_id_extracted,
1480+
int operation_extracted,
1481+
int operation_extra_size,
1482+
int source_location_extracted,
1483+
int source_location_extra_size,
1484+
int http_request_extracted,
1485+
int http_request_extra_size,
1486+
timestamp_status tms_status,
1487+
msgpack_packer *mp_pck, msgpack_object *obj,
1488+
struct flb_stackdriver *ctx)
14881489
{
14891490
/* Specified fields include local_resource_id, operation, sourceLocation ... */
14901491
int i, j;
@@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted,
14951496
int len;
14961497
int len_to_be_removed;
14971498
int key_not_found;
1499+
int text_payload_len = 0;
1500+
int is_string_text_payload = FLB_FALSE;
1501+
int write_to_textpayload_field = FLB_FALSE;
14981502
flb_sds_t removed;
14991503
flb_sds_t monitored_resource_key;
15001504
flb_sds_t local_resource_id_key;
15011505
flb_sds_t stream;
1506+
flb_sds_t text_payload = NULL;
15021507
msgpack_object_kv *kv = obj->via.map.ptr;
15031508
msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
15041509

@@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted,
15651570

15661571
new_map_size = map_size - to_remove;
15671572

1568-
ret = msgpack_pack_map(mp_pck, new_map_size);
1569-
if (ret < 0) {
1570-
goto error;
1573+
if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) {
1574+
is_string_text_payload = FLB_TRUE;
1575+
}
1576+
1577+
/* write to textPayload if text_payload_key is the only residual string field*/
1578+
if ((new_map_size == 1) && is_string_text_payload) {
1579+
write_to_textpayload_field = FLB_TRUE;
1580+
}
1581+
1582+
if (write_to_textpayload_field) {
1583+
msgpack_pack_str(mp_pck, 11);
1584+
msgpack_pack_str_body(mp_pck, "textPayload", 11);
1585+
1586+
text_payload_len = flb_sds_len(text_payload);
1587+
msgpack_pack_str(mp_pck, text_payload_len);
1588+
msgpack_pack_str_body(mp_pck, text_payload, text_payload_len);
1589+
} else {
1590+
/* jsonPayload */
1591+
msgpack_pack_str(mp_pck, 11);
1592+
msgpack_pack_str_body(mp_pck, "jsonPayload", 11);
1593+
1594+
ret = msgpack_pack_map(mp_pck, new_map_size);
1595+
if (ret < 0) {
1596+
goto error;
1597+
}
15711598
}
15721599

15731600
/* points back to the beginning of map */
15741601
kv = obj->via.map.ptr;
1575-
for(; kv != kvend; ++kv ) {
1602+
for(; kv != kvend; ++kv) {
15761603
key_not_found = 1;
15771604

15781605
/* processing logging.googleapis.com/insertId */
@@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted,
16391666
}
16401667
}
16411668

1642-
if (key_not_found) {
1669+
/* write residual log fields to jsonPayload */
1670+
if (key_not_found && !write_to_textpayload_field) {
16431671
ret = msgpack_pack_object(mp_pck, kv->key);
16441672
if (ret < 0) {
16451673
goto error;
@@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted,
16541682
flb_sds_destroy(monitored_resource_key);
16551683
flb_sds_destroy(local_resource_id_key);
16561684
flb_sds_destroy(stream);
1685+
flb_sds_destroy(text_payload);
16571686
return 0;
16581687

16591688
error:
16601689
flb_sds_destroy(monitored_resource_key);
16611690
flb_sds_destroy(local_resource_id_key);
16621691
flb_sds_destroy(stream);
1692+
flb_sds_destroy(text_payload);
16631693
return ret;
16641694
}
16651695

@@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
18211851
msgpack_pack_str_body(&mp_pck, "labels", 6);
18221852

18231853
ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes);
1824-
if (ret != 0) {
1854+
if (ret != 0) {
18251855
if (ctx->resource_type == RESOURCE_TYPE_K8S) {
18261856
ret = extract_local_resource_id(data, bytes, ctx, tag);
18271857
if (ret != 0) {
@@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
23142344
/* Extract httpRequest */
23152345
init_http_request(&http_request);
23162346
http_request_extra_size = 0;
2317-
http_request_extracted = extract_http_request(&http_request,
2347+
http_request_extracted = extract_http_request(&http_request,
23182348
ctx->http_request_key,
23192349
ctx->http_request_key_size,
23202350
obj, &http_request_extra_size);
@@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
24322462
flb_sds_destroy(source_location_function);
24332463
destroy_http_request(&http_request);
24342464

2435-
/* jsonPayload */
2436-
msgpack_pack_str(&mp_pck, 11);
2437-
msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
2438-
pack_json_payload(insert_id_extracted,
2439-
operation_extracted, operation_extra_size,
2440-
source_location_extracted,
2441-
source_location_extra_size,
2442-
http_request_extracted,
2443-
http_request_extra_size,
2444-
tms_status,
2445-
&mp_pck, obj, ctx);
2465+
/* both textPayload and jsonPayload are supported */
2466+
pack_payload(insert_id_extracted,
2467+
operation_extracted,
2468+
operation_extra_size,
2469+
source_location_extracted,
2470+
source_location_extra_size,
2471+
http_request_extracted,
2472+
http_request_extra_size,
2473+
tms_status,
2474+
&mp_pck, obj, ctx);
24462475

24472476
/* avoid modifying the original tag */
24482477
newtag = tag;
@@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx,
25942623
uint64_t ts,
25952624
int http_status)
25962625
{
2597-
char tmp[32];
2626+
char tmp[32];
25982627
char *name = (char *) flb_output_name(ctx->ins);
25992628

26002629
/* convert status to string format */
@@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = {
31543183
0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels),
31553184
"Set the resource labels"
31563185
},
3186+
{
3187+
FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL,
3188+
0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key),
3189+
"Set key for extracting text payload"
3190+
},
31573191
{
31583192
FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false",
31593193
0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format),

plugins/out_stackdriver/stackdriver.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ struct flb_stackdriver {
208208
/* upstream context for metadata end-point */
209209
struct flb_upstream *metadata_u;
210210

211+
/* the key to extract unstructured text payload from */
212+
flb_sds_t text_payload_key;
213+
211214
#ifdef FLB_HAVE_METRICS
212215
/* metrics */
213216
struct cmt_counter *cmt_successful_requests;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#define STRING_TEXT_PAYLOAD "[" \
2+
"1595349600," \
3+
"{" \
4+
"\"message\": \"The application errored out\"," \
5+
"\"logging.googleapis.com/severity\": \"ERROR\"" \
6+
"}]"
7+
8+
#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
9+
"1595349600," \
10+
"{" \
11+
"\"message\": \"The application errored out\"," \
12+
"\"logging.googleapis.com/severity\": \"ERROR\"," \
13+
"\"errorCode\": \"400\"" \
14+
"}]"
15+
16+
#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
17+
"1595349600," \
18+
"{" \
19+
"\"message\": " \
20+
"{" \
21+
"\"application_name\": \"my_application\"," \
22+
"\"error_message\": \"The application errored out\"," \
23+
"}," \
24+
"\"logging.googleapis.com/severity\": \"ERROR\"," \
25+
"\"errorCode\": \"400\"" \
26+
"}]"

0 commit comments

Comments
 (0)