@@ -28,8 +28,25 @@ class ZstdCompressor : public Compressor {
2828 ZstdCompressor (CephContext *cct) : Compressor(COMP_ALG_ZSTD, " zstd" ), cct(cct) {}
2929
3030 int compress (const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t > &compressor_message) override {
31- ZSTD_CStream *s = ZSTD_createCStream ();
32- ZSTD_initCStream_srcSize (s, cct->_conf ->compressor_zstd_level , src.length ());
31+ ZSTD_CCtx *s = ZSTD_createCCtx ();
32+ if (!s) {
33+ return -ENOMEM;
34+ }
35+ size_t res = ZSTD_CCtx_reset (s, ZSTD_reset_session_and_parameters);
36+ if (ZSTD_isError (res)) {
37+ ZSTD_freeCCtx (s);
38+ return -EINVAL;
39+ }
40+ res = ZSTD_CCtx_setParameter (s, ZSTD_c_compressionLevel, cct->_conf ->compressor_zstd_level );
41+ if (ZSTD_isError (res)) {
42+ ZSTD_freeCCtx (s);
43+ return -EINVAL;
44+ }
45+ res = ZSTD_CCtx_setPledgedSrcSize (s, src.length ());
46+ if (ZSTD_isError (res)) {
47+ ZSTD_freeCCtx (s);
48+ return -EINVAL;
49+ }
3350 auto p = src.begin ();
3451 size_t left = src.length ();
3552
@@ -49,12 +66,13 @@ class ZstdCompressor : public Compressor {
4966 ZSTD_EndDirective const zed = (left==0 ) ? ZSTD_e_end : ZSTD_e_continue;
5067 size_t r = ZSTD_compressStream2 (s, &outbuf, &inbuf, zed);
5168 if (ZSTD_isError (r)) {
52- return -EINVAL;
69+ ZSTD_freeCCtx (s);
70+ return -EINVAL;
5371 }
5472 }
5573 ceph_assert (p.get_remaining () == 0 );
5674
57- ZSTD_freeCStream (s);
75+ ZSTD_freeCCtx (s);
5876
5977 // prefix with decompressed length
6078 ceph::encode ((uint32_t )src.length (), dst);
0 commit comments