Skip to content

Commit f6c11ec

Browse files
authored
Merge pull request ceph#58046 from hualongfeng/wip_qat_zlib_data_format
Compressor: Add data format(QZ_DEFLATE_GZIP_EXT) for QAT Zlib Reviewed-by: Casey Bodley <[email protected]> Reviewed-by: Mark Kogan <[email protected]>
2 parents b32fadf + 8a20dff commit f6c11ec

File tree

9 files changed

+82
-33
lines changed

9 files changed

+82
-33
lines changed

src/common/options/global.yaml.in

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,11 @@ options:
790790
level: advanced
791791
desc: Set the maximum number of session within Qatzip when using QAT compressor
792792
default: 256
793+
- name: qat_compressor_busy_polling
794+
type: bool
795+
level: advanced
796+
desc: Set QAT busy bolling to reduce latency at the cost of potentially increasing CPU usage
797+
default: false
793798
- name: plugin_crypto_accelerator
794799
type: str
795800
level: advanced

src/compressor/QatAccel.cc

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "common/dout.h"
2020
#include "common/errno.h"
2121
#include "QatAccel.h"
22+
#include "zlib.h"
2223

2324
// -----------------------------------------------------------------------------
2425
#define dout_context g_ceph_context
@@ -33,6 +34,7 @@ static std::ostream& _prefix(std::ostream* _dout)
3334
// -----------------------------------------------------------------------------
3435
// default window size for Zlib 1.2.8, negated for raw deflate
3536
#define ZLIB_DEFAULT_WIN_SIZE -15
37+
#define GZIP_WRAPPER 16
3638

3739
/* Estimate data expansion after decompression */
3840
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
@@ -42,6 +44,10 @@ void QzSessionDeleter::operator() (struct QzSession_S *session) {
4244
delete session;
4345
}
4446

47+
QzPollingMode_T busy_polling(bool isSet) {
48+
return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING;
49+
}
50+
4551
static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) {
4652
int rc;
4753
rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
@@ -52,10 +58,12 @@ static bool setup_session(const std::string &alg, QatAccel::session_ptr &session
5258
rc = qzGetDefaultsDeflate(&params);
5359
if (rc != QZ_OK)
5460
return false;
55-
params.data_fmt = QZ_DEFLATE_RAW;
61+
62+
params.data_fmt = QZ_DEFLATE_GZIP_EXT;
5663
params.common_params.comp_algorithm = QZ_DEFLATE;
5764
params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
5865
params.common_params.direction = QZ_DIR_BOTH;
66+
params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val<bool>("qat_compressor_busy_polling"));
5967
rc = qzSetupSessionDeflate(session.get(), &params);
6068
if (rc != QZ_OK)
6169
return false;
@@ -136,6 +144,8 @@ bool QatAccel::init(const std::string &alg) {
136144
}
137145

138146
alg_name = alg;
147+
windowBits = GZIP_WRAPPER + MAX_WBITS;
148+
139149
return true;
140150
}
141151

@@ -145,7 +155,8 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int3
145155
return -1; // session initialization failed
146156
}
147157
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
148-
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
158+
compressor_message = windowBits;
159+
149160
int begin = 1;
150161
for (auto &i : in.buffers()) {
151162
const unsigned char* c_in = (unsigned char*) i.c_str();
@@ -188,28 +199,31 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
188199

189200
int rc = 0;
190201
bufferlist tmp;
191-
size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
192-
193-
while (remaining) {
194-
unsigned int ratio_idx = 0;
195-
const char* c_in = nullptr;
196-
unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
197-
remaining -= len;
198-
len -= begin;
199-
c_in += begin;
200-
begin = 0;
202+
unsigned int ratio_idx = 0;
203+
const char* c_in = nullptr;
204+
p.copy_all(tmp);
205+
c_in = tmp.c_str();
206+
unsigned int len = std::min<unsigned int>(tmp.length(), compressed_len);
207+
208+
len -= begin;
209+
c_in += begin;
210+
begin = 0;
211+
212+
bufferptr ptr;
213+
do {
201214
unsigned int out_len = QZ_HW_BUFF_SZ;
202-
203-
bufferptr ptr;
215+
unsigned int len_current = len;
204216
do {
205-
while (out_len <= len * expansion_ratio[ratio_idx]) {
217+
while (out_len <= len_current * expansion_ratio[ratio_idx]) {
206218
out_len *= 2;
207219
}
208220

209221
ptr = buffer::create_small_page_aligned(out_len);
210-
rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
222+
rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len);
211223
ratio_idx++;
212224
} while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
225+
c_in += len_current;
226+
len -= len_current;
213227

214228
if (rc == QZ_OK) {
215229
dst.append(ptr, 0, out_len);
@@ -223,7 +237,7 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
223237
dout(1) << "QAT compressor NOT OK" << dendl;
224238
return -1;
225239
}
226-
}
227240

241+
} while (len != 0);
228242
return 0;
229243
}

