Skip to content

Commit 7447307

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-33332: Give all Ephemeral backfill items Majority level
Ephemeral only supports sync writes with the Majority level so instead of storing a durability level in each OrderedStoredValue we can simply assume that they are all have the Majority level. Change-Id: I3f7139c356d16b83469131cc52c1781c05a745ed Reviewed-on: http://review.couchbase.org/108785 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 9dd333f commit 7447307

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "ep_time.h"
2222
#include "ephemeral_vb.h"
2323
#include "seqlist.h"
24+
#include "stored-value.h"
2425

2526
#include <phosphor/phosphor.h>
2627

@@ -297,11 +298,19 @@ backfill_status_t DCPBackfillMemoryBuffered::scan() {
297298
// mutated with the HashBucketLock, so get the correct bucket lock
298299
// before calling StoredValue::toItem
299300
auto hbl = evb->ht.getLockedBucket((*rangeItr).getKey());
300-
// @todo-durability: Need to record the original durability
301-
// requirements in OrderedStoredValue (and then add into the
302-
// item we make here); otherwise we cannot correctly form the
303-
// DCP_PREPARE messages for Pending items.
304-
item = (*rangeItr).toItem(getVBucketId());
301+
// Ephemeral only supports a durable write level of Majority so
302+
// instead of storing a durability level in our OrderedStoredValues
303+
// we can just assume that all durable writes have the Majority
304+
// level. Given that this is a backfill item (we will send via DCP)
305+
// we also need to specify an infinite durable write timeout so that
306+
// we do not lose any durable writes. We can supply these items
307+
// for all stored values as they are only set if the underlying
308+
// StoredValue has the CommittedState of Pending.
309+
item = (*rangeItr).toItem(getVBucketId(),
310+
StoredValue::HideLockedCas::No,
311+
StoredValue::IncludeValue::Yes,
312+
{{cb::durability::Level::Majority,
313+
cb::durability::Timeout::Infinity()}});
305314
// A deleted ephemeral item stores the delete time under a delete
306315
// time field, this must be copied to the expiry time so that DCP
307316
// can transmit the original time of deletion

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "dcp_durability_stream_test.h"
1919

20+
#include "../../src/dcp/backfill-manager.h"
2021
#include "checkpoint_utils.h"
2122
#include "dcp/response.h"
2223
#include "dcp_utils.h"
@@ -205,6 +206,51 @@ TEST_P(DurabilityActiveStreamTest, SendDcpAbort) {
205206
ASSERT_FALSE(resp);
206207
}
207208

209+
TEST_P(DurabilityActiveStreamEphemeralTest, BackfillDurabilityLevel) {
210+
auto vb = engine->getVBucket(vbid);
211+
auto& ckptMgr = *vb->checkpointManager;
212+
// Get rid of set_vb_state and any other queue_op we are not interested in
213+
ckptMgr.clear(*vb, 0 /*seqno*/);
214+
215+
const auto key = makeStoredDocKey("key");
216+
const auto& value = "value";
217+
auto item = makePendingItem(
218+
key,
219+
value,
220+
cb::durability::Requirements(cb::durability::Level::Majority,
221+
1 /*timeout*/));
222+
VBQueueItemCtx ctx;
223+
ctx.durability =
224+
DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
225+
226+
EXPECT_EQ(MutationStatus::WasClean, public_processSet(*vb, *item, ctx));
227+
228+
// We don't account Prepares in VB stats
229+
EXPECT_EQ(0, vb->getNumItems());
230+
231+
stream->transitionStateToBackfilling();
232+
ASSERT_TRUE(stream->isBackfilling());
233+
234+
// Run the backfill we scheduled when we transitioned to the backfilling
235+
// state
236+
auto& bfm = producer->getBFM();
237+
bfm.backfill();
238+
239+
const auto& readyQ = stream->public_readyQ();
240+
EXPECT_EQ(2, readyQ.size());
241+
242+
// First item is a snapshot marker so just skip it
243+
auto resp = stream->public_popFromReadyQ();
244+
resp = stream->public_popFromReadyQ();
245+
ASSERT_TRUE(resp);
246+
EXPECT_EQ(DcpResponse::Event::Prepare, resp->getEvent());
247+
const auto& prep = static_cast<MutationResponse&>(*resp);
248+
const auto respItem = prep.getItem();
249+
EXPECT_EQ(cb::durability::Level::Majority,
250+
respItem->getDurabilityReqs().getLevel());
251+
EXPECT_TRUE(respItem->getDurabilityReqs().getTimeout().isInfinite());
252+
}
253+
208254
TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
209255
auto vb = engine->getVBucket(vbid);
210256

@@ -798,3 +844,8 @@ INSTANTIATE_TEST_CASE_P(
798844
DurabilityPassiveStreamPersistentTest,
799845
STParameterizedBucketTest::persistentAllBackendsConfigValues(),
800846
STParameterizedBucketTest::PrintToStringParamName);
847+
848+
INSTANTIATE_TEST_CASE_P(AllBucketTypes,
849+
DurabilityActiveStreamEphemeralTest,
850+
STParameterizedBucketTest::ephConfigValues(),
851+
STParameterizedBucketTest::PrintToStringParamName);

engines/ep/tests/module_tests/dcp_durability_stream_test.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,9 @@ class DurabilityPassiveStreamTest : public SingleThreadedPassiveStreamTest {
5656
*/
5757
class DurabilityPassiveStreamPersistentTest
5858
: public DurabilityPassiveStreamTest {};
59+
60+
/**
61+
* ActiveStream tests for Durability against ephemeral buckets. Single-threaded.
62+
*/
63+
class DurabilityActiveStreamEphemeralTest : public DurabilityActiveStreamTest {
64+
};

0 commit comments

Comments
 (0)