Skip to content

Commit 855c5d6

Browse files
hualong fenghualongfeng
authored andcommitted
compressor: Change data formt to QZ_DEFLATE_GZIP_EXT for QAT zlib
QAT zlib 'QZ_DEFLATE_RAW' data format cannot decompress by QAT hardware. So here we replace 'QZ_DEFLATE_GZIP_EXT' data format with 'QZ_DEFLATE_RAW'. 'QZ_DEFLATE_GZIP_EXT' data format need to add gz_header by deflateSetHeader() in QATzip. And it leads multi stream in one compression for hardware buffer. So the windows bit is important information for decompression, which related to if the inflate remove header. Add busy_polling setting for reducing latency Signed-off-by: Feng,Hualong <[email protected]>
1 parent 1272408 commit 855c5d6

File tree

4 files changed

+46
-20
lines changed

4 files changed

+46
-20
lines changed

src/common/options/global.yaml.in

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,11 @@ options:
790790
level: advanced
791791
desc: Set the maximum number of session within Qatzip when using QAT compressor
792792
default: 256
793+
- name: qat_compressor_busy_polling
794+
type: bool
795+
level: advanced
796+
desc: Set QAT busy bolling to reduce latency at the cost of potentially increasing CPU usage
797+
default: false
793798
- name: plugin_crypto_accelerator
794799
type: str
795800
level: advanced

src/compressor/QatAccel.cc

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "common/dout.h"
2020
#include "common/errno.h"
2121
#include "QatAccel.h"
22+
#include "zlib.h"
2223

2324
// -----------------------------------------------------------------------------
2425
#define dout_context g_ceph_context
@@ -33,6 +34,7 @@ static std::ostream& _prefix(std::ostream* _dout)
3334
// -----------------------------------------------------------------------------
3435
// default window size for Zlib 1.2.8, negated for raw deflate
3536
#define ZLIB_DEFAULT_WIN_SIZE -15
37+
#define GZIP_WRAPPER 16
3638

3739
/* Estimate data expansion after decompression */
3840
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
@@ -42,6 +44,10 @@ void QzSessionDeleter::operator() (struct QzSession_S *session) {
4244
delete session;
4345
}
4446

47+
QzPollingMode_T busy_polling(bool isSet) {
48+
return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING;
49+
}
50+
4551
static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) {
4652
int rc;
4753
rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
@@ -52,10 +58,12 @@ static bool setup_session(const std::string &alg, QatAccel::session_ptr &session
5258
rc = qzGetDefaultsDeflate(&params);
5359
if (rc != QZ_OK)
5460
return false;
55-
params.data_fmt = QZ_DEFLATE_RAW;
61+
62+
params.data_fmt = QZ_DEFLATE_GZIP_EXT;
5663
params.common_params.comp_algorithm = QZ_DEFLATE;
5764
params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
5865
params.common_params.direction = QZ_DIR_BOTH;
66+
params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val<bool>("qat_compressor_busy_polling"));
5967
rc = qzSetupSessionDeflate(session.get(), &params);
6068
if (rc != QZ_OK)
6169
return false;
@@ -136,6 +144,8 @@ bool QatAccel::init(const std::string &alg) {
136144
}
137145

138146
alg_name = alg;
147+
windowBits = GZIP_WRAPPER + MAX_WBITS;
148+
139149
return true;
140150
}
141151

@@ -145,7 +155,8 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int3
145155
return -1; // session initialization failed
146156
}
147157
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
148-
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
158+
compressor_message = windowBits;
159+
149160
int begin = 1;
150161
for (auto &i : in.buffers()) {
151162
const unsigned char* c_in = (unsigned char*) i.c_str();
@@ -188,28 +199,31 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
188199

189200
int rc = 0;
190201
bufferlist tmp;
191-
size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
192-
193-
while (remaining) {
194-
unsigned int ratio_idx = 0;
195-
const char* c_in = nullptr;
196-
unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
197-
remaining -= len;
198-
len -= begin;
199-
c_in += begin;
200-
begin = 0;
202+
unsigned int ratio_idx = 0;
203+
const char* c_in = nullptr;
204+
p.copy_all(tmp);
205+
c_in = tmp.c_str();
206+
unsigned int len = std::min<unsigned int>(tmp.length(), compressed_len);
207+
208+
len -= begin;
209+
c_in += begin;
210+
begin = 0;
211+
212+
bufferptr ptr;
213+
do {
201214
unsigned int out_len = QZ_HW_BUFF_SZ;
202-
203-
bufferptr ptr;
215+
unsigned int len_current = len;
204216
do {
205-
while (out_len <= len * expansion_ratio[ratio_idx]) {
217+
while (out_len <= len_current * expansion_ratio[ratio_idx]) {
206218
out_len *= 2;
207219
}
208220

209221
ptr = buffer::create_small_page_aligned(out_len);
210-
rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
222+
rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len);
211223
ratio_idx++;
212224
} while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
225+
c_in += len_current;
226+
len -= len_current;
213227

214228
if (rc == QZ_OK) {
215229
dst.append(ptr, 0, out_len);
@@ -223,7 +237,7 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
223237
dout(1) << "QAT compressor NOT OK" << dendl;
224238
return -1;
225239
}
226-
}
227240

241+
} while (len != 0);
228242
return 0;
229243
}

src/compressor/QatAccel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class QatAccel {
4949
std::vector<session_ptr> sessions;
5050
std::mutex mutex;
5151
std::string alg_name;
52+
int windowBits;
5253
};
5354

5455
#endif

src/compressor/zlib/ZlibCompressor.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ _prefix(std::ostream* _dout)
4949

5050
// default window size for Zlib 1.2.8, negated for raw deflate
5151
#define ZLIB_DEFAULT_WIN_SIZE -15
52+
#define GZIP_WRAPPER 16
5253

5354
// desired memory usage level. increasing to 9 doesn't speed things up
5455
// significantly (helps only on >=16K blocks) and sometimes degrades
@@ -205,8 +206,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona
205206
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
206207
{
207208
#ifdef HAVE_QATZIP
208-
// QAT can only decompress with the default window size
209-
if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
209+
// QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT'
210+
if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS)
210211
return qat_accel.decompress(p, compressed_size, out, compressor_message);
211212
#endif
212213

@@ -215,6 +216,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
215216
z_stream strm;
216217
const char* c_in;
217218
int begin = 1;
219+
bool multisteam = false;
218220

219221
/* allocate inflate state */
220222
strm.zalloc = Z_NULL;
@@ -226,6 +228,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
226228
// choose the variation of compressor
227229
if (!compressor_message)
228230
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
231+
229232
ret = inflateInit2(&strm, *compressor_message);
230233
if (ret != Z_OK) {
231234
dout(1) << "Decompression init error: init return "
@@ -255,7 +258,10 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
255258
}
256259
have = MAX_LEN - strm.avail_out;
257260
out.append(ptr, 0, have);
258-
} while (strm.avail_out == 0);
261+
// There may be mutil stream to decompress
262+
multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END);
263+
if (multisteam) inflateReset(&strm);
264+
} while (strm.avail_out == 0 || multisteam);
259265
}
260266

261267
/* clean up and return */

0 commit comments

Comments
 (0)