Skip to content

Commit a7cf962

Browse files
committed
zstd: extend support for unknown compressed size
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 69b1ead commit a7cf962

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

src/flb_zstd.c

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
#include <fluent-bit/flb_compression.h>
2525
#include <fluent-bit/flb_zstd.h>
2626

27+
28+
#define FLB_ZSTD_DEFAULT_CHUNK 64 * 1024 /* 64 KB buffer */
29+
2730
size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len)
2831
{
2932
void *buf;
@@ -50,8 +53,83 @@ size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *
5053
return 0;
5154
}
5255

56+
static int zstd_uncompress_unknown_size(void *in_data, size_t in_len, void **out_data, size_t *out_len)
57+
{
58+
int ret = 0;
59+
size_t out_size;
60+
char *tmp;
61+
void *buf;
62+
63+
ZSTD_DCtx *dctx;
64+
ZSTD_inBuffer input;
65+
ZSTD_outBuffer output;
66+
67+
/* create decompression context */
68+
dctx = ZSTD_createDCtx();
69+
if (!dctx) {
70+
flb_error("[zstd] cannot create decompression context");
71+
return -1;
72+
}
73+
74+
/* initial output buffer */
75+
out_size = FLB_ZSTD_DEFAULT_CHUNK;
76+
buf = flb_malloc(out_size);
77+
if (!buf) {
78+
flb_errno();
79+
ZSTD_freeDCtx(dctx);
80+
return -1;
81+
}
82+
83+
/* input */
84+
input.src = in_data;
85+
input.size = in_len;
86+
input.pos = 0;
87+
88+
/* start the decompress loop */
89+
output.dst = buf;
90+
output.pos = 0;
91+
output.size = out_size;
92+
93+
while (input.pos < input.size) {
94+
ret = ZSTD_decompressStream(dctx, &output, &input);
95+
if (ZSTD_isError(ret)) {
96+
flb_error("[zstd] decompression failed: %s", ZSTD_getErrorName(ret));
97+
flb_free(buf);
98+
ZSTD_freeDCtx(dctx);
99+
return -1;
100+
}
101+
102+
/* check if we need more space */
103+
if (output.pos == out_size) {
104+
out_size *= 2;
105+
tmp = flb_realloc(buf, out_size);
106+
if (!tmp) {
107+
flb_errno();
108+
flb_free(buf);
109+
ZSTD_freeDCtx(dctx);
110+
return -1;
111+
}
112+
buf = tmp;
113+
output.dst = buf;
114+
output.size = out_size;
115+
}
116+
117+
/* check if we have finished */
118+
if (ret == 0) {
119+
break;
120+
}
121+
}
122+
123+
ZSTD_freeDCtx(dctx);
124+
125+
*out_data = buf;
126+
*out_len = output.pos;
127+
return 0;
128+
}
129+
53130
size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len)
54131
{
132+
int ret;
55133
void *buf;
56134
size_t size;
57135

@@ -60,6 +138,10 @@ size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t
60138
flb_error("[zstd] invalid content size");
61139
return -1;
62140
}
141+
else if (size == ZSTD_CONTENTSIZE_UNKNOWN) {
142+
ret = zstd_uncompress_unknown_size(in_data, in_len, out_data, out_len);
143+
return ret;
144+
}
63145

64146
buf = flb_malloc(size);
65147
if (!buf) {

0 commit comments

Comments
 (0)