Skip to content

Commit f6b9454

Browse files
Merge pull request ceph#61364 from ifed01/wip-ifed-no-compression-plugin-reload
os/bluestore: do cache locally compressor engines ever used. Reviewed-by: Adam Kupczyk <[email protected]>
2 parents cca243b + 1c1acfa commit f6b9454

File tree

3 files changed

+133
-102
lines changed

3 files changed

+133
-102
lines changed

src/os/bluestore/BlueStore.cc

Lines changed: 121 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -5894,8 +5894,6 @@ void BlueStore::_set_compression()
58945894
_set_compression_alert(true, s.c_str());
58955895
}
58965896

5897-
compressor = nullptr;
5898-
58995897
if (cct->_conf->bluestore_compression_min_blob_size) {
59005898
comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size;
59015899
} else {
@@ -5917,19 +5915,36 @@ void BlueStore::_set_compression()
59175915
comp_max_blob_size = cct->_conf->bluestore_compression_max_blob_size_ssd;
59185916
}
59195917
}
5920-
5921-
auto& alg_name = cct->_conf->bluestore_compression_algorithm;
5922-
if (!alg_name.empty()) {
5923-
compressor = Compressor::create(cct, alg_name);
5924-
if (!compressor) {
5918+
if (compressors.size() == 0) {
5919+
compressors.resize(Compressor::COMP_ALG_LAST);
5920+
compressors[Compressor::COMP_ALG_NONE] = nullptr;
5921+
int alg = Compressor::COMP_ALG_NONE + 1;
5922+
while (alg < Compressor::COMP_ALG_LAST) {
5923+
compressors[alg] = Compressor::create(cct, alg);
5924+
++alg;
5925+
}
5926+
}
5927+
auto alg_name = cct->_conf->bluestore_compression_algorithm;
5928+
CompressorRef c =
5929+
!alg_name.empty() ? Compressor::create(cct, alg_name) : CompressorRef();
5930+
if (c) {
5931+
ceph_assert(c->get_type() < int(compressors.size()));
5932+
def_compressor_alg = c->get_type();
5933+
alg_name = c->get_type_name(); // let's use actual resulting alg name
5934+
} else {
5935+
if (!alg_name.empty()) {
59255936
derr << __func__ << " unable to initialize " << alg_name.c_str() << " compressor"
59265937
<< dendl;
59275938
_set_compression_alert(false, alg_name.c_str());
5939+
} else {
5940+
_clear_compression_alert();
59285941
}
5942+
def_compressor_alg = Compressor::COMP_ALG_NONE;
5943+
alg_name = "(none)";
59295944
}
5930-
5945+
59315946
dout(10) << __func__ << " mode " << Compressor::get_comp_mode_name(comp_mode)
5932-
<< " alg " << (compressor ? compressor->get_type_name() : "(none)")
5947+
<< " alg " << alg_name
59335948
<< " min_blob " << comp_min_blob_size
59345949
<< " max_blob " << comp_max_blob_size
59355950
<< dendl;
@@ -12406,6 +12421,67 @@ int BlueStore::set_collection_opts(
1240612421
return -ENOENT;
1240712422
std::unique_lock l{c->lock};
1240812423
c->pool_opts = opts;
12424+
12425+
string val;
12426+
c->compression_algorithm.reset();
12427+
if (c->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) {
12428+
auto alg = Compressor::get_comp_alg_type(val);
12429+
CompressorRef cp;
12430+
if (alg.has_value() && *alg != Compressor::COMP_ALG_NONE) {
12431+
cp = *alg < compressors.size() ? compressors[*alg] : cp;
12432+
if (!cp) {
12433+
_set_compression_alert(false, val.c_str());
12434+
derr << __func__ << " unable to load compressor plugin for " << val.c_str()
12435+
<< dendl;
12436+
} else {
12437+
ceph_assert(cp->get_type() == *alg);
12438+
c->compression_algorithm = cp->get_type();
12439+
}
12440+
}
12441+
}
12442+
c->compression_mode.reset();
12443+
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
12444+
auto cm = Compressor::get_comp_mode_type(val);
12445+
if (!cm) {
12446+
derr << __func__ << " unrecognized compression mode: " << val.c_str()
12447+
<< dendl;
12448+
} else {
12449+
c->compression_mode = cm;
12450+
}
12451+
}
12452+
int64_t ival;
12453+
c->csum_type.reset();
12454+
if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &ival)) {
12455+
if (ival >= Checksummer::CSUM_MAX) {
12456+
derr << __func__ << " unrecognized checksum type: " << ival
12457+
<< dendl;
12458+
} else {
12459+
c->csum_type = Checksummer::CSumType(ival);
12460+
}
12461+
}
12462+
c->comp_min_blob_size.reset();
12463+
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &ival)) {
12464+
if (ival <= 0) {
12465+
derr << __func__ << " invalid min compression blob size: " << ival
12466+
<< dendl;
12467+
} else {
12468+
c->comp_min_blob_size = ival;
12469+
}
12470+
}
12471+
c->comp_max_blob_size.reset();
12472+
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &ival)) {
12473+
if (ival <= 0) {
12474+
derr << __func__ << " invalid max compression blob size: " << ival
12475+
<< dendl;
12476+
} else {
12477+
c->comp_max_blob_size = ival;
12478+
}
12479+
}
12480+
double dval;
12481+
c->compression_req_ratio.reset();
12482+
if (c->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &dval)) {
12483+
c->compression_req_ratio = dval;
12484+
}
1240912485
return 0;
1241012486
}
1241112487

