Skip to content

Commit 708dfd7

Browse files
out_stackdriver: support gzip compression (#7101)
Implement gzip compression Signed-off-by: Catherine Fang <[email protected]> Co-authored-by: igorpeshansky <[email protected]>
1 parent 52969be commit 708dfd7

File tree

3 files changed

+44
-4
lines changed

3 files changed

+44
-4
lines changed

plugins/out_stackdriver/stackdriver.c

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <fluent-bit/flb_ra_key.h>
3333
#include <fluent-bit/flb_record_accessor.h>
3434
#include <fluent-bit/flb_log_event_decoder.h>
35+
#include <fluent-bit/flb_gzip.h>
3536

3637
#include <msgpack.h>
3738

@@ -2522,10 +2523,12 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
25222523
size_t b_sent;
25232524
flb_sds_t token;
25242525
flb_sds_t payload_buf;
2525-
size_t payload_size;
2526+
void *compressed_payload_buffer = NULL;
2527+
size_t compressed_payload_size;
25262528
struct flb_stackdriver *ctx = out_context;
25272529
struct flb_connection *u_conn;
25282530
struct flb_http_client *c;
2531+
int compressed = FLB_FALSE;
25292532
#ifdef FLB_HAVE_METRICS
25302533
char *name = (char *) flb_output_name(ctx->ins);
25312534
uint64_t ts = cfl_time_now();
@@ -2563,7 +2566,6 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
25632566
flb_upstream_conn_release(u_conn);
25642567
FLB_OUTPUT_RETURN(FLB_RETRY);
25652568
}
2566-
payload_size = flb_sds_len(payload_buf);
25672569

25682570
/* Get or renew Token */
25692571
token = get_google_token(ctx);
@@ -2581,9 +2583,22 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
25812583
FLB_OUTPUT_RETURN(FLB_RETRY);
25822584
}
25832585

2586+
compressed_payload_buffer = payload_buf;
2587+
compressed_payload_size = flb_sds_len(payload_buf);
2588+
if (ctx->compress_gzip == FLB_TRUE) {
2589+
ret = flb_gzip_compress((void *) payload_buf, flb_sds_len(payload_buf),
2590+
&compressed_payload_buffer, &compressed_payload_size);
2591+
if (ret == -1) {
2592+
flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
2593+
} else {
2594+
compressed = FLB_TRUE;
2595+
flb_sds_destroy(payload_buf);
2596+
}
2597+
}
2598+
25842599
/* Compose HTTP Client request */
25852600
c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI,
2586-
payload_buf, payload_size, NULL, 0, NULL, 0);
2601+
compressed_payload_buffer, compressed_payload_size, NULL, 0, NULL, 0);
25872602

25882603
flb_http_buffer_size(c, 4192);
25892604

@@ -2598,6 +2613,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
25982613

25992614
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
26002615
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
2616+
/* Content Encoding: gzip */
2617+
if (compressed == FLB_TRUE) {
2618+
flb_http_set_content_encoding_gzip(c);
2619+
}
26012620

26022621
/* Send HTTP request */
26032622
ret = flb_http_do(c, &b_sent);
@@ -2661,8 +2680,14 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
26612680
update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code);
26622681
#endif
26632682

2683+
26642684
/* Cleanup */
2665-
flb_sds_destroy(payload_buf);
2685+
if (compressed == FLB_TRUE) {
2686+
flb_free(compressed_payload_buffer);
2687+
}
2688+
else {
2689+
flb_sds_destroy(payload_buf);
2690+
}
26662691
flb_sds_destroy(token);
26672692
flb_http_client_destroy(c);
26682693
flb_upstream_conn_release(u_conn);
@@ -2785,6 +2810,11 @@ static struct flb_config_map config_map[] = {
27852810
0, FLB_TRUE, offsetof(struct flb_stackdriver, task_id),
27862811
"Set the resource task id"
27872812
},
2813+
{
2814+
FLB_CONFIG_MAP_STR, "compress", NULL,
2815+
0, FLB_FALSE, 0,
2816+
"Set log payload compression method. Option available is 'gzip'"
2817+
},
27882818
{
27892819
FLB_CONFIG_MAP_CLIST, "labels", NULL,
27902820
0, FLB_TRUE, offsetof(struct flb_stackdriver, labels),

plugins/out_stackdriver/stackdriver.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ struct flb_stackdriver {
160160
flb_sds_t job;
161161
flb_sds_t task_id;
162162

163+
/* Internal variable to reduce string comparisons */
164+
int compress_gzip;
165+
163166
/* other */
164167
flb_sds_t export_to_project_id;
165168
flb_sds_t resource;

plugins/out_stackdriver/stackdriver_conf.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,13 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *
279279
return NULL;
280280
}
281281

282+
/* Compress (gzip) */
283+
tmp = flb_output_get_property("compress", ins);
284+
ctx->compress_gzip = FLB_FALSE;
285+
if (tmp && strcasecmp(tmp, "gzip") == 0) {
286+
ctx->compress_gzip = FLB_TRUE;
287+
}
288+
282289
/* labels */
283290
flb_kv_init(&ctx->config_labels);
284291
ret = parse_configuration_labels((void *)ctx);

0 commit comments

Comments
 (0)