Skip to content

Commit fcf50e3

Browse files
committed
out_stackdriver: fix batch drop on invalid labels
When a single record contains a logging.googleapis.com/labels field that is not a map, the entire batch is dropped. This causes data loss for all other valid records in the batch. Extract a shared should_skip_record() helper that validates both insertId and labels fields. Use it in the prescan loop (for array sizing) and at the top of the main packing loop (before any field extraction), so invalid records are skipped cleanly without complex cleanup paths. This change only addresses the labels-not-a-map case. Other batch- level drop scenarios (k8s local_resource_id failures, decoder init failures, JSON serialization failures) remain unchanged as they affect shared batch-level state by design. Signed-off-by: Yu Yi <yiyu@yiyu.me>
1 parent a1d9c2a commit fcf50e3

File tree

5 files changed

+561
-60
lines changed

5 files changed

+561
-60
lines changed

plugins/out_stackdriver/stackdriver.c

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,6 +1768,48 @@ static int pack_payload(int insert_id_extracted,
17681768
return ret;
17691769
}
17701770

1771+
/*
1772+
* should_skip_record
1773+
* Check if a record should be skipped due to invalid fields.
1774+
* Returns FLB_TRUE if the record should be dropped.
1775+
*
1776+
* log_errors: if FLB_TRUE, log why the record is being dropped.
1777+
* Set to FLB_FALSE during prescan to avoid duplicate logging.
1778+
*/
1779+
static int should_skip_record(struct flb_stackdriver *ctx,
1780+
msgpack_object *obj,
1781+
int log_errors)
1782+
{
1783+
insert_id_status in_status;
1784+
msgpack_object insert_id_obj;
1785+
msgpack_object *payload_labels_ptr;
1786+
1787+
/* Check insertId */
1788+
in_status = validate_insert_id(&insert_id_obj, obj);
1789+
if (in_status == INSERTID_INVALID) {
1790+
if (log_errors == FLB_TRUE) {
1791+
flb_plg_error(ctx->ins,
1792+
"Incorrect insertId received. "
1793+
"InsertId should be non-empty string.");
1794+
}
1795+
return FLB_TRUE;
1796+
}
1797+
1798+
/* Check labels type */
1799+
payload_labels_ptr = get_payload_labels(ctx, obj);
1800+
if (payload_labels_ptr != NULL &&
1801+
payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
1802+
if (log_errors == FLB_TRUE) {
1803+
flb_plg_error(ctx->ins,
1804+
"the type of payload labels should be map, "
1805+
"dropping record");
1806+
}
1807+
return FLB_TRUE;
1808+
}
1809+
1810+
return FLB_FALSE;
1811+
}
1812+
17711813
static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
17721814
int total_records,
17731815
const char *tag, int tag_len,
@@ -1868,20 +1910,14 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
18681910
}
18691911