src/compressor/QatAccel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class QatAccel {
4949
std::vector<session_ptr> sessions;
5050
std::mutex mutex;
5151
std::string alg_name;
52+
int windowBits;
5253
};
5354

5455
#endif

src/compressor/zlib/ZlibCompressor.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ _prefix(std::ostream* _dout)
4949

5050
// default window size for Zlib 1.2.8, negated for raw deflate
5151
#define ZLIB_DEFAULT_WIN_SIZE -15
52+
#define GZIP_WRAPPER 16
5253

5354
// desired memory usage level. increasing to 9 doesn't speed things up
5455
// significantly (helps only on >=16K blocks) and sometimes degrades
@@ -205,8 +206,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona
205206
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
206207
{
207208
#ifdef HAVE_QATZIP
208-
// QAT can only decompress with the default window size
209-
if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
209+
// QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT'
210+
if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS)
210211
return qat_accel.decompress(p, compressed_size, out, compressor_message);
211212
#endif
212213

@@ -215,6 +216,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
215216
z_stream strm;
216217
const char* c_in;
217218
int begin = 1;
219+
bool multisteam = false;
218220

219221
/* allocate inflate state */
220222
strm.zalloc = Z_NULL;
@@ -226,6 +228,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
226228
// choose the variation of compressor
227229
if (!compressor_message)
228230
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
231+
229232
ret = inflateInit2(&strm, *compressor_message);
230233
if (ret != Z_OK) {
231234
dout(1) << "Decompression init error: init return "
@@ -255,7 +258,10 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
255258
}
256259
have = MAX_LEN - strm.avail_out;
257260
out.append(ptr, 0, have);
258-
} while (strm.avail_out == 0);
261+
// There may be mutil stream to decompress
262+
multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END);
263+
if (multisteam) inflateReset(&strm);
264+
} while (strm.avail_out == 0 || multisteam);
259265
}
260266

261267
/* clean up and return */

src/rgw/driver/daos/rgw_sal_daos.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,11 +1761,14 @@ int DaosMultipartUpload::complete(
17611761
bool part_compressed = (obj_part.cs_info.compression_type != "none");
17621762
if ((handled_parts > 0) &&
17631763
((part_compressed != compressed) ||
1764-
(cs_info.compression_type != obj_part.cs_info.compression_type))) {
1764+
(cs_info.compression_type != obj_part.cs_info.compression_type) ||
1765+
(cs_info.compressor_message.has_value() &&
1766+
(cs_info.compressor_message != obj_part.cs_info.compressor_message)))) {
17651767
ldpp_dout(dpp, 0)
1766-
<< "ERROR: compression type was changed during multipart upload ("
1767-
<< cs_info.compression_type << ">>"
1768-
<< obj_part.cs_info.compression_type << ")" << dendl;
1768+
<< "ERROR: compression type or compressor message was changed during multipart upload ("
1769+
<< cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << "),"
1770+
<< cs_info.compressor_message << ">>" << obj_part.cs_info.compressor_message << ") "
1771+
<< dendl;
17691772
ret = -ERR_INVALID_PART;
17701773
return ret;
17711774
}
@@ -1786,8 +1789,11 @@ int DaosMultipartUpload::complete(
17861789
cs_info.blocks.push_back(cb);
17871790
new_ofs = cb.new_ofs + cb.len;
17881791
}
1789-
if (!compressed)
1792+
if (!compressed) {
17901793
cs_info.compression_type = obj_part.cs_info.compression_type;
1794+
if (obj_part.cs_info.compressor_message.has_value())
1795+
cs_info.compressor_message = obj_part.cs_info.compressor_message;
1796+
}
17911797
cs_info.orig_size += obj_part.cs_info.orig_size;
17921798
compressed = true;
17931799
}

