Skip to content

Commit 2b05aaa

Browse files
committed
MB-56917: Check memory when fetching item from disk in Magma
Allow MagmaKVStore::makeItem to use createItemCb to determine if sufficient memory is available to create a new item. If there is available memory, then a GetValue with a pointer to a new item and an engine_errc::success is returned. Otherwise, a GetValue with nullptr is returned as an item and engine_errc::temp_faillure / no_memory. Change-Id: I0dd236fb7889d7ccc8858a215c102377f38a5005 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/203384 Reviewed-by: Jim Walker <[email protected]> Tested-by: Mohammad Zaeem <[email protected]>
1 parent ca71f59 commit 2b05aaa

File tree

3 files changed

+118
-81
lines changed

3 files changed

+118
-81
lines changed

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,7 +1156,7 @@ void MagmaKVStore::getMulti(Vbid vbid,
11561156
metaSlice,
11571157
valueSlice,
11581158
bg_itm_ctx->getValueFilter(),
1159-
std::move(createItemCb));
1159+
createItemCb);
11601160
GetValue* rv = &bg_itm_ctx->value;
11611161

11621162
for (auto& fetch : bg_itm_ctx->getRequests()) {
@@ -1165,8 +1165,7 @@ void MagmaKVStore::getMulti(Vbid vbid,
11651165
std::chrono::duration_cast<std::chrono::microseconds>(
11661166
std::chrono::steady_clock::now() -
11671167
fetch->initTime));
1168-
st.readSizeHisto.add(bg_itm_ctx->value.item->getKey().size() +
1169-
bg_itm_ctx->value.item->getNBytes());
1168+
st.readSizeHisto.add(op.Key.Len() + valueSlice.Len());
11701169
}
11711170
++st.io_bg_fetch_docs_read;
11721171
st.io_bgfetch_doc_bytes +=
@@ -1375,12 +1374,12 @@ bool MagmaKVStore::snapshotVBucket(Vbid vbid, const VB::Commit& meta) {
13751374
return true;
13761375
}
13771376

