Skip to content

Commit 9f02fd9

Browse files
redm2daverigby
authored andcommitted
MB-23898: skip get() call if KeyOnly is true
Previously we would retrieve the value of a key even if the stream was key only. Instead, check if this is the case, and skip the retrieval if the key is not a SystemEvent (which still requires the value). Also add unit-testing for this. Change-Id: I6bdc567cd2c983ee5ef67f4064ba1198776cf487 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/171581 Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]>
1 parent d46b32e commit 9f02fd9

File tree

6 files changed

+157
-17
lines changed

6 files changed

+157
-17
lines changed

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,44 @@ void CacheCallback::callback(CacheLookup& lookup) {
7070
return;
7171
}
7272

73-
auto gv = get(*vb, lookup, *stream_);
74-
if (gv.getStatus() == cb::engine_errc::success) {
75-
if (gv.item->getBySeqno() == lookup.getBySeqno()) {
76-
if (stream_->backfillReceived(std::move(gv.item),
77-
BACKFILL_FROM_MEMORY)) {
78-
setStatus(cb::engine_errc::key_already_exists);
79-
return;
80-
}
81-
setStatus(cb::engine_errc::no_memory); // Pause the backfill
73+
auto item = std::unique_ptr<Item>();
74+
75+
// We don't need to read the value if the stream is KeyOnly AND this key
76+
// is not a SystemEvent (which requires the full payload for all streams)
77+
if (stream_->isKeyOnly() &&
78+
!(lookup.getKey().getDocKey().isInSystemCollection())) {
79+
// Create an empty value to backfill with
80+
item = std::make_unique<Item>(
81+
lookup.getKey().getDocKey(), /* docKey */
82+
0, /* flags */
83+
0, /* expiry */
84+
nullptr, /* value */
85+
0, /* how much memory to allocate */
86+
PROTOCOL_BINARY_RAW_BYTES, /* binary protocol */
87+
0, /* cas */
88+
lookup.getBySeqno()); /* seqNo */
89+
90+
} else {
91+
auto gv = get(*vb, lookup, *stream_);
92+
// If we could not retrieve the value or the Item seqNo does not match,
93+
// return success so the stream continues onto the next key; no backfill
94+
if (gv.getStatus() != cb::engine_errc::success ||
95+
gv.item->getBySeqno() != lookup.getBySeqno()) {
96+
setStatus(cb::engine_errc::success);
8297
return;
8398
}
84-
}
85-
setStatus(cb::engine_errc::success);
99+
item = std::move(gv.item);
100+
}
101+
102+
// Perform the backfill and set status to key_already_exists if successful.
103+
// Otherwise pause stream backfill as op failed - not enough free memory
104+
bool backfillSuccess =
105+
stream_->backfillReceived(std::move(item), BACKFILL_FROM_MEMORY);
106+
cb::engine_errc newStatus = (backfillSuccess)
107+
? cb::engine_errc::key_already_exists
108+
: cb::engine_errc::no_memory;
109+
setStatus(newStatus);
110+
return;
86111
}
87112