18701912
/*
1871-
* Search each entry and validate insertId.
1872-
* Reject the entry if insertId is invalid.
1913+
* Search each entry and validate record fields.
1914+
* Reject entries with invalid insertId or non-map labels.
18731915
* If all the entries are rejected, stop formatting.
1874-
*
18751916
*/
18761917
while ((ret = flb_log_event_decoder_next(
18771918
&log_decoder,
18781919
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
1879-
/* Extract insertId */
1880-
in_status = validate_insert_id(&insert_id_obj, log_event.body);
1881-
1882-
if (in_status == INSERTID_INVALID) {
1883-
flb_plg_error(ctx->ins,
1884-
"Incorrect insertId received. InsertId should be non-empty string.");
1920+
if (should_skip_record(ctx, log_event.body, FLB_FALSE) == FLB_TRUE) {
18851921
array_size -= 1;
18861922
}
18871923
}
@@ -2308,6 +2344,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
23082344
&log_decoder,
23092345
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
23102346
obj = log_event.body;
2347+
2348+
/* Skip records with invalid fields */
2349+
if (should_skip_record(ctx, obj, FLB_TRUE) == FLB_TRUE) {
2350+
continue;
2351+
}
2352+
23112353
tms_status = extract_timestamp(obj, &log_event.timestamp);
23122354

23132355
/*
@@ -2372,33 +2414,14 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
23722414
log_name_extracted = FLB_TRUE;
23732415
}
23742416

2375-
/* Extract insertId */
2417+
/* Extract insertId (INVALID case already handled by should_skip_record) */
23762418
in_status = validate_insert_id(&insert_id_obj, obj);
23772419
if (in_status == INSERTID_VALID) {
23782420
insert_id_extracted = FLB_TRUE;
23792421
entry_size += 1;
23802422
}
2381-
else if (in_status == INSERTID_NOT_PRESENT) {
2382-
insert_id_extracted = FLB_FALSE;
2383-
}
23842423
else {
2385-
if (trace_extracted == FLB_TRUE) {
2386-
flb_sds_destroy(trace);
2387-
}
2388-
2389-
if (span_id_extracted == FLB_TRUE) {
2390-
flb_sds_destroy(span_id);
2391-
}
2392-
2393-
if (project_id_extracted == FLB_TRUE) {
2394-
flb_sds_destroy(project_id_key);
2395-
}
2396-
2397-
if (log_name_extracted == FLB_TRUE) {
2398-
flb_sds_destroy(log_name);
2399-
}
2400-
2401-
continue;
2424+
insert_id_extracted = FLB_FALSE;
24022425
}
24032426

24042427
/* Extract operation */
@@ -2441,37 +2464,8 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
24412464
entry_size += 1;
24422465
}
24432466

2444-
/* Extract payload labels */
2467+
/* Extract payload labels (non-map case already handled by should_skip_record) */
24452468
payload_labels_ptr = get_payload_labels(ctx, obj);
2446-
if (payload_labels_ptr != NULL &&
2447-
payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
2448-
flb_plg_error(ctx->ins, "the type of payload labels should be map");
2449-
flb_sds_destroy(operation_id);
2450-
flb_sds_destroy(operation_producer);
2451-
flb_sds_destroy(source_location_file);
2452-
flb_sds_destroy(source_location_function);
2453-
2454-
if (trace_extracted == FLB_TRUE) {
2455-
flb_sds_destroy(trace);
2456-
}
2457-
2458-
if (span_id_extracted == FLB_TRUE) {
2459-
flb_sds_destroy(span_id);
2460-
}
2461-
2462-
if (project_id_extracted == FLB_TRUE) {
2463-
flb_sds_destroy(project_id_key);
2464-
}
2465-
2466-
if (log_name_extracted == FLB_TRUE) {
2467-
flb_sds_destroy(log_name);
2468-
}
2469-
2470-
flb_log_event_decoder_destroy(&log_decoder);
2471-
msgpack_sbuffer_destroy(&mp_sbuf);
2472-
2473-
return NULL;
2474-
}
24752469

24762470
/* Number of parsed labels */
24772471
labels_size = mk_list_size(&ctx->config_labels);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"message":"valid record with labels", "logging.googleapis.com/labels": {"testA": "valA", "testB": "valB"}}
2+
{"message":"bad labels - not a map", "logging.googleapis.com/labels": "not_a_map"}
3+
{"message":"valid record without labels"}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{"message":"valid record no special fields"}
2+
{"message":"bad insertId", "logging.googleapis.com/insertId": 123}
3+
{"message":"bad labels", "logging.googleapis.com/labels": "not_a_map"}
4+
{"message":"valid record with labels", "logging.googleapis.com/labels": {"testA": "valA"}}

tests/runtime/data/stackdriver/stackdriver_test_labels.h

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,75 @@
4242
"}" \
4343
"}]"
4444

45+
/* labels value is a string instead of a map - should cause record to be dropped */
46+
#define LABELS_NOT_A_MAP "[" \
47+
"1591649196," \
48+
"{" \
49+
"\"logging.googleapis.com/labels\": \"not_a_map\"" \
50+
"}]"
51+
52+
/* custom labels key with string value instead of map */
53+
#define LABELS_NOT_A_MAP_CUSTOM_KEY "[" \
54+
"1591649196," \
55+
"{" \
56+
"\"logging.googleapis.com/customlabels\": \"not_a_map\"" \
57+
"}]"
58+
59+
/* invalid labels record with extracted fields (httpRequest, operation, */
60+
/* sourceLocation, trace, spanId) to verify cleanup on skip path */
61+
#define LABELS_NOT_A_MAP_WITH_FIELDS "[" \
62+
"1591649196," \
63+
"{" \
64+
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
65+
"\"logging.googleapis.com/trace\": \"test_trace\"," \
66+
"\"logging.googleapis.com/spanId\": \"test_span\"," \
67+
"\"logging.googleapis.com/operation\": " \
68+
"{" \
69+
"\"id\": \"test_id\"," \
70+
"\"producer\": \"test_producer\"," \
71+
"\"first\": true," \
72+
"\"last\": true" \
73+
"}," \
74+
"\"logging.googleapis.com/sourceLocation\": " \
75+
"{" \
76+
"\"file\": \"test_file\"," \
77+
"\"line\": 123," \
78+
"\"function\": \"test_function\"" \
79+
"}," \
80+
"\"logging.googleapis.com/http_request\": " \
81+
"{" \
82+
"\"requestMethod\": \"GET\"," \
83+
"\"requestUrl\": \"https://example.com\"," \
84+
"\"status\": 200" \
85+
"}" \
86+
"}]"
87+
88+
/* two-record batch: first record has invalid labels, second is valid */
89+
#define BATCH_FIRST_RECORD_LABELS_NOT_A_MAP "[" \
90+
"1591649196," \
91+
"{" \
92+
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
93+
"\"message\": \"bad first record\"" \
94+
"}]"
95+
96+
#define BATCH_FIRST_RECORD_VALID "[" \
97+
"1591649196," \
98+
"{" \
99+
"\"message\": \"valid second record\"" \
100+
"}]"
101+
102+
/* all records have invalid labels */
103+
#define BATCH_ALL_LABELS_NOT_A_MAP_1 "[" \
104+
"1591649196," \
105+
"{" \
106+
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
107+
"\"message\": \"bad record 1\"" \
108+
"}]"
109+
110+
#define BATCH_ALL_LABELS_NOT_A_MAP_2 "[" \
111+
"1591649196," \
112+
"{" \
113+
"\"logging.googleapis.com/labels\": 12345," \
114+
"\"message\": \"bad record 2\"" \
115+
"}]"
116+

0 commit comments

Comments
 (0)