1378-
std::unique_ptr<Item> MagmaKVStore::makeItem(Vbid vb,
1379-
const Slice& keySlice,
1380-
const Slice& metaSlice,
1381-
const Slice& valueSlice,
1382-
ValueFilter filter,
1383-
CreateItemCB createItemcb) const {
1377+
GetValue MagmaKVStore::makeItem(Vbid vb,
1378+
const Slice& keySlice,
1379+
const Slice& metaSlice,
1380+
const Slice& valueSlice,
1381+
ValueFilter filter,
1382+
CreateItemCB createItemCb) const {
13841383
auto key = makeDiskDocKey(keySlice);
13851384
const auto meta = magmakv::getDocMeta(metaSlice);
13861385

@@ -1390,25 +1389,33 @@ std::unique_ptr<Item> MagmaKVStore::makeItem(Vbid vb,
13901389
const bool includeValue = filter != ValueFilter::KEYS_ONLY ||
13911390
key.getDocKey().isInSystemCollection();
13921391

1393-
value_t body;
1392+
size_t nbytes = 0;
13941393
// Only create the body for the value filter and when a valueSlice is given
13951394
if ((includeValue || forceValueFetch) && valueSlice.Data()) {
1396-
body.reset(TaggedPtr<Blob>(
1395+
nbytes = valueSlice.Len();
1396+
}
1397+
1398+
auto [status, item] = createItemCb(key.getDocKey(),
1399+
nbytes,
1400+
meta.getFlags(),
1401+
meta.getExptime(),
1402+
value_t(),
1403+
meta.getDatatype(),
1404+
meta.getCas(),
1405+
meta.getBySeqno(),
1406+
vb,
1407+
meta.getRevSeqno());
1408+
if (status != cb::engine_errc::success) {
1409+
return GetValue(nullptr, status);
1410+
}
1411+
1412+
// Blob creation is deferred to avoid exceeding quota.
1413+
if ((includeValue || forceValueFetch) && valueSlice.Data()) {
1414+
item->replaceValue(TaggedPtr<Blob>(
13971415
Blob::New(valueSlice.Data(), meta.getValueSize()),
13981416
TaggedPtrBase::NoTagValue));
13991417
}
14001418

1401-
auto item =
1402-
std::make_unique<Item>(key.getDocKey(),
1403-
meta.getFlags(),
1404-
meta.getExptime(),
1405-
body,
1406-
meta.getDatatype(),
1407-
meta.getCas(),
1408-
meta.getBySeqno(),
1409-
vb,
1410-
meta.getRevSeqno());
1411-
14121419
if (filter != ValueFilter::KEYS_ONLY) {
14131420
checkAndFixKVStoreCreatedItem(*item);
14141421
}
@@ -1445,13 +1452,13 @@ std::unique_ptr<Item> MagmaKVStore::makeItem(Vbid vb,
14451452
item->setPrepareSeqno(meta.getPrepareSeqno());
14461453
}
14471454

1448-
if (body) {
1455+
if (item->getValue()) {
14491456
// function shared with KEY_ONLY creation paths, so only sanitize when
14501457
// a body was created
14511458
checkAndFixKVStoreCreatedItem(*item);
14521459
}
14531460

1454-
return item;
1461+
return GetValue(std::move(item), status);
14551462
}
14561463

14571464
GetValue MagmaKVStore::makeGetValue(Vbid vb,
@@ -1460,11 +1467,12 @@ GetValue MagmaKVStore::makeGetValue(Vbid vb,
14601467
const Slice& valueSlice,
14611468
ValueFilter filter,
14621469
CreateItemCB createItemCb) const {
1463-
return GetValue(
1464-
makeItem(vb, keySlice, metaSlice, valueSlice, filter, createItemCb),
1465-
cb::engine_errc::success,
1466-
-1,
1467-
false);
1470+
return makeItem(vb,
1471+
keySlice,
1472+
metaSlice,
1473+
valueSlice,
1474+
filter,
1475+
std::move(createItemCb));
14681476
}
14691477

14701478
int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
@@ -2380,18 +2388,19 @@ ScanStatus MagmaKVStore::scanOne(
23802388
magmakv::isCompressed(docMeta));
23812389
}
23822390

2383-
auto itm = makeItem(ctx.vbid,
2384-
keySlice,
2385-
metaSlice,
2386-
value,
2387-
ctx.valFilter,
2388-
getDefaultCreateItemCallback());
2391+
auto gv = makeItem(ctx.vbid,
2392+
keySlice,
2393+
metaSlice,
2394+
value,
2395+
ctx.valFilter,
2396+
getDefaultCreateItemCallback());
2397+
Expects(gv.getStatus() == cb::engine_errc::success);
23892398

23902399
// When we are requested to return the values as compressed AND
23912400
// the value isn't compressed, attempt to compress the value.
23922401
if (ctx.valFilter == ValueFilter::VALUES_COMPRESSED &&
23932402
!magmakv::isCompressed(docMeta)) {
2394-
if (!itm->compressValue()) {
2403+
if (!gv.item->compressValue()) {
23952404
logger->warn(
23962405
"MagmaKVStore::scanOne failed to compress value - {} "
23972406
"key:{} "
@@ -2404,11 +2413,10 @@ ScanStatus MagmaKVStore::scanOne(
24042413
}
24052414
}
24062415

2407-
GetValue rv(std::move(itm),
2408-
cb::engine_errc::success,
2409-
-1,
2410-
ctx.valFilter == ValueFilter::KEYS_ONLY);
2411-
ctx.getValueCallback().callback(rv);
2416+
if (ctx.valFilter == ValueFilter::KEYS_ONLY) {
2417+
gv.setPartial();
2418+
}
2419+
ctx.getValueCallback().callback(gv);
24122420
auto callbackStatus = ctx.getValueCallback().getStatus();
24132421
if (callbackStatus == cb::engine_errc::success) {
24142422
return ScanStatus::Success;
@@ -4120,13 +4128,13 @@ GetValue MagmaKVStore::getBySeqno(KVFileHandle& handle,
41204128
}
41214129

41224130
if (status.IsOkDocFound()) {
4123-
rv.item = makeItem(vbid,
4124-
*key,
4125-
*meta,
4126-
*value,
4127-
filter,
4128-
getDefaultCreateItemCallback());
4129-
rv.setStatus(cb::engine_errc::success);
4131+
rv = makeItem(vbid,
4132+
*key,
4133+
*meta,
4134+
*value,
4135+
filter,
4136+
getDefaultCreateItemCallback());
4137+
Expects(rv.getStatus() == cb::engine_errc::success);
41304138
return rv;
41314139
}
41324140
return rv;

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -616,12 +616,12 @@ class MagmaKVStore : public KVStore {
616616
CompactDBStatus compactDBInternal(std::unique_lock<std::mutex>& vbLock,
617617
std::shared_ptr<CompactionContext> ctx);
618618

619-
std::unique_ptr<Item> makeItem(Vbid vb,
620-
const magma::Slice& keySlice,
621-
const magma::Slice& metaSlice,
622-
const magma::Slice& valueSlice,
623-
ValueFilter filter,
624-
CreateItemCB createItemCb) const;
619+
GetValue makeItem(Vbid vb,
620+
const magma::Slice& keySlice,
621+
const magma::Slice& metaSlice,
622+
const magma::Slice& valueSlice,
623+
ValueFilter filter,
624+
CreateItemCB createItemCb) const;
625625

626626
GetValue makeGetValue(Vbid vb,
627627
const magma::Slice& keySlice,

engines/ep/tests/module_tests/evp_store_test.cc

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -629,12 +629,14 @@ TEST_P(EPBucketFullEvictionTest, xattrExpiryOnFullyEvictedItem) {
629629
}
630630

631631
TEST_P(EPBucketFullEvictionTest, GetMultiShouldNotExceedMutationWatermark) {
632-
if (!isCouchstore()) {
632+
// This test relies on precise memory tracking
633+
const auto& stats = engine->getEpStats();
634+
if (!stats.isMemoryTrackingEnabled()) {
633635
GTEST_SKIP();
634636
}
637+
635638
ASSERT_EQ(cb::engine_errc::success,
636639
store->setVBucketState(vbid, vbucket_state_active, {}));
637-
const auto& stats = engine->getEpStats();
638640
EXPECT_LT(stats.getPreciseTotalMemoryUsed(), stats.getMaxDataSize());
639641

640642
// Load a document of size 1MiB
@@ -662,18 +664,18 @@ TEST_P(EPBucketFullEvictionTest, GetMultiShouldNotExceedMutationWatermark) {
662664
// run bgFetch
663665
runBGFetcherTask();
664666

665-
// BGFetch should fail to allocate memory above bucekt quota (now works for
666-
// couch-kvstore), however this still fails for magma
667667
auto vb = store->getVBucket(vbid);
668668
EXPECT_FALSE(vb->hasPendingBGFetchItems());
669669
EXPECT_LE(stats.getPreciseTotalMemoryUsed(), stats.getMaxDataSize());
670670
}
671671

672672
TEST_P(EPBucketFullEvictionTest, BgfetchSucceedsUntilMutationWatermark) {
673-
if (!isCouchstore()) {
673+
// This test involves adjusting the bucket quota for the backend. Since
674+
// Nexus utilises both Magma and Couchstore, manipulating the bucket quota
675+
// to accommodate one backend will result in failure for the other.
676+
if (isNexus()) {
674677
GTEST_SKIP();
675678
}
676-
677679
// This test relies on precise memory tracking
678680
const auto& stats = engine->getEpStats();
679681
if (!stats.isMemoryTrackingEnabled()) {
@@ -688,17 +690,17 @@ TEST_P(EPBucketFullEvictionTest, BgfetchSucceedsUntilMutationWatermark) {
688690

689691
// Load documents of size 1MiB
690692
const std::string value(valueSize, 'x');
691-
auto key1 = makeStoredDocKey("key_0");
692-
auto key2 = makeStoredDocKey("key_1");
693+
auto key0 = makeStoredDocKey("key_0");
694+
auto key1 = makeStoredDocKey("key_1");
693695

694-
// store an items
696+
// store items
697+
store_item(vbid, key0, value);
695698
store_item(vbid, key1, value);
696-
store_item(vbid, key2, value);
697699
flush_vbucket_to_disk(vbid, 2);
698700

699701
// evict items
702+
evict_key(vbid, key0);
700703
evict_key(vbid, key1);
701-
evict_key(vbid, key2);
702704

703705
// check item has been removed
704706
auto options = static_cast<get_options_t>(
@@ -707,37 +709,64 @@ TEST_P(EPBucketFullEvictionTest, BgfetchSucceedsUntilMutationWatermark) {
707709
auto cookie1 = create_mock_cookie();
708710
auto cookie2 = create_mock_cookie();
709711

710-
auto gv = store->get(key1, vbid, cookie1, options);
712+
// Couchstore sorts key pointers and retrieves the first item in the queue.
713+
// Initially, the queue is ["Key_1", "Key_0"], which is then sorted to
714+
// ["Key_0", "Key_1"]. In Magma, there is no sorting; instead, the first
715+
// item from the queue is popped and requested. When a new item is
716+
// requested, it is pushed to the front of the queue. To ensure Key_0 always
717+
// succeeds and Key_1 fails, we first request Key_1 ["Key_1"] and then
718+
// request Key_0 ["Key_0", "Key_1"].
719+
auto gv = store->get(key1, vbid, cookie2, options);
711720
EXPECT_EQ(cb::engine_errc::would_block, gv.getStatus());
712-
gv = store->get(key2, vbid, cookie2, options);
721+
gv = store->get(key0, vbid, cookie1, options);
713722
EXPECT_EQ(cb::engine_errc::would_block, gv.getStatus());
714723

715724
// set bucket quota to current memory usage + 3MiB such that the first
716725
// bgfetch will pass and the second one will fail:
717726
// couchstore fetches value + addition overhead = ~1.3MiB
718727
// We want an additional 1MiB for fetching the first item successfully
719-
engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed() + (3 * valueSize));
728+
// Magma requires an addition 1MiB overhead than couchstore.
729+
if (engine->getConfiguration().getBackend() == "magma") {
730+
engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed() +
731+
(4 * valueSize));
732+
} else {
733+
engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed() +
734+
(3 * valueSize));
735+
}
720736

721737
int callbackCounter = 0;
722-
cookie1->setUserNotifyIoComplete(
723-
[&callbackCounter](cb::engine_errc status) {
724-
EXPECT_EQ(cb::engine_errc::success, status);
725-
callbackCounter++;
726-
});
727-
cookie2->setUserNotifyIoComplete(
728-
[&callbackCounter](cb::engine_errc status) {
729-
EXPECT_EQ(cb::engine_errc::temporary_failure, status);
730-
callbackCounter++;
731-
});
738+
int numSucess = 0;
739+
int numFailure = 0;
740+
cookie1->setUserNotifyIoComplete([&callbackCounter,
741+
&numSucess,
742+
&numFailure](cb::engine_errc status) {
743+
if (status == cb::engine_errc::success) {
744+
numSucess++;
745+
} else {
746+
numFailure++;
747+
}
748+
callbackCounter++;
749+
});
750+
cookie2->setUserNotifyIoComplete([&callbackCounter,
751+
&numSucess,
752+
&numFailure](cb::engine_errc status) {
753+
if (status == cb::engine_errc::success) {
754+
numSucess++;
755+
} else {
756+
numFailure++;
757+
}
758+
callbackCounter++;
759+
});
732760

733761
// run bgFetch
734762
runBGFetcherTask();
735763

736-
// BGFetch should fail to allocate memory above bucekt quota (now works for
737-
// couch-kvstore), however this still fails for magma
764+
// BGFetch should fail to allocate memory above bucekt quota
738765
auto vb = store->getVBucket(vbid);
739766
EXPECT_FALSE(vb->hasPendingBGFetchItems());
740767
EXPECT_LE(stats.getPreciseTotalMemoryUsed(), stats.getMaxDataSize());
768+
EXPECT_EQ(1, numSucess);
769+
EXPECT_EQ(1, numFailure);
741770
EXPECT_EQ(2, callbackCounter);
742771

743772
destroy_mock_cookie(cookie1);
@@ -1075,7 +1104,7 @@ TEST_P(EPBucketFullEvictionTest, ExpiryFindsPrepareWithSameCas) {
10751104
vb.reset();
10761105
// Before destroying the engine, prevent createItemCallback from becoming
10771106
// invalid
1078-
createItemCallback = nullptr;
1107+
createItemCallback = KVStoreIface::getDefaultCreateItemCallback();
10791108
resetEngineAndWarmup();
10801109
createItemCallback = this->engine->getCreateItemCallback();
10811110
vb = store->getVBucket(vbid);

0 commit comments

Comments
 (0)