88113
DiskCallback::DiskCallback(std::shared_ptr<ActiveStream> s) : streamPtr(s) {

engines/ep/src/dcp/backfill_disk.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,23 @@ enum backfill_state_t {
3333
class CacheCallback : public StatusCallback<CacheLookup> {
3434
public:
3535
CacheCallback(KVBucket& bucket, std::shared_ptr<ActiveStream> s);
36-
36+
/**
37+
* Function called for each key during stream backfill. Informs the caller
38+
* if the information required to populate the DCP stream for the item is
39+
* in-memory.
40+
*
41+
* If all required information is in-memory, the item is backfilled and
42+
* status is set to cb::engine_errc::key_already_exists.
43+
*
44+
* If there is additional information required that isn't in-memory, or the
45+
* item should not be included in the stream for some reason, the status is
46+
* set to cb::engine_errc::success.
47+
*
48+
* The status is set to cb::engine_errc::no_memory only if there is not
49+
* enough memory to backfill, pausing backfilling temporarily.
50+
*
51+
* @param lookup a reference to a CacheLookup
52+
*/
3753
void callback(CacheLookup& lookup) override;
3854

3955
private:

engines/ep/src/vbucket.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2784,11 +2784,10 @@ GetValue VBucket::getInternal(const CookieIface* cookie,
27842784
const bool getDeletedValue = (options & GET_DELETED_VALUE);
27852785
const bool bgFetchRequired = (options & QUEUE_BG_FETCH);
27862786

2787-
auto res = fetchValidValue(WantsDeleted::Yes,
2788-
trackReference,
2789-
cHandle,
2790-
getReplicaItem);
2787+
auto res = fetchValidValue(
2788+
WantsDeleted::Yes, trackReference, cHandle, getReplicaItem);
27912789

2790+
this->isCalledHook();
27922791
auto* v = res.storedValue;
27932792
if (v) {
27942793
// If the fetched value is a Prepared SyncWrite which may already have

engines/ep/src/vbucket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2516,6 +2516,9 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
25162516
// prepare (if the associated Commit/Abort may have been deduped)
25172517
int64_t allowedDuplicatePrepareThreshold = 0;
25182518

2519+
// Test hook used to determine if the method .getInternal() is called
2520+
TestingHook<> isCalledHook;
2521+
25192522
friend class DurabilityMonitorTest;
25202523
friend class SingleThreadedActiveStreamTest;
25212524
friend class VBucketTestBase;

engines/ep/tests/module_tests/backfill_disk_test.cc

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,20 @@
2121
#include "tests/mock/mock_kvstore.h"
2222
#include "tests/mock/mock_stream.h"
2323
#include "tests/mock/mock_synchronous_ep_engine.h"
24+
#include "tests/module_tests/vbucket_utils.h"
25+
2426
#include <kv_bucket.h>
2527

2628
using namespace ::testing;
2729

2830
/// Test fixture for DCPBackfillDisk class tests.
29-
class DCPBackfillDiskTest : public SingleThreadedEPBucketTest {};
31+
class DCPBackfillDiskTest : public SingleThreadedEPBucketTest {
32+
protected:
33+
void backfillGetDriver(IncludeValue incVal,
34+
IncludeXattrs incXattr,
35+
IncludeDeletedUserXattrs incDeletedXattr,
36+
int expectedGetCalls);
37+
};
3038

3139
/**
3240
* Regression test for MB-47790 - if a backfill fails during the scan() phase
@@ -96,3 +104,88 @@ TEST_F(DCPBackfillDiskTest, ScanDiskError) {
96104
// Replace the MockKVStore with the real one so we can tidy up correctly
97105
MockKVStore::restoreOriginalRWKVStore(*store);
98106
}
107+
108+
void DCPBackfillDiskTest::backfillGetDriver(
109+
IncludeValue incVal,
110+
IncludeXattrs incXattr,
111+
IncludeDeletedUserXattrs incDeletedXattr,
112+
int expectedGetCalls) {
113+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
114+
// Create item, checkpoint and flush so item is to be backfilled from disk
115+
store_item(vbid, makeStoredDocKey("key"), "value");
116+
flushAndRemoveCheckpoints(vbid);
117+
118+
// Set up and define expectations for the hook that is called in the body of
119+
// CacheCallback::get(), i.e., when an item's value is retrieved from cache
120+
auto& vb = *engine->getKVBucket()->getVBucket(vbid);
121+
testing::StrictMock<testing::MockFunction<void()>> getInternalHook;
122+
VBucketTestIntrospector::setIsCalledHook(vb,
123+
getInternalHook.AsStdFunction());
124+
EXPECT_CALL(getInternalHook, Call()).Times(expectedGetCalls);
125+
126+
// Items now only on disk, create producer
127+
auto producer = std::make_shared<MockDcpProducer>(
128+
*engine, cookie, "test-producer", 0 /*flags*/, false /*startTask*/);
129+
130+
auto stream = std::make_shared<MockActiveStream>(
131+
engine.get(),
132+
producer,
133+
0, /* flags */
134+
0, /* opaque */
135+
vb, /* vbucket */
136+
0, /* start seqNo */
137+
1, /* end seqNo */
138+
0, /* vbucket uuid */
139+
0, /* snapshot start seqNo */
140+
0, /* snapshot end seqNo */
141+
incVal, /* includeValue */
142+
incXattr, /* includeXattrs */
143+
incDeletedXattr, /* includeDeletedUserXattrs */
144+
std::string{}); /* jsonFilter */
145+
146+
stream->setActive();
147+
ASSERT_TRUE(stream->isBackfilling());
148+
149+
auto& bfm = producer->getBFM();
150+
ASSERT_EQ(backfill_success, bfm.backfill()); // initialize backfill
151+
152+
auto backfillRemaining = stream->getNumBackfillItemsRemaining();
153+
ASSERT_TRUE(backfillRemaining);
154+
155+
ASSERT_EQ(backfill_success, bfm.backfill()); // scan
156+
ASSERT_EQ(2, stream->public_readyQSize());
157+
158+
auto response = stream->public_popFromReadyQ();
159+
EXPECT_EQ(DcpResponse::Event::SnapshotMarker, response->getEvent());
160+
161+
// Ensure an item mutation is in the DCP stream
162+
response = stream->public_popFromReadyQ();
163+
EXPECT_EQ(DcpResponse::Event::Mutation, response->getEvent());
164+
165+
// Ensure this item has the correct key, and value (if IncludeValue::Yes)
166+
MutationResponse mutResponse = dynamic_cast<MutationResponse&>(*response);
167+
SingleThreadedRCPtr item = mutResponse.getItem();
168+
EXPECT_EQ(item->getKey(), makeStoredDocKey("key"));
169+
170+
if (!(stream->isKeyOnly())) {
171+
EXPECT_EQ(item->getValue()->to_s(), "value");
172+
}
173+
}
174+
175+
// Tests that CacheCallback::get is never called when a stream is keyOnly.
176+
TEST_F(DCPBackfillDiskTest, KeyOnlyBackfillSkipsScan) {
177+
DCPBackfillDiskTest::backfillGetDriver(IncludeValue::No,
178+
IncludeXattrs::No,
179+
IncludeDeletedUserXattrs::No,
180+
0);
181+
}
182+
183+
// Complement to KeyOnlyBackfillSkipGet. Other tests already cover all cases,
184+
// but not using the hook. This test validates the hook, and thus
185+
// KeyOnlyBackfillSkipGet itself, is performing correctly and can be trusted.
186+
TEST_F(DCPBackfillDiskTest, ValueBackfillRegressionTest) {
187+
DCPBackfillDiskTest::backfillGetDriver(IncludeValue::Yes,
188+
IncludeXattrs::Yes,
189+
IncludeDeletedUserXattrs::Yes,
190+
1);
191+
}

engines/ep/tests/module_tests/vbucket_utils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,8 @@ class VBucketTestIntrospector {
3838
static void destroyDM(VBucket& vb) {
3939
vb.durabilityMonitor.reset();
4040
}
41+
42+
static void setIsCalledHook(VBucket& vb, std::function<void()> hook) {
43+
vb.isCalledHook = hook;
44+
}
4145
};

0 commit comments

Comments
 (0)