Skip to content

Commit fab5a70

Browse files
author
Andreas Ringlstetter
committed
Cache ZSTD compression/decompression context #48187
1 parent e4b5b11 commit fab5a70

File tree

1 file changed

+32
-6
lines changed

1 file changed

+32
-6
lines changed

cpp/src/arrow/util/compression_zstd.cc

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,27 @@ Status ZSTDError(size_t ret, const char* prefix_msg) {
4040
return Status::IOError(prefix_msg, ZSTD_getErrorName(ret));
4141
}
4242

43+
// ----------------------------------------------------------------------
44+
// ZSTD context deleter implementation
45+
struct ZSTDContextDeleter {
46+
void operator()(ZSTD_DCtx* r) {
47+
if (r) {
48+
ZSTD_freeDCtx(r);
49+
}
50+
}
51+
void operator()(ZSTD_CCtx* r) {
52+
if (r) {
53+
ZSTD_freeCCtx(r);
54+
}
55+
}
56+
};
57+
4358
// ----------------------------------------------------------------------
4459
// ZSTD decompressor implementation
4560

4661
class ZSTDDecompressor : public Decompressor {
4762
public:
48-
ZSTDDecompressor() : stream_(ZSTD_createDStream()) {}
63+
ZSTDDecompressor() : stream_(ZSTD_createDStream()), finished_(false) {}
4964

5065
~ZSTDDecompressor() override { ZSTD_freeDStream(stream_); }
5166

@@ -187,9 +202,13 @@ class ZSTDCodec : public Codec {
187202
DCHECK_EQ(output_buffer_len, 0);
188203
output_buffer = &empty_buffer;
189204
}
190-
191-
size_t ret = ZSTD_decompress(output_buffer, static_cast<size_t>(output_buffer_len),
192-
input, static_cast<size_t>(input_len));
205+
if (!decompression_context_) {
206+
decompression_context_ =
207+
decltype(decompression_context_)(ZSTD_createDCtx(), ZSTDContextDeleter{});
208+
}
209+
size_t ret = ZSTD_decompressDCtx(decompression_context_.get(), output_buffer,
210+
static_cast<size_t>(output_buffer_len), input,
211+
static_cast<size_t>(input_len));
193212
if (ZSTD_isError(ret)) {
194213
return ZSTDError(ret, "ZSTD decompression failed: ");
195214
}
@@ -207,8 +226,13 @@ class ZSTDCodec : public Codec {
207226

208227
Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
209228
int64_t output_buffer_len, uint8_t* output_buffer) override {
210-
size_t ret = ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len),
211-
input, static_cast<size_t>(input_len), compression_level_);
229+
if (!compression_context_) {
230+
compression_context_ =
231+
decltype(compression_context_)(ZSTD_createCCtx(), ZSTDContextDeleter{});
232+
}
233+
size_t ret = ZSTD_compressCCtx(compression_context_.get(), output_buffer,
234+
static_cast<size_t>(output_buffer_len), input,
235+
static_cast<size_t>(input_len), compression_level_);
212236
if (ZSTD_isError(ret)) {
213237
return ZSTDError(ret, "ZSTD compression failed: ");
214238
}
@@ -236,6 +260,8 @@ class ZSTDCodec : public Codec {
236260

237261
private:
238262
const int compression_level_;
263+
std::unique_ptr<ZSTD_DCtx, ZSTDContextDeleter> decompression_context_;
264+
std::unique_ptr<ZSTD_CCtx, ZSTDContextDeleter> compression_context_;
239265
};
240266

241267
} // namespace

0 commit comments

Comments
 (0)