Skip to content

Commit 5d25607

Browse files
committed
compressor: move QatAccel out of common
move the QatAccel instance out of the Compressor base class and into the zlib and lz4 compressors that can use it this avoids having to link QAT into the ceph-common library, and only the plugins where it's necessary had to add LZ4Compressor.cc to store the new static variable Signed-off-by: Casey Bodley <[email protected]>
1 parent ec2e4b6 commit 5d25607

File tree

11 files changed

+200
-162
lines changed

11 files changed

+200
-162
lines changed

src/CMakeLists.txt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -505,11 +505,6 @@ if(NOT WITH_SYSTEM_BOOST)
505505
list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
506506
endif()
507507

508-
if(HAVE_QATZIP)
509-
# TODO: only the compression plugins should depend on QAT
510-
list(APPEND ceph_common_deps QAT::zip)
511-
endif()
512-
513508
if(WITH_DPDK)
514509
list(APPEND ceph_common_deps common_async_dpdk)
515510
endif()

src/compressor/CMakeLists.txt

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
1-
2-
set(compressor_srcs
3-
Compressor.cc)
4-
if (HAVE_QATZIP)
5-
list(APPEND compressor_srcs QatAccel.cc)
6-
endif()
7-
add_library(compressor_objs OBJECT ${compressor_srcs})
1+
add_library(compressor_objs OBJECT Compressor.cc)
82
add_dependencies(compressor_objs common-objs)
3+
add_dependencies(compressor_objs legacy-option-headers)
4+
95
if(HAVE_QATZIP AND HAVE_QAT)
10-
target_link_libraries(compressor_objs PRIVATE
6+
add_library(qat_compressor OBJECT QatAccel.cc)
7+
target_link_libraries(qat_compressor PUBLIC
118
QAT::qat
129
QAT::usdm
1310
QAT::zip
1411
)
1512
endif()
16-
add_dependencies(compressor_objs legacy-option-headers)
1713

1814
## compressor plugins
1915

@@ -31,8 +27,8 @@ if(HAVE_BROTLI)
3127
add_subdirectory(brotli)
3228
endif()
3329

34-
add_library(compressor STATIC $<TARGET_OBJECTS:compressor_objs>)
35-
target_link_libraries(compressor PRIVATE compressor_objs)
30+
add_library(compressor STATIC)
31+
target_link_libraries(compressor PUBLIC compressor_objs)
3632

3733
set(ceph_compressor_libs
3834
ceph_snappy

src/compressor/Compressor.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626

2727
namespace TOPNSPC {
2828

29-
#ifdef HAVE_QATZIP
30-
QatAccel Compressor::qat_accel;
31-
#endif
32-
3329
const char* Compressor::get_comp_alg_name(int a) {
3430

3531
auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),

src/compressor/Compressor.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
#include "include/common_fwd.h"
2424
#include "include/buffer.h"
2525
#include "include/int_types.h"
26-
#ifdef HAVE_QATZIP
27-
#include "QatAccel.h"
28-
#endif
2926

3027
namespace TOPNSPC {
3128

@@ -70,11 +67,6 @@ class Compressor {
7067
COMP_FORCE ///< compress always
7168
};
7269

73-
#ifdef HAVE_QATZIP
74-
bool qat_enabled;
75-
static QatAccel qat_accel;
76-
#endif
77-
7870
static const char* get_comp_alg_name(int a);
7971
static std::optional<CompressionAlgorithm> get_comp_alg_type(std::string_view s);
8072

src/compressor/lz4/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
set(lz4_sources
44
CompressionPluginLZ4.cc
5+
LZ4Compressor.cc
56
)
67

78
add_library(ceph_lz4 SHARED ${lz4_sources})
89
target_link_libraries(ceph_lz4
910
PRIVATE LZ4::LZ4 compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
11+
if(HAVE_QATZIP AND HAVE_QAT)
12+
target_link_libraries(ceph_lz4 PRIVATE qat_compressor)
13+
endif()
1014
set_target_properties(ceph_lz4 PROPERTIES
1115
VERSION 2.0.0
1216
SOVERSION 2
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* Copyright contributors to the Ceph project
7+
*
8+
* This is free software; you can redistribute it and/or
9+
* modify it under the terms of the GNU Lesser General Public
10+
* License version 2.1, as published by the Free Software
11+
* Foundation. See file COPYING.
12+
*
13+
*/
14+
15+
#include "LZ4Compressor.h"
16+
#include "common/ceph_context.h"
17+
#ifdef HAVE_QATZIP
18+
#include "compressor/QatAccel.h"
19+
#endif
20+
21+
#ifdef HAVE_QATZIP
22+
QatAccel LZ4Compressor::qat_accel;
23+
#endif
24+
25+
LZ4Compressor::LZ4Compressor(CephContext* cct)
26+
: Compressor(COMP_ALG_LZ4, "lz4")
27+
{
28+
#ifdef HAVE_QATZIP
29+
if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
30+
qat_enabled = true;
31+
else
32+
qat_enabled = false;
33+
#endif
34+
}
35+
36+
int LZ4Compressor::compress(const ceph::buffer::list &src,
37+
ceph::buffer::list &dst,
38+
std::optional<int32_t> &compressor_message)
39+
{
40+
// older versions of liblz4 introduce bit errors when compressing
41+
// fragmented buffers. this was fixed in lz4 commit
42+
// af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
43+
// appeared in v1.8.2.
44+
//
45+
// workaround: rebuild if not contiguous.
46+
if (!src.is_contiguous()) {
47+
ceph::buffer::list new_src = src;
48+
new_src.rebuild();
49+
return compress(new_src, dst, compressor_message);
50+
}
51+
52+
#ifdef HAVE_QATZIP
53+
if (qat_enabled)
54+
return qat_accel.compress(src, dst, compressor_message);
55+
#endif
56+
ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
57+
LZ4_compressBound(src.length()));
58+
LZ4_stream_t lz4_stream;
59+
LZ4_resetStream(&lz4_stream);
60+
61+
using ceph::encode;
62+
63+
auto p = src.begin();
64+
size_t left = src.length();
65+
int pos = 0;
66+
const char *data;
67+
unsigned num = src.get_num_buffers();
68+
encode((uint32_t)num, dst);
69+
while (left) {
70+
uint32_t origin_len = p.get_ptr_and_advance(left, &data);
71+
int compressed_len = LZ4_compress_fast_continue(
72+
&lz4_stream, data, outptr.c_str()+pos, origin_len,
73+
outptr.length()-pos, 1);
74+
if (compressed_len <= 0)
75+
return -1;
76+
pos += compressed_len;
77+
left -= origin_len;
78+
encode(origin_len, dst);
79+
encode((uint32_t)compressed_len, dst);
80+
}
81+
ceph_assert(p.end());
82+
83+
dst.append(outptr, 0, pos);
84+
return 0;
85+
}
86+
87+
int LZ4Compressor::decompress(const ceph::buffer::list &src,
88+
ceph::buffer::list &dst,
89+
std::optional<int32_t> compressor_message)
90+
{
91+
#ifdef HAVE_QATZIP
92+
if (qat_enabled)
93+
return qat_accel.decompress(src, dst, compressor_message);
94+
#endif
95+
auto i = std::cbegin(src);
96+
return decompress(i, src.length(), dst, compressor_message);
97+
}
98+
99+
int LZ4Compressor::decompress(ceph::buffer::list::const_iterator &p,
100+
size_t compressed_len,
101+
ceph::buffer::list &dst,
102+
std::optional<int32_t> compressor_message)
103+
{
104+
#ifdef HAVE_QATZIP
105+
if (qat_enabled)
106+
return qat_accel.decompress(p, compressed_len, dst, compressor_message);
107+
#endif
108+
using ceph::decode;
109+
uint32_t count;
110+
decode(count, p);
111+
std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count);
112+
uint32_t total_origin = 0;
113+
for (auto& [dst_size, src_size] : compressed_pairs) {
114+
decode(dst_size, p);
115+
decode(src_size, p);
116+
total_origin += dst_size;
117+
}
118+
compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
119+
120+
ceph::buffer::ptr dstptr(total_origin);
121+
LZ4_streamDecode_t lz4_stream_decode;
122+
LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
123+
124+
ceph::buffer::ptr cur_ptr = p.get_current_ptr();
125+
ceph::buffer::ptr *ptr = &cur_ptr;
126+
std::optional<ceph::buffer::ptr> data_holder;
127+
if (compressed_len != cur_ptr.length()) {
128+
data_holder.emplace(compressed_len);
129+
p.copy_deep(compressed_len, *data_holder);
130+
ptr = &*data_holder;
131+
}
132+
133+
char *c_in = ptr->c_str();
134+
char *c_out = dstptr.c_str();
135+
for (unsigned i = 0; i < count; ++i) {
136+
int r = LZ4_decompress_safe_continue(
137+
&lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
138+
if (r == (int)compressed_pairs[i].first) {
139+
c_in += compressed_pairs[i].second;
140+
c_out += compressed_pairs[i].first;
141+
} else if (r < 0) {
142+
return -1;
143+
} else {
144+
return -2;
145+
}
146+
}
147+
dst.push_back(std::move(dstptr));
148+
return 0;
149+
}

src/compressor/lz4/LZ4Compressor.h

Lines changed: 12 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -23,125 +23,29 @@
2323
#include "include/encoding.h"
2424
#include "common/config.h"
2525

26+
class QatAccel;
2627

2728
class LZ4Compressor : public Compressor {
28-
public:
29-
LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
3029
#ifdef HAVE_QATZIP
31-
if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
32-
qat_enabled = true;
33-
else
34-
qat_enabled = false;
30+
bool qat_enabled;
31+
static QatAccel qat_accel;
3532
#endif
36-
}
37-
38-
int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override {
39-
// older versions of liblz4 introduce bit errors when compressing
40-
// fragmented buffers. this was fixed in lz4 commit
41-
// af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
42-
// appeared in v1.8.2.
43-
//
44-
// workaround: rebuild if not contiguous.
45-
if (!src.is_contiguous()) {
46-
ceph::buffer::list new_src = src;
47-
new_src.rebuild();
48-
return compress(new_src, dst, compressor_message);
49-
}
5033

51-
#ifdef HAVE_QATZIP
52-
if (qat_enabled)
53-
return qat_accel.compress(src, dst, compressor_message);
54-
#endif
55-
ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
56-
LZ4_compressBound(src.length()));
57-
LZ4_stream_t lz4_stream;
58-
LZ4_resetStream(&lz4_stream);
59-
60-
using ceph::encode;
61-
62-
auto p = src.begin();
63-
size_t left = src.length();
64-
int pos = 0;
65-
const char *data;
66-
unsigned num = src.get_num_buffers();
67-
encode((uint32_t)num, dst);
68-
while (left) {
69-
uint32_t origin_len = p.get_ptr_and_advance(left, &data);
70-
int compressed_len = LZ4_compress_fast_continue(
71-
&lz4_stream, data, outptr.c_str()+pos, origin_len,
72-
outptr.length()-pos, 1);
73-
if (compressed_len <= 0)
74-
return -1;
75-
pos += compressed_len;
76-
left -= origin_len;
77-
encode(origin_len, dst);
78-
encode((uint32_t)compressed_len, dst);
79-
}
80-
ceph_assert(p.end());
34+
public:
35+
explicit LZ4Compressor(CephContext* cct);
8136

82-
dst.append(outptr, 0, pos);
83-
return 0;
84-
}
37+
int compress(const ceph::buffer::list &src,
38+
ceph::buffer::list &dst,
39+
std::optional<int32_t> &compressor_message) override;
8540

86-
int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override {
87-
#ifdef HAVE_QATZIP
88-
if (qat_enabled)
89-
return qat_accel.decompress(src, dst, compressor_message);
90-
#endif
91-
auto i = std::cbegin(src);
92-
return decompress(i, src.length(), dst, compressor_message);
93-
}
41+
int decompress(const ceph::buffer::list &src,
42+
ceph::buffer::list &dst,
43+
std::optional<int32_t> compressor_message) override;
9444

9545
int decompress(ceph::buffer::list::const_iterator &p,
9646
size_t compressed_len,
9747
ceph::buffer::list &dst,
98-
std::optional<int32_t> compressor_message) override {
99-
#ifdef HAVE_QATZIP
100-
if (qat_enabled)
101-
return qat_accel.decompress(p, compressed_len, dst, compressor_message);
102-
#endif
103-
using ceph::decode;
104-
uint32_t count;
105-
decode(count, p);
106-
std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count);
107-
uint32_t total_origin = 0;
108-
for (auto& [dst_size, src_size] : compressed_pairs) {
109-
decode(dst_size, p);
110-
decode(src_size, p);
111-
total_origin += dst_size;
112-
}
113-
compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
114-
115-
ceph::buffer::ptr dstptr(total_origin);
116-
LZ4_streamDecode_t lz4_stream_decode;
117-
LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
118-
119-
ceph::buffer::ptr cur_ptr = p.get_current_ptr();
120-
ceph::buffer::ptr *ptr = &cur_ptr;
121-
std::optional<ceph::buffer::ptr> data_holder;
122-
if (compressed_len != cur_ptr.length()) {
123-
data_holder.emplace(compressed_len);
124-
p.copy_deep(compressed_len, *data_holder);
125-
ptr = &*data_holder;
126-
}
127-
128-
char *c_in = ptr->c_str();
129-
char *c_out = dstptr.c_str();
130-
for (unsigned i = 0; i < count; ++i) {
131-
int r = LZ4_decompress_safe_continue(
132-
&lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
133-
if (r == (int)compressed_pairs[i].first) {
134-
c_in += compressed_pairs[i].second;
135-
c_out += compressed_pairs[i].first;
136-
} else if (r < 0) {
137-
return -1;
138-
} else {
139-
return -2;
140-
}
141-
}
142-
dst.push_back(std::move(dstptr));
143-
return 0;
144-
}
48+
std::optional<int32_t> compressor_message) override;
14549
};
14650

14751
#endif

0 commit comments

Comments
 (0)