@@ -12916,20 +12992,25 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result)
1291612992
bluestore_compression_header_t chdr;
1291712993
decode(chdr, i);
1291812994
int alg = int(chdr.type);
12919-
CompressorRef cp = compressor;
12920-
if (!cp || (int)cp->get_type() != alg) {
12921-
cp = Compressor::create(cct, alg);
12922-
}
12923-
12995+
CompressorRef cp =
12996+
alg < int(compressors.size()) ? compressors[alg] : CompressorRef();
1292412997
if (!cp.get()) {
12925-
// if compressor isn't available - error, because cannot return
12926-
// decompressed data?
12927-
12928-
const char* alg_name = Compressor::get_comp_alg_name(alg);
12929-
derr << __func__ << " can't load decompressor " << alg_name << dendl;
12930-
_set_compression_alert(false, alg_name);
12931-
r = -EIO;
12998+
if (alg != Compressor::COMP_ALG_NONE) {
12999+
// if compressor isn't available - error, because cannot return
13000+
// decompressed data?
13001+
const char* alg_name = Compressor::get_comp_alg_name(alg);
13002+
derr << __func__ << " can't locate compressor plugin for " << alg_name
13003+
<< dendl;
13004+
_set_compression_alert(false, alg_name);
13005+
r = -EIO;
13006+
} else {
13007+
dout(0) << __func__
13008+
<< " [warn] Compressed Blob has got no alg in the header "
13009+
<< dendl;
13010+
i.copy_all(*result);
13011+
}
1293213012
} else {
13013+
ceph_assert((int)cp->get_type() == alg);
1293313014
r = cp->decompress(i, chdr.length, *result, chdr.compressor_message);
1293413015
if (r < 0) {
1293513016
derr << __func__ << " decompression failed with exit code " << r << dendl;
@@ -16874,39 +16955,12 @@ int BlueStore::_do_alloc_write(
1687416955
CompressorRef c;
1687516956
double crr = 0;
1687616957
if (wctx->compress) {
16877-
c = select_option(
16878-
"compression_algorithm",
16879-
compressor,
16880-
[&]() {
16881-
string val;
16882-
if (coll->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) {
16883-
CompressorRef cp = compressor;
16884-
if (!cp || cp->get_type_name() != val) {
16885-
cp = Compressor::create(cct, val);
16886-
if (!cp) {
16887-
if (_set_compression_alert(false, val.c_str())) {
16888-
derr << __func__ << " unable to initialize " << val.c_str()
16889-
<< " compressor" << dendl;
16890-
}
16891-
}
16892-
}
16893-
return std::optional<CompressorRef>(cp);
16894-
}
16895-
return std::optional<CompressorRef>();
16896-
}
16897-
);
16898-
16899-
crr = select_option(
16900-
"compression_required_ratio",
16901-
cct->_conf->bluestore_compression_required_ratio,
16902-
[&]() {
16903-
double val;
16904-
if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
16905-
return std::optional<double>(val);
16906-
}
16907-
return std::optional<double>();
16908-
}
16909-
);
16958+
c = coll->compression_algorithm.has_value() ?
16959+
compressors[*(coll->compression_algorithm)]:
16960+
compressors[def_compressor_alg];
16961+
crr = coll->compression_req_ratio.has_value() ?
16962+
*(coll->compression_req_ratio) :
16963+
cct->_conf->bluestore_compression_required_ratio;
1691016964
}
1691116965

1691216966
// checksum
@@ -17315,34 +17369,16 @@ void BlueStore::_choose_write_options(
1731517369
wctx->csum_order = block_size_order;
1731617370

1731717371
// checksum
17318-
int64_t csum = csum_type.load();
17319-
csum = select_option(
17320-
"csum_type",
17321-
csum,
17322-
[&]() {
17323-
int64_t val;
17324-
if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
17325-
return std::optional<int64_t>(val);
17326-
}
17327-
return std::optional<int64_t>();
17328-
}
17329-
);
17330-
wctx->csum_type = csum;
17372+
wctx->csum_type= c->csum_type.has_value() ?
17373+
*(c->csum_type):
17374+
csum_type.load();
1733117375

1733217376
// compression parameters
1733317377
unsigned alloc_hints = o->onode.alloc_hint_flags;
17334-
auto cm = select_option(
17335-
"compression_mode",
17336-
comp_mode.load(),
17337-
[&]() {
17338-
string val;
17339-
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
17340-
return std::optional<Compressor::CompressionMode>(
17341-
Compressor::get_comp_mode_type(val));
17342-
}
17343-
return std::optional<Compressor::CompressionMode>();
17344-
}
17345-
);
17378+
17379+
auto cm = c->compression_mode.has_value() ?
17380+
*(c->compression_mode) :
17381+
comp_mode.load();
1734617382

1734717383
wctx->compress = (cm != Compressor::COMP_NONE) &&
1734817384
((cm == Compressor::COMP_FORCE) ||
@@ -17367,31 +17403,15 @@ void BlueStore::_choose_write_options(
1736717403
}
1736817404

1736917405
if (wctx->compress) {
17370-
wctx->target_blob_size = select_option(
17371-
"compression_max_blob_size",
17372-
comp_max_blob_size.load(),
17373-
[&]() {
17374-
int64_t val;
17375-
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
17376-
return std::optional<uint64_t>((uint64_t)val);
17377-
}
17378-
return std::optional<uint64_t>();
17379-
}
17380-
);
17406+
wctx->target_blob_size = c->comp_max_blob_size.has_value() ?
17407+
*(c->comp_max_blob_size):
17408+
comp_max_blob_size.load();
1738117409
}
1738217410
} else {
1738317411
if (wctx->compress) {
17384-
wctx->target_blob_size = select_option(
17385-
"compression_min_blob_size",
17386-
comp_min_blob_size.load(),
17387-
[&]() {
17388-
int64_t val;
17389-
if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) {
17390-
return std::optional<uint64_t>((uint64_t)val);
17391-
}
17392-
return std::optional<uint64_t>();
17393-
}
17394-
);
17412+
wctx->target_blob_size = c->comp_min_blob_size.has_value() ?
17413+
*(c->comp_min_blob_size):
17414+
comp_min_blob_size.load();
1739517415
}
1739617416
}
1739717417

src/os/bluestore/BlueStore.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,13 @@ class BlueStore : public ObjectStore,
16861686

16871687
//pool options
16881688
pool_opts_t pool_opts;
1689+
std::optional<int> compression_algorithm;
1690+
std::optional<int> compression_mode;
1691+
std::optional<int> csum_type;
1692+
std::optional<int64_t> comp_min_blob_size;
1693+
std::optional<int64_t> comp_max_blob_size;
1694+
std::optional<double> compression_req_ratio;
1695+
16891696
ContextQueue *commit_queue;
16901697

16911698
OnodeCacheShard* get_onode_cache() const {
@@ -2498,7 +2505,8 @@ class BlueStore : public ObjectStore,
24982505

24992506
std::atomic<Compressor::CompressionMode> comp_mode =
25002507
{Compressor::COMP_NONE}; ///< compression mode
2501-
CompressorRef compressor;
2508+
std::atomic<int> def_compressor_alg = {Compressor::COMP_ALG_NONE};
2509+
std::vector<CompressorRef> compressors;
25022510
std::atomic<uint64_t> comp_min_blob_size = {0};
25032511
std::atomic<uint64_t> comp_max_blob_size = {0};
25042512

src/test/objectstore/store_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,12 +1344,15 @@ void StoreTest::doCompressionTest()
13441344
EXPECT_EQ(store->mount(), 0);
13451345
ch = store->open_collection(cid);
13461346
{
1347+
C_SaferCond c;
13471348
ObjectStore::Transaction t;
13481349
t.remove(cid, hoid);
13491350
t.remove_collection(cid);
13501351
cerr << "Cleaning" << std::endl;
1352+
t.register_on_commit(&c);
13511353
r = queue_transaction(store, ch, std::move(t));
13521354
ASSERT_EQ(r, 0);
1355+
c.wait();
13531356
}
13541357
}
13551358

0 commit comments

Comments
 (0)