Commit a211154
MB-35496: Remove completed prepares at compaction
When we compact a database file we can remove all completed prepares to save space. A prepare is only needed for the lifetime of a SyncWrite to ensure that we still have it should we shut down and restart. We don't need to worry about the metadata purge interval when it comes to prepares because we stream Mutations instead of Commits when streaming from disk.

Change-Id: I50925f11e72f3db38d92f5e0129d9a8dfe87b014
Reviewed-on: http://review.couchbase.org/113383
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent 68b5190 commit a211154
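
The rule described in the commit message reduces to a single comparison against the vbucket's high completed seqno (HCS). A minimal illustrative sketch of that rule, using hypothetical names rather than the real kvstore types (the actual check is the time_purge_hook change further down):

    #include <cstdint>

    // Sketch only: a prepare whose seqno is at or below the HCS has already
    // been committed or aborted, so compaction no longer needs to keep it.
    bool canDropPrepareAtCompaction(bool isPrepare,
                                    uint64_t prepareSeqno,
                                    uint64_t highCompletedSeqno) {
        return isPrepare && prepareSeqno <= highCompletedSeqno;
    }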

File tree

4 files changed: +136 additions, 0 deletions

    engines/ep/src/couch-kvstore/couch-kvstore-metadata.h
    engines/ep/src/couch-kvstore/couch-kvstore.cc
    engines/ep/src/kvstore.h
    engines/ep/tests/module_tests/evp_store_durability_test.cc

engines/ep/src/couch-kvstore/couch-kvstore-metadata.h

Lines changed: 9 additions & 0 deletions
@@ -332,6 +332,10 @@ class MetaData {
         return operation == Operation::Commit;
     }
 
+    bool isPrepare() const {
+        return operation == Operation::Pending;
+    }
+
 private:
     // Assigning a whole byte to this (see MetaDataV3 class comment)
     // although only currently need 2 bits.
@@ -536,6 +540,11 @@ class MetaData {
                allMeta.v3.isCommit();
     }
 
+    bool isPrepare() const {
+        return getVersionInitialisedFrom() == MetaData::Version::V3 &&
+               allMeta.v3.isPrepare();
+    }
+
     cb::uint48_t getPrepareSeqno() const {
         return allMeta.v3.getPrepareSeqno();
     }

engines/ep/src/couch-kvstore/couch-kvstore.cc

Lines changed: 24 additions & 0 deletions
@@ -953,6 +953,16 @@ static int time_purge_hook(Db* d, DocInfo* info, sized_buf item, void* ctx_p) {
             }
         }
     } else {
+        // We can remove any prepares that have been completed. This works
+        // because we send Mutations instead of Commits when streaming from
+        // disk so we do not need to send a Prepare message to keep things
+        // consistent on a replica.
+        if (metadata->isPrepare()) {
+            if (info->db_seq <= ctx->highCompletedSeqno) {
+                return COUCHSTORE_COMPACT_DROP_ITEM;
+            }
+        }
+
         time_t currtime = ep_real_time();
         if (exptime && exptime < currtime && metadata->isCommit()) {
             int ret;
@@ -1085,6 +1095,20 @@ bool CouchKVStore::compactDBInternal(compaction_ctx* hook_ctx,
         flags |= couchstore_encode_periodic_sync_flags(periodicSyncBytes);
     }
 
+    // It would seem logical to grab the state from disk here (readVBState(...))
+    // but we cannot do that. If we do, we may race with a concurrent scan as
+    // readVBState will overwrite the cached vbucket_state. As such, just use
+    // the cached vbucket_state.
+    vbucket_state* vbState = getVBucketState(vbid);
+    if (!vbState) {
+        EP_LOG_WARN(
+                "CouchKVStore::compactDBInternal ({}) Failed to obtain vbState "
+                "for the highCompletedSeqno",
+                vbid);
+        return false;
+    }
+    hook_ctx->highCompletedSeqno = vbState->highCompletedSeqno;
+
     // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
     errCode = couchstore_compact_db_ex(compactdb,
                                        compact_file.c_str(),
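
The second hunk deliberately takes the HCS from the cached vbucket_state rather than re-reading it from disk, since readVBState would overwrite the cached copy and could race a concurrent scan. A simplified sketch of that wiring, with hypothetical stand-ins for the real types rather than the actual CouchKVStore member:

    #include <cstdint>

    // Illustrative stand-ins for vbucket_state and compaction_ctx.
    struct VBucketState { uint64_t highCompletedSeqno = 0; };
    struct CompactionCtx { uint64_t highCompletedSeqno = 0; };

    // Mirrors the compactDBInternal addition: fail if there is no cached
    // state, otherwise copy the HCS into the hook context before the
    // couchstore compaction starts.
    bool primeCompactionCtx(const VBucketState* cached, CompactionCtx& ctx) {
        if (!cached) {
            return false;  // the real code logs a warning and aborts compaction
        }
        ctx.highCompletedSeqno = cached->highCompletedSeqno;
        return true;
    }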

engines/ep/src/kvstore.h

Lines changed: 3 additions & 0 deletions
@@ -129,6 +129,9 @@ struct compaction_ctx {
     /// pointer as context cannot be constructed until deeper inside storage
     std::unique_ptr<Collections::VB::EraserContext> eraserContext;
     Collections::KVStore::DroppedCb droppedKeyCb;
+
+    /// The SyncRepl HCS; prepares at or below it can be purged.
+    uint64_t highCompletedSeqno = 0;
 };
 
 struct kvstats_ctx {
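
Callers do not populate the new field themselves; it is filled in by the KVStore during compactDB. Abbreviated from the new tests below (assumes the test fixture's store and its FailOnExpiryCallback helper):

    CompactionConfig config;
    compaction_ctx cctx(config, 0 /*purgeSeqno*/);
    cctx.expiryCallback = std::make_shared<FailOnExpiryCallback>();

    auto* kvstore = store->getOneRWUnderlying();
    // compactDBInternal copies the cached vbucket_state's HCS into
    // cctx.highCompletedSeqno before running the couchstore compaction.
    EXPECT_TRUE(kvstore->compactDB(&cctx));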

engines/ep/tests/module_tests/evp_store_durability_test.cc

Lines changed: 100 additions & 0 deletions
@@ -173,6 +173,11 @@ class DurabilityEPBucketTest : public STParameterizedBucketTest {
             uint64_t expectedValue);
 };
 
+/**
+ * Test fixture for persistent bucket tests that only run under couchstore
+ */
+class DurabilityCouchstoreBucketTest : public DurabilityEPBucketTest {};
+
 /**
  * Test fixture for Durability-related tests applicable to ephemeral and
  * persistent buckets with either eviction modes.
@@ -1801,6 +1806,95 @@ TEST_P(DurabilityEPBucketTest, DoNotExpirePendingItem) {
     EXPECT_TRUE(gv.item->isDeleted());
 }
 
+// @TODO Rocksdb: when we have manual compaction/compaction filtering this test
+// should be made to pass.
+TEST_P(DurabilityCouchstoreBucketTest, RemoveCommittedPreparesAtCompaction) {
+    setVBucketToActiveWithValidTopology();
+    using namespace cb::durability;
+
+    auto key = makeStoredDocKey("key");
+    auto req = Requirements(Level::Majority, Timeout(1000));
+    auto pending = makePendingItem(key, "value", req);
+    EXPECT_EQ(ENGINE_EWOULDBLOCK, store->set(*pending, cookie));
+
+    auto vb = store->getVBucket(vbid);
+    vb->commit(key,
+               1 /*prepareSeqno*/,
+               {} /*commitSeqno*/,
+               vb->lockCollections(key));
+
+    flushVBucketToDiskIfPersistent(vbid, 2);
+
+    CompactionConfig config;
+    compaction_ctx cctx(config, 0);
+    cctx.expiryCallback = std::make_shared<FailOnExpiryCallback>();
+
+    auto* kvstore = store->getOneRWUnderlying();
+
+    // Sanity - prepare exists before compaction
+    DiskDocKey prefixedKey(key, true /*prepare*/);
+    auto gv = kvstore->get(prefixedKey, Vbid(0));
+    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+
+    EXPECT_TRUE(kvstore->compactDB(&cctx));
+
+    // Check the committed item on disk.
+    gv = kvstore->get(DiskDocKey(key), Vbid(0));
+    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+    EXPECT_EQ("value", gv.item->getValue()->to_s());
+
+    // Check the Prepare on disk; compaction should have removed it.
+    gv = kvstore->get(prefixedKey, Vbid(0));
+    EXPECT_EQ(ENGINE_KEY_ENOENT, gv.getStatus());
+}
+
+TEST_P(DurabilityCouchstoreBucketTest, RemoveAbortedPreparedAtCompaction) {
+    setVBucketToActiveWithValidTopology();
+    using namespace cb::durability;
+
+    auto key = makeStoredDocKey("key");
+    auto req = Requirements(Level::Majority, Timeout(1000));
+    auto pending = makePendingItem(key, "value", req);
+    EXPECT_EQ(ENGINE_EWOULDBLOCK, store->set(*pending, cookie));
+
+    // Flush prepare
+    flushVBucketToDiskIfPersistent(vbid, 1);
+
+    auto vb = store->getVBucket(vbid);
+    vb->abort(key,
+              1 /*prepareSeqno*/,
+              {} /*commitSeqno*/,
+              vb->lockCollections(key));
+
+    // We can't purge the last item so write a dummy
+    auto dummyKey = makeStoredDocKey("dummy");
+    auto dummyItem = makeCommittedItem(dummyKey, "dummyValue");
+    EXPECT_EQ(ENGINE_SUCCESS, store->set(*dummyItem, cookie));
+
+    // Flush Abort and dummy
+    flushVBucketToDiskIfPersistent(vbid, 2);
+
+    CompactionConfig config;
+    compaction_ctx cctx(config, 0);
+    cctx.expiryCallback = std::make_shared<FailOnExpiryCallback>();
+    auto* kvstore = store->getOneRWUnderlying();
+    EXPECT_TRUE(kvstore->compactDB(&cctx));
+
+    // Check the Abort on disk. We won't remove it until the purge interval has
+    // passed because we need it to ensure we can resume a replica that had an
+    // outstanding prepare within the purge interval.
+    DiskDocKey prefixedKey(key, true /*prepare*/);
+    auto gv = kvstore->get(prefixedKey, Vbid(0));
+    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+
+    cctx.compactConfig.purge_before_ts = std::numeric_limits<uint64_t>::max();
+    EXPECT_TRUE(kvstore->compactDB(&cctx));
+
+    // Now the Abort should be gone
+    gv = kvstore->get(prefixedKey, Vbid(0));
+    EXPECT_EQ(ENGINE_KEY_ENOENT, gv.getStatus());
+}
+
 template <typename F>
 void DurabilityEphemeralBucketTest::testPurgeCompletedPrepare(F& func) {
     setVBucketStateAndRunPersistTask(
@@ -1961,6 +2055,12 @@ TEST_P(DurabilityBucketTest, ActiveToReplicaAndCommit) {
     ASSERT_EQ(ENGINE_SUCCESS, vb.commit(key, 1, 4, vb.lockCollections(key)));
 }
 
+// Test cases which run against couchstore
+INSTANTIATE_TEST_CASE_P(AllBackends,
+                        DurabilityCouchstoreBucketTest,
+                        STParameterizedBucketTest::persistentConfigValues(),
+                        STParameterizedBucketTest::PrintToStringParamName);
+
 // Test cases which run against all persistent storage backends.
 INSTANTIATE_TEST_CASE_P(
         AllBackends,
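
The two new tests pin down the difference in lifetime: a completed prepare can be dropped by the next compaction regardless of the purge settings, whereas an abort is kept until the purge threshold has passed so that a replica with an outstanding prepare can still be resumed. A rough sketch of that distinction (illustrative only, not the actual hook logic):

    enum class Verdict { Keep, Drop };

    // Sketch: completed prepares go immediately; aborts only once they fall
    // behind the configured purge threshold (a timestamp in the real code,
    // compactConfig.purge_before_ts).
    Verdict durabilityPurgeVerdict(bool isCompletedPrepare,
                                   bool isAbort,
                                   bool behindPurgeThreshold) {
        if (isCompletedPrepare) {
            return Verdict::Drop;
        }
        if (isAbort && behindPurgeThreshold) {
            return Verdict::Drop;
        }
        return Verdict::Keep;
    }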
