
Commit 528813e

os/bluestore: Add do_write_v2_compressed()
Modify do_write_v2() to branch into do_write_v2_compressed(). Segmented and regular cases are recognized and handled properly. The new do_write_v2_compressed() oversees compression / recompression. Make one Estimator per Collection; this lets the estimator learn collection-specific compressibility. Make Collection create its Estimator on first use. In write_v2_compressed, use the compressor already selected in choose_write_options.

Signed-off-by: Adam Kupczyk <[email protected]>
1 parent 1ad8679 commit 528813e
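
The commit message above distinguishes two cases: for an onode with a nonzero segment_size, the incoming buffer is chopped at segment boundaries and each chunk goes to _do_write_v2_compressed() with a scan window confined to its segment; otherwise a single call is made with a fixed lookaround window. Below is a minimal standalone sketch of the splitting arithmetic only, with made-up offsets and segment size, and p2align approximated by a bitmask instead of Ceph's intarith helpers.

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  const uint32_t segment_size = 0x10000;        // 64 KiB segments (example value)
  uint32_t offset = 0x18000, length = 0x14000;  // example write
  uint32_t end = offset + length;
  uint32_t write_offset = offset;
  while (write_offset != end) {
    // p2align(write_offset, segment_size) for a power-of-two segment_size
    uint32_t seg_begin = write_offset & ~(segment_size - 1);
    uint32_t seg_end = seg_begin + segment_size;
    uint32_t write_length = std::min(seg_end, end) - write_offset;
    // each chunk would become one _do_write_v2_compressed() call that
    // only scans [seg_begin, seg_end)
    std::printf("chunk %#x~%#x scans %#x..%#x\n",
                write_offset, write_length, seg_begin, seg_end);
    write_offset += write_length;
  }
  return 0;
}

Compiled on its own, this prints two chunks, 0x18000~0x8000 and 0x20000~0xc000, each confined to one 64 KiB segment.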

File tree

4 files changed (+152 −23 lines)

4 files changed

+152
-23
lines changed

src/os/bluestore/BlueStore.cc

Lines changed: 125 additions & 19 deletions
@@ -14,6 +14,7 @@
 
 #include <bit>
 #include <utility>
+#include <memory>
 #include <unistd.h>
 #include <stdlib.h>
 #include <sys/types.h>
@@ -12886,6 +12887,22 @@ int BlueStore::_do_read(
   return r;
 }
 
+void inline BlueStore::_do_read_and_pad(
+  Collection* c,
+  OnodeRef& o,
+  uint32_t offset,
+  uint32_t length,
+  ceph::buffer::list& bl)
+{
+  int r = _do_read(c, o, offset, length, bl, 0);
+  ceph_assert(r >= 0 && r <= (int)length);
+  size_t zlen = length - r;
+  if (zlen > 0) {
+    bl.append_zero(zlen);
+    logger->inc(l_bluestore_write_pad_bytes, zlen);
+  }
+}
+
 int BlueStore::_verify_csum(OnodeRef& o,
   const bluestore_blob_t* blob, uint64_t blob_xoffset,
   const bufferlist& bl,
@@ -17559,31 +17576,120 @@ int BlueStore::_do_write_v2(
   if (length == 0) {
     return 0;
   }
-  WriteContext wctx;
-  _choose_write_options(c, o, fadvise_flags, &wctx);
-  if (wctx.compress) {
-    // if we have compression, skip to write_v1
-    return _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  }
-  if (o->onode.segment_size != 0 && wctx.target_blob_size > o->onode.segment_size) {
-    wctx.target_blob_size = o->onode.segment_size;
-  }
+
   if (bl.length() != length) {
     bl.splice(length, bl.length() - length);
   }
-  BlueStore::Writer wr(this, txc, &wctx, o);
-  uint64_t start = p2align(offset, min_alloc_size);
-  uint64_t end = p2roundup(offset + length, min_alloc_size);
-  wr.left_affected_range = start;
-  wr.right_affected_range = end;
-  std::tie(wr.left_shard_bound, wr.right_shard_bound) =
-    o->extent_map.fault_range_ex(db, start, end - start);
-  wr.do_write(offset, bl);
-  o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
-  o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+
+  WriteContext wctx;
+  _choose_write_options(c, o, fadvise_flags, &wctx);
+  if (wctx.compressor) {
+    uint32_t end = offset + length;
+    uint32_t segment_size = o->onode.segment_size;
+    if (segment_size) {
+      // split data into segments
+      // first and last segments will do lookaround scan
+      uint32_t write_offset = offset;
+      while (write_offset != end) {
+        uint32_t this_segment_begin = p2align(write_offset, segment_size);
+        uint32_t this_segment_end = this_segment_begin + segment_size;
+        uint32_t write_length = std::min(this_segment_end, end) - write_offset;
+        bufferlist chunk;
+        chunk.substr_of(bl, 0, write_length);
+        bl.splice(0, write_length);
+        _do_write_v2_compressed(txc, c, o, wctx, write_offset, write_length, chunk,
+                                this_segment_begin, this_segment_end);
+        write_offset += write_length;
+      };
+    } else {
+      const uint32_t scan_range = 0x20000; // 128kB
+      uint32_t scan_left = offset < scan_range ? 0 : offset - scan_range;
+      uint32_t scan_right = end + scan_range;
+      _do_write_v2_compressed(txc, c, o, wctx, offset, length, bl,
+                              scan_left, scan_right);
+    }
+  } else {
+    // normal uncompressed path
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    uint64_t start = p2align(offset, min_alloc_size);
+    uint64_t end = p2roundup(offset + length, min_alloc_size);
+    wr.left_affected_range = start;
+    wr.right_affected_range = end;
+    std::tie(wr.left_shard_bound, wr.right_shard_bound) =
+      o->extent_map.fault_range_ex(db, start, end - start);
+    wr.do_write(offset, bl);
+    o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
+    o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+  }
   return r;
 }
 
+int BlueStore::_do_write_v2_compressed(
+  TransContext *txc,
+  CollectionRef &c,
+  OnodeRef& o,
+  WriteContext& wctx,
+  uint32_t offset, uint32_t length,
+  ceph::buffer::list& input_bl,
+  uint32_t scan_left, uint32_t scan_right)
+{
+  o->extent_map.fault_range(db, scan_left, scan_right - scan_left);
+  if (!c->estimator) c->estimator.reset(create_estimator());
+  Estimator* estimator = c->estimator.get();
+  Scanner scanner(this);
+  scanner.write_lookaround(o.get(), offset, length, scan_left, scan_right, estimator);
+  std::vector<Estimator::region_t> regions;
+  estimator->get_regions(regions);
+  dout(15) << __func__ << " " << std::hex << offset << "~" << length << " -> ";
+  for (const auto& i : regions) {
+    *_dout << i.offset << "~" << i.length << " ";
+  }
+  *_dout << std::dec << dendl;
+  for (const auto& i : regions) {
+    ceph::buffer::list data_bl;
+    if (i.offset <= offset && offset < i.offset + i.length) {
+      // the starting point is within the region, so the end must be too
+      ceph_assert(offset + length <= i.offset + i.length);
+      if (i.offset < offset) {
+        _do_read_and_pad(c.get(), o, i.offset, offset - i.offset, data_bl);
+      }
+      data_bl.claim_append(input_bl);
+      if (offset + length < i.offset + i.length) {
+        ceph::buffer::list right_bl;
+        _do_read_and_pad(c.get(), o, offset + length,
+                         i.offset + i.length - (offset + length), right_bl);
+        data_bl.claim_append(right_bl);
+      }
+    } else {
+      // the starting point is not within the region, so the end is not allowed either
+      ceph_assert(offset + length < i.offset || offset + length >= i.offset + i.length);
+      _do_read_and_pad(c.get(), o, i.offset, i.length, data_bl);
+    }
+    ceph_assert(data_bl.length() == i.length);
+    Writer::blob_vec bd;
+    int32_t disk_for_compressed;
+    int32_t disk_for_raw;
+    uint32_t au_size = min_alloc_size;
+    uint32_t max_blob_size = c->pool_opts.value_or(
+      pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, (int64_t)comp_max_blob_size.load());
+    disk_for_compressed = estimator->split_and_compress(wctx.compressor, max_blob_size, data_bl, bd);
+    disk_for_raw = p2roundup(i.offset + i.length, au_size) - p2align(i.offset, au_size);
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    if (disk_for_compressed < disk_for_raw) {
+      wr.do_write_with_blobs(i.offset, i.offset + i.length, i.offset + i.length, bd);
+    } else {
+      wr.do_write(i.offset, data_bl);
+    }
+  }
+  estimator->finish();
+  uint32_t changes_start = regions.front().offset;
+  uint32_t changes_end = regions.back().offset + regions.back().length;
+  o->extent_map.compress_extent_map(changes_start, changes_end - changes_start);
+  o->extent_map.dirty_range(changes_start, changes_end - changes_start);
+  o->extent_map.maybe_reshard(changes_start, changes_end);
+  return 0;
+}
+
 int BlueStore::_write(TransContext *txc,
   CollectionRef& c,
   OnodeRef& o,
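
For each region reported by the estimator, the new _do_write_v2_compressed() writes compressed blobs only when the estimate beats the raw cost, where the raw cost is the region rounded out to whole allocation units. A rough numeric illustration follows; au_size, the region bounds, and the compressed estimate are invented example values, and p2align/p2roundup are approximated with plain bit arithmetic.

#include <cstdint>
#include <cstdio>

int main() {
  const uint32_t au_size = 0x1000;                    // 4 KiB allocation unit (example)
  uint32_t region_off = 0x1200, region_len = 0x3000;  // example region from the estimator
  // p2align / p2roundup for a power-of-two au_size
  uint32_t begin = region_off & ~(au_size - 1);
  uint32_t end = (region_off + region_len + au_size - 1) & ~(au_size - 1);
  uint32_t disk_for_raw = end - begin;                // 0x4000: 4 AUs
  uint32_t disk_for_compressed = 0x2000;              // pretend split_and_compress() returned 2 AUs
  if (disk_for_compressed < disk_for_raw)
    std::printf("write compressed blobs: %u < %u bytes on disk\n",
                disk_for_compressed, disk_for_raw);
  else
    std::printf("write raw data\n");
  return 0;
}

With these numbers the raw write would occupy four 4 KiB allocation units while the compressed estimate needs only two, so the compressed path wins.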

src/os/bluestore/BlueStore.h

Lines changed: 18 additions & 1 deletion
@@ -273,6 +273,8 @@ class BlueStore : public ObjectStore,
   struct Onode;
   class Scanner;
   class Estimator;
+  Estimator* create_estimator();
+
   typedef boost::intrusive_ptr<Collection> CollectionRef;
   typedef boost::intrusive_ptr<Onode> OnodeRef;
@@ -1691,6 +1693,7 @@ class BlueStore : public ObjectStore,
   std::optional<double> compression_req_ratio;
 
   ContextQueue *commit_queue;
+  std::unique_ptr<Estimator> estimator;
 
   OnodeCacheShard* get_onode_cache() const {
     return onode_space.cache;
@@ -3310,6 +3313,13 @@ class BlueStore : public ObjectStore,
     uint32_t op_flags = 0,
     uint64_t retry_count = 0);
 
+  void _do_read_and_pad(
+    Collection* c,
+    OnodeRef& o,
+    uint32_t offset,
+    uint32_t length,
+    ceph::buffer::list& bl);
+
   int _do_readv(
     Collection *c,
     OnodeRef& o,
@@ -3834,7 +3844,14 @@ class BlueStore : public ObjectStore,
     uint64_t offset, uint64_t length,
     ceph::buffer::list& bl,
     uint32_t fadvise_flags);
-
+  int _do_write_v2_compressed(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef& o,
+    WriteContext& wctx,
+    uint32_t offset, uint32_t length,
+    ceph::buffer::list& bl,
+    uint32_t scan_left, uint32_t scan_right);
   int _touch(TransContext *txc,
     CollectionRef& c,
     OnodeRef& o);

src/os/bluestore/Compression.cc

Lines changed: 7 additions & 1 deletion
@@ -50,7 +50,7 @@ using P = BlueStore::printer;
 using Estimator = BlueStore::Estimator;
 using P = BlueStore::printer;
 
-void Estimator::reset()
+void Estimator::cleanup()
 {
   new_size = 0;
   uncompressed_size = 0;
@@ -220,6 +220,12 @@ void Estimator::finish()
   dout(25) << "exp_comp_factor=" << expected_compression_factor
            << " exp_recomp_err=" << expected_recompression_error
            << " exp_pad_exp=" << expected_pad_expansion << dendl;
+  cleanup();
+}
+
+Estimator* BlueStore::create_estimator()
+{
+  return new Estimator(this);
 }
 
 struct scan_blob_element_t {
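
Compression.cc now supplies BlueStore::create_estimator(), and the Collection keeps the resulting object in a std::unique_ptr (see the BlueStore.h hunk above), so the estimator is built lazily on the first compressed write and then reused, which is what lets it accumulate collection-specific compressibility statistics. A generic sketch of that lazy-creation pattern, using stand-in types rather than the real BlueStore classes:

#include <cstdio>
#include <memory>

// Stand-ins for BlueStore::Estimator and Collection, not the real classes.
struct Estimator {
  int writes_seen = 0;  // proxy for accumulated per-collection statistics
};

struct Collection {
  std::unique_ptr<Estimator> estimator;  // empty until the first compressed write
};

Estimator* create_estimator() { return new Estimator(); }

Estimator* get_estimator(Collection& c) {
  if (!c.estimator)
    c.estimator.reset(create_estimator());  // create on first use, keep afterwards
  return c.estimator.get();
}

int main() {
  Collection c;
  get_estimator(c)->writes_seen++;
  get_estimator(c)->writes_seen++;  // same instance, state persists
  std::printf("writes seen: %d\n", c.estimator->writes_seen);  // prints 2
  return 0;
}

Because the same instance survives across writes to the collection, quantities such as the expected compression factor reported in Estimator::finish() keep improving instead of starting from scratch on every write.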

src/os/bluestore/Compression.h

Lines changed: 2 additions & 2 deletions
@@ -22,8 +22,6 @@ class BlueStore::Estimator {
   Estimator(BlueStore* bluestore)
   :bluestore(bluestore) {}
 
-  // Prepare for new write
-  void reset();
   // Inform estimator that an extent is a candidate for recompression.
   // Estimator has to calculate (guess) the cost (size) of the referenced data.
   // 'gain' is the size that will be released should extent be recompressed.
@@ -70,6 +68,8 @@
   uint32_t actual_compressed = 0;
   uint32_t actual_compressed_plus_pad = 0;
   std::map<uint32_t, uint32_t> extra_recompress;
+  // Prepare for new write
+  void cleanup();
 };