Skip to content

Commit 15226e1

Browse files
authored
feat(tiering): Two phase stash (#5984)
1 parent aced3dc commit 15226e1

File tree

7 files changed

+73
-36
lines changed

7 files changed

+73
-36
lines changed

src/server/tiered_storage.cc

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,14 +380,28 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
380380

381381
tiering::OpManager::EntryId id;
382382
error_code ec;
383+
384+
// TODO(vlad): Replace with encoders for different types
385+
auto stash_string = [&](std::string_view str) {
386+
if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
387+
auto [offset, buf] = *prepared;
388+
memcpy(buf.bytes.data(), str.data(), str.size());
389+
tiering::DiskSegment segment{offset, str.size()};
390+
op_manager_->Stash(id, segment, buf);
391+
} else {
392+
ec = prepared.error();
393+
}
394+
};
395+
383396
if (OccupiesWholePages(value->Size())) { // large enough for own page
384397
id = KeyRef(dbid, key);
385-
ec = op_manager_->Stash(id, raw_string.view());
398+
stash_string(raw_string.view());
386399
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
387400
id = bin->first;
388-
ec = op_manager_->Stash(id, bin->second);
401+
// TODO(vlad): Write bin to prepared buffer instead of allocating one
402+
stash_string(bin->second);
389403
} else {
390-
return {}; // Silently added to bin
404+
return {}; // silently added to bin
391405
}
392406

393407
if (ec) {

src/server/tiering/disk_storage.cc

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,33 +144,40 @@ void DiskStorage::MarkAsFree(DiskSegment segment) {
144144
alloc_.Free(segment.offset, segment.length);
145145
}
146146

147-
std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
148-
DCHECK_GT(bytes.length(), 0u);
147+
io::Result<std::pair<size_t, UringBuf>> DiskStorage::PrepareStash(size_t length) {
148+
using namespace nonstd;
149149

150-
size_t len = bytes.size();
151-
int64_t offset = alloc_.Malloc(len);
150+
int64_t offset = alloc_.Malloc(length);
151+
if (offset >= 0)
152+
return std::make_pair(offset, PrepareBuf(length));
152153

153154
// If we don't have enough space, request grow and return to avoid blocking
154155
if (offset < 0) {
155156
auto ec = RequestGrow(-offset);
156-
return ec ? ec : make_error_code(errc::operation_would_block);
157+
return make_unexpected(ec ? ec : make_error_code(errc::operation_would_block));
157158
}
158159

159-
UringBuf buf = PrepareBuf(len);
160-
memcpy(buf.bytes.data(), bytes.data(), bytes.length());
160+
offset = alloc_.Malloc(length);
161+
if (offset < 0) // we can't fit it even after resizing
162+
return make_unexpected(make_error_code(errc::file_too_large));
163+
164+
return std::make_pair(offset, PrepareBuf(length));
165+
}
161166

162-
auto io_cb = [this, cb, offset, buf, len](int io_res) {
167+
void DiskStorage::Stash(DiskSegment segment, UringBuf buf, StashCb cb) {
168+
auto io_cb = [this, cb, buf, segment](int io_res) {
163169
if (io_res < 0) {
164-
MarkAsFree({size_t(offset), len});
165-
cb(nonstd::make_unexpected(error_code{-io_res, std::system_category()}));
170+
MarkAsFree(segment);
171+
cb(error_code{-io_res, std::system_category()});
166172
} else {
167-
cb(DiskSegment{size_t(offset), len});
173+
cb({});
168174
}
169175
ReturnBuf(buf);
170176
pending_ops_--;
171177
};
172178

173179
pending_ops_++;
180+
size_t offset = segment.offset;
174181
if (buf.buf_idx)
175182
backing_file_->WriteFixedAsync(buf.bytes, offset, *buf.buf_idx, std::move(io_cb));
176183
else
@@ -183,8 +190,6 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
183190
auto ec = RequestGrow(256_MB);
184191
LOG_IF(ERROR, ec && ec != errc::file_too_large) << "Could not call grow :" << ec.message();
185192
}
186-
187-
return {}; // Must succeed after the operation was scheduled to run cleanup
188193
}
189194

190195
DiskStorage::Stats DiskStorage::GetStats() const {

src/server/tiering/disk_storage.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class DiskStorage {
2929
};
3030

3131
using ReadCb = std::function<void(io::Result<std::string_view>)>;
32-
using StashCb = std::function<void(io::Result<DiskSegment>)>;
32+
using StashCb = std::function<void(std::error_code)>;
3333

3434
explicit DiskStorage(size_t max_size);
3535

@@ -42,11 +42,13 @@ class DiskStorage {
4242
// Mark segment as free, performed immediately
4343
void MarkAsFree(DiskSegment segment);
4444

45-
// Request bytes to be stored, cb will be called with assigned segment on completion. Can block to
46-
// grow backing file. Returns error code if operation failed immediately (most likely it failed
47-
// to grow the backing file) or passes an empty segment if the final write operation failed.
48-
// Bytes are copied and can be dropped before cb is resolved
49-
std::error_code Stash(io::Bytes bytes, StashCb cb);
45+
// Allocate segment of at least given length and prepare buffer. Might block to grow backing file.
46+
// Return error if not enough space is available or growing failed.
47+
// Every successful preparation must end in a Stash(), otherwise resources are leaked.
48+
io::Result<std::pair<size_t /* offset */, util::fb2::UringBuf>> PrepareStash(size_t length);
49+
50+
// Write prepared buffer to given segment and resolve completion callback when write is done.
51+
void Stash(DiskSegment segment, util::fb2::UringBuf buf, StashCb cb);
5052

5153
Stats GetStats() const;
5254

src/server/tiering/disk_storage_test.cc

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@ struct DiskStorageTest : public PoolTestBase {
3939

4040
void Stash(size_t index, string value) {
4141
pending_ops_++;
42-
auto buf = make_shared<string>(value);
43-
storage_->Stash(io::Buffer(*buf), [this, index, buf](io::Result<DiskSegment> segment) {
44-
if (segment.has_value()) {
45-
EXPECT_GT(segment->length, 0u);
46-
}
42+
43+
auto prepared = storage_->PrepareStash(value.length());
44+
EXPECT_TRUE(prepared.has_value());
45+
auto [offset, buf] = *prepared;
46+
memcpy(buf.bytes.data(), value.data(), value.size());
47+
48+
DiskSegment segment{offset, value.size()};
49+
storage_->Stash({offset, value.size()}, buf, [this, index, segment](std::error_code ec) {
4750
segments_[index] = segment;
4851
pending_ops_--;
4952
});

src/server/tiering/op_manager.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,17 @@ void OpManager::DeleteOffloaded(DiskSegment segment) {
8787
}
8888
}
8989

90-
std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) {
90+
void OpManager::Stash(EntryId id_ref, tiering::DiskSegment segment, util::fb2::UringBuf buf) {
9191
auto id = ToOwned(id_ref);
9292
unsigned version = pending_stash_ver_[id] = ++pending_stash_counter_;
9393

94-
io::Bytes buf_view = io::Buffer(value);
95-
auto io_cb = [this, version, id = std::move(id)](io::Result<DiskSegment> segment) {
96-
ProcessStashed(Borrowed(id), version, segment);
94+
auto io_cb = [this, version, id = std::move(id), segment](std::error_code ec) {
95+
ProcessStashed(Borrowed(id), version,
96+
ec ? nonstd::make_unexpected(ec) : io::Result<DiskSegment>(segment));
9797
};
9898

9999
// May block due to blocking call to Grow.
100-
auto ec = storage_.Stash(buf_view, std::move(io_cb));
101-
if (ec)
102-
pending_stash_ver_.erase(ToOwned(id_ref));
103-
return ec;
100+
storage_.Stash(segment, buf, std::move(io_cb));
104101
}
105102

106103
OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) {

src/server/tiering/op_manager.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,12 @@ class OpManager {
6363
// Delete offloaded entry located at the segment.
6464
void DeleteOffloaded(DiskSegment segment);
6565

66+
auto PrepareStash(size_t length) {
67+
return storage_.PrepareStash(length);
68+
}
69+
6670
// Stash value to be offloaded. It is opaque to OpManager.
67-
std::error_code Stash(EntryId id, std::string_view value);
71+
void Stash(EntryId id, tiering::DiskSegment segment, util::fb2::UringBuf buf);
6872

6973
Stats GetStats() const;
7074

src/server/tiering/op_manager_test.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ struct OpManagerTest : PoolTestBase, OpManager {
8282
return true;
8383
}
8484

85+
std::error_code Stash(EntryId id, std::string_view value) {
86+
auto prepared = storage_.PrepareStash(value.size());
87+
if (!prepared.has_value())
88+
return prepared.error();
89+
90+
auto [offset, buf] = *prepared;
91+
memcpy(buf.bytes.data(), value.data(), value.size());
92+
DiskSegment segment{offset, value.size()};
93+
OpManager::Stash(id, segment, buf);
94+
return {};
95+
}
96+
8597
absl::flat_hash_map<EntryId, std::string> fetched_;
8698
absl::flat_hash_map<EntryId, DiskSegment> stashed_;
8799
};

0 commit comments

Comments
 (0)