Commit 53f2e45

andy-wang-f authored and PettitWesley committed
out_s3: add gzip compression support for multipart uploads
Fix the logic for choosing the upload mode for multipart uploads. With this change, the data is compressed first, and only then is the decision made between PutObject and multipart upload. Compression is moved ahead of the PutObject call, removed from the upload path itself, and added to the put_all_chunks path; if compression fails there, the original uncompressed data is uploaded instead. The patch has been tested using Valgrind.

Signed-off-by: Andy Wang <[email protected]>
1 parent 186b3e7 commit 53f2e45
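
A minimal sketch of the decision flow the commit message describes: compress first, and only then pick PutObject or multipart from the size that will actually be sent. This is not the plugin code itself; choose_method(), gzip_compress_stub(), and the min_part_size parameter are illustrative stand-ins for flb_aws_compression_compress() and the plugin's chunk-size checks.

    #include <stddef.h>

    enum upload_method { USE_PUT_OBJECT, USE_MULTIPART };

    /* Stand-in for flb_aws_compression_compress(); it always "fails" here so
     * the fallback path (upload the original data) is visible. */
    static int gzip_compress_stub(const void *in, size_t in_size,
                                  void **out, size_t *out_size)
    {
        (void) in; (void) in_size; (void) out; (void) out_size;
        return -1;
    }

    static enum upload_method choose_method(const void *body, size_t body_size,
                                            size_t min_part_size,
                                            const void **send_buf, size_t *send_size)
    {
        void *compressed = NULL;
        size_t compressed_size = 0;

        /* 1. Compress first; if compression fails, fall back to the original
         *    buffer (the put_all_chunks behaviour described above). */
        if (gzip_compress_stub(body, body_size, &compressed, &compressed_size) == 0) {
            *send_buf = compressed;
            *send_size = compressed_size;
        }
        else {
            *send_buf = body;
            *send_size = body_size;
        }

        /* 2. Decide the method from the size of the payload actually sent:
         *    too small for a multipart part means plain PutObject. */
        if (*send_size < min_part_size) {
            return USE_PUT_OBJECT;
        }
        return USE_MULTIPART;
    }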

2 files changed: +83 -50 lines changed

plugins/out_s3/s3.c

Lines changed: 82 additions & 50 deletions
@@ -138,7 +138,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
    if (ctx->content_type != NULL) {
        headers_len++;
    }
-   if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+   if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
        headers_len++;
    }
    if (ctx->canned_acl != NULL) {

@@ -168,7 +168,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
        s3_headers[n].val_len = strlen(ctx->content_type);
        n++;
    }
-   if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+   if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
        s3_headers[n] = content_encoding_header;
        n++;
    }

@@ -637,6 +637,25 @@ static int cb_s3_init(struct flb_output_instance *ins,
        ctx->use_put_object = FLB_TRUE;
    }

+   tmp = flb_output_get_property("compression", ins);
+   if (tmp) {
+       ret = flb_aws_compression_get_type(tmp);
+       if (ret == -1) {
+           flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+           return -1;
+       }
+       if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+           flb_plg_error(ctx->ins,
+                         "use_put_object must be enabled when Apache Arrow is enabled");
+           return -1;
+       }
+       ctx->compression = ret;
+   }
+
+   tmp = flb_output_get_property("content_type", ins);
+   if (tmp) {
+       ctx->content_type = (char *) tmp;
+   }
    if (ctx->use_put_object == FLB_FALSE) {
        /* upload_chunk_size */
        if (ctx->upload_chunk_size <= 0) {

@@ -652,9 +671,16 @@ static int cb_s3_init(struct flb_output_instance *ins,
            flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
            return -1;
        }
-       if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
-           flb_plg_error(ctx->ins, "Max upload_chunk_size is 50M");
-           return -1;
+       if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+           if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+               flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
+               return -1;
+           }
+       } else {
+           if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
+               flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
+               return -1;
+           }
        }
    }

@@ -737,26 +763,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
        ctx->canned_acl = (char *) tmp;
    }

-   tmp = flb_output_get_property("compression", ins);
-   if (tmp) {
-       if (ctx->use_put_object == FLB_FALSE) {
-           flb_plg_error(ctx->ins,
-                         "use_put_object must be enabled when compression is enabled");
-           return -1;
-       }
-       ret = flb_aws_compression_get_type(tmp);
-       if (ret == -1) {
-           flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
-           return -1;
-       }
-       ctx->compression = ret;
-   }
-
-   tmp = flb_output_get_property("content_type", ins);
-   if (tmp) {
-       ctx->content_type = (char *) tmp;
-   }
-
    tmp = flb_output_get_property("storage_class", ins);
    if (tmp) {
        ctx->storage_class = (char *) tmp;

@@ -978,6 +984,22 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
    int timeout_check = FLB_FALSE;
    time_t create_time;
    int ret;
+   void *payload_buf = NULL;
+   size_t payload_size = 0;
+   size_t preCompress_size = 0;
+
+   if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+       /* Map payload */
+       ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
+       if (ret == -1) {
+           flb_plg_error(ctx->ins, "Failed to compress data");
+           return FLB_RETRY;
+       } else {
+           preCompress_size = body_size;
+           body = (void *) payload_buf;
+           body_size = payload_size;
+       }
+   }

    if (ctx->use_put_object == FLB_TRUE) {
        goto put_object;

@@ -1009,6 +1031,10 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
            goto multipart;
        }
        else {
+           if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+               flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %d, After compression, chunk is only %d bytes, "
+                            "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
+           }
            goto put_object;
        }
    }

@@ -1035,6 +1061,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
    }

    ret = s3_put_object(ctx, tag, create_time, body, body_size);
+   if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+       flb_free(payload_buf);
+   }
    if (ret < 0) {
        /* re-add chunk to list */
        if (chunk) {

@@ -1059,6 +1088,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
            if (chunk) {
                s3_store_file_unlock(chunk);
            }
+           if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+               flb_free(payload_buf);
+           }
            return FLB_RETRY;
        }
    }

@@ -1070,13 +1102,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
            if (chunk) {
                s3_store_file_unlock(chunk);
            }
+           if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+               flb_free(payload_buf);
+           }
            return FLB_RETRY;
        }
        m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED;
    }

    ret = upload_part(ctx, m_upload, body, body_size);
    if (ret < 0) {
+       if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+           flb_free(payload_buf);
+       }
        m_upload->upload_errors += 1;
        /* re-add chunk to list */
        if (chunk) {

@@ -1095,13 +1133,14 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
        return FLB_RETRY;
    }
    m_upload->part_number += 1;
-
    /* data was sent successfully- delete the local buffer */
    if (chunk) {
        s3_store_file_delete(ctx, chunk);
        chunk = NULL;
    }
-
+   if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+       flb_free(payload_buf);
+   }
    if (m_upload->bytes >= ctx->file_size) {
        size_check = FLB_TRUE;
        flb_plg_info(ctx->ins, "Will complete upload for %s because uploaded data is greater"

@@ -1144,6 +1183,8 @@ static int put_all_chunks(struct flb_s3 *ctx)
    struct mk_list *f_head;
    struct flb_fstore_file *fsf;
    struct flb_fstore_stream *fs_stream;
+   void *payload_buf = NULL;
+   size_t payload_size = 0;
    char *buffer = NULL;
    size_t buffer_size;
    int ret;

@@ -1186,6 +1227,18 @@ static int put_all_chunks(struct flb_s3 *ctx)
                return -1;
            }

+           if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+               /* Map payload */
+               ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
+               if (ret == -1) {
+                   flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
+               } else {
+                   flb_plg_info(ctx->ins, "Pre-compression chunk size is %d, After compression, chunk is %d bytes", buffer_size, payload_size);
+                   buffer = (void *) payload_buf;
+                   buffer_size = payload_size;
+               }
+           }
+
            ret = s3_put_object(ctx, (const char *)
                                fsf->meta_buf,
                                chunk->create_time, buffer, buffer_size);

@@ -1283,9 +1336,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
    char *final_key;
    flb_sds_t uri;
    flb_sds_t tmp;
-   void *compressed_body;
-   char *final_body;
-   size_t final_body_size;
    char final_body_md5[25];

    s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters,

@@ -1332,24 +1382,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
    flb_sds_destroy(s3_key);
    uri = tmp;

-   if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
-       ret = flb_aws_compression_compress(ctx->compression, body, body_size,
-                                          &compressed_body, &final_body_size);
-       if (ret == -1) {
-           flb_plg_error(ctx->ins, "Failed to compress data");
-           flb_sds_destroy(uri);
-           return -1;
-       }
-       final_body = (char *) compressed_body;
-   }
-   else {
-       final_body = body;
-       final_body_size = body_size;
-   }
-
    memset(final_body_md5, 0, sizeof(final_body_md5));
    if (ctx->send_content_md5 == FLB_TRUE) {
-       ret = get_md5_base64(final_body, final_body_size,
+       ret = get_md5_base64(body, body_size,
                             final_body_md5, sizeof(final_body_md5));
        if (ret != 0) {
            flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");

@@ -1383,11 +1418,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
            goto decrement_index;
        }
        c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
-                                             uri, final_body, final_body_size,
+                                             uri, body, body_size,
                                              headers, num_headers);
-       if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
-           flb_free(compressed_body);
-       }
        flb_free(headers);
    }
    if (c) {

plugins/out_s3/s3.h

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@
 /* Upload data to S3 in 5MB chunks */
 #define MIN_CHUNKED_UPLOAD_SIZE 5242880
 #define MAX_CHUNKED_UPLOAD_SIZE 50000000
+#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000

 #define UPLOAD_TIMER_MAX_WAIT 60000
 #define UPLOAD_TIMER_MIN_WAIT 6000