src/rgw/driver/motr/rgw_sal_motr.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2762,9 +2762,13 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
27622762
bool part_compressed = (part->cs_info.compression_type != "none");
27632763
if ((handled_parts > 0) &&
27642764
((part_compressed != compressed) ||
2765-
(cs_info.compression_type != part->cs_info.compression_type))) {
2766-
ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
2767-
<< cs_info.compression_type << ">>" << part->cs_info.compression_type << ")" << dendl;
2765+
(cs_info.compression_type != obj_part.cs_info.compression_type) ||
2766+
(cs_info.compressor_message.has_value() &&
2767+
(cs_info.compressor_message != obj_part.cs_info.compressor_message)))) {
2768+
ldpp_dout(dpp, 0) << "ERROR: compression type or compressor message was changed during multipart upload ("
2769+
<< cs_info.compression_type << ">>" << part->cs_info.compression_type << "),"
2770+
<< cs_info.compressor_message << ">>" << obj_part.cs_info.compressor_message << ")"
2771+
<< dendl;
27682772
rc = -ERR_INVALID_PART;
27692773
return rc;
27702774
}
@@ -2784,8 +2788,11 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
27842788
cs_info.blocks.push_back(cb);
27852789
new_ofs = cb.new_ofs + cb.len;
27862790
}
2787-
if (!compressed)
2791+
if (!compressed) {
27882792
cs_info.compression_type = part->cs_info.compression_type;
2793+
if (obj_part.cs_info.compressor_message.has_value())
2794+
cs_info.compressor_message = obj_part.cs_info.compressor_message;
2795+
}
27892796
cs_info.orig_size += part->cs_info.orig_size;
27902797
compressed = true;
27912798
}

src/rgw/driver/rados/rgw_sal_rados.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3357,9 +3357,13 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
33573357
bool part_compressed = (obj_part.cs_info.compression_type != "none");
33583358
if ((handled_parts > 0) &&
33593359
((part_compressed != compressed) ||
3360-
(cs_info.compression_type != obj_part.cs_info.compression_type))) {
3361-
ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
3362-
<< cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
3360+
(cs_info.compression_type != obj_part.cs_info.compression_type) ||
3361+
(cs_info.compressor_message.has_value() &&
3362+
(cs_info.compressor_message != obj_part.cs_info.compressor_message)))) {
3363+
ldpp_dout(dpp, 0) << "ERROR: compression type or compressor message was changed during multipart upload ("
3364+
<< cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << "), "
3365+
<< cs_info.compressor_message << ">>" << obj_part.cs_info.compressor_message << ") "
3366+
<< dendl;
33633367
ret = -ERR_INVALID_PART;
33643368
return ret;
33653369
}
@@ -3378,8 +3382,11 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
33783382
cs_info.blocks.push_back(cb);
33793383
new_ofs = cb.new_ofs + cb.len;
33803384
}
3381-
if (!compressed)
3385+
if (!compressed) {
33823386
cs_info.compression_type = obj_part.cs_info.compression_type;
3387+
if (obj_part.cs_info.compressor_message.has_value())
3388+
cs_info.compressor_message = obj_part.cs_info.compressor_message;
3389+
}
33833390
cs_info.orig_size += obj_part.cs_info.orig_size;
33843391
compressed = true;
33853392
}

src/rgw/rgw_file.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,12 +1967,14 @@ namespace rgw {
19671967
RGWCompressionInfo cs_info;
19681968
cs_info.compression_type = plugin->get_type_name();
19691969
cs_info.orig_size = state->obj_size;
1970+
cs_info.compressor_message = compressor->get_compressor_message();
19701971
cs_info.blocks = std::move(compressor->get_compression_blocks());
19711972
encode(cs_info, tmp);
19721973
attrs[RGW_ATTR_COMPRESSION] = tmp;
19731974
ldpp_dout(this, 20) << "storing " << RGW_ATTR_COMPRESSION
19741975
<< " with type=" << cs_info.compression_type
19751976
<< ", orig_size=" << cs_info.orig_size
1977+
<< ", compressor_message=" << cs_info.compressor_message
19761978
<< ", blocks=" << cs_info.blocks.size() << dendl;
19771979
}
19781980

src/rgw/rgw_op.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4469,6 +4469,7 @@ void RGWPutObj::execute(optional_yield y)
44694469
ldpp_dout(this, 20) << "storing " << RGW_ATTR_COMPRESSION
44704470
<< " with type=" << cs_info.compression_type
44714471
<< ", orig_size=" << cs_info.orig_size
4472+
<< ", compressor_message=" << cs_info.compressor_message
44724473
<< ", blocks=" << cs_info.blocks.size() << dendl;
44734474
}
44744475
if (torrent) {

0 commit comments

Comments
 (0)