Skip to content

Commit 584d3a1

Browse files
committed
MB-48399: Prevent shard count change of initialized magma bucket
Prevent the shard count of a magma bucket from changing by writing a shard count to a file during first bucket initialization then on subsequent initialization read that file and use the previously persisted shard count. Change-Id: I1961c2664ee397540169bd5e3fb7bb940a3b091d Reviewed-on: http://review.couchbase.org/c/kv_engine/+/161959 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 139cd2d commit 584d3a1

File tree

4 files changed

+236
-5
lines changed

4 files changed

+236
-5
lines changed

engines/ep/src/ep_engine.cc

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
#include <boost/algorithm/string/classification.hpp>
5151
#include <boost/algorithm/string/split.hpp>
5252
#include <boost/algorithm/string/trim.hpp>
53+
#include <boost/filesystem.hpp>
54+
#include <folly/portability/Unistd.h>
5355
#include <logger/logger.h>
5456
#include <memcached/audit_interface.h>
5557
#include <memcached/collections.h>
@@ -1933,6 +1935,96 @@ class EpEngineValueChangeListener : public ValueChangedListener {
19331935
EventuallyPersistentEngine &engine;
19341936
};
19351937

1938+
size_t EventuallyPersistentEngine::getShardCount() {
1939+
auto configShardCount = configuration.getMaxNumShards();
1940+
if (configuration.getBackend() != "magma") {
1941+
return configuration.getMaxNumShards();
1942+
}
1943+
1944+
auto diskShardCount = getShardCountFromDisk();
1945+
if (!diskShardCount) {
1946+
return configShardCount;
1947+
}
1948+
1949+
return diskShardCount.value();
1950+
}
1951+
1952+
constexpr std::string_view magmaShardFile = "/magmaShardCount";
1953+
1954+
std::optional<size_t> EventuallyPersistentEngine::getShardCountFromDisk() {
1955+
Expects(configuration.getBackend() == "magma");
1956+
1957+
// Look for the file
1958+
const auto shardFile = boost::filesystem::path(
1959+
configuration.getDbname().append(magmaShardFile));
1960+
if (boost::filesystem::exists(shardFile)) {
1961+
boost::filesystem::ifstream ifs(shardFile);
1962+
std::string data;
1963+
std::getline(ifs, data);
1964+
EP_LOG_INFO("Found shard file for magma with {} shards", data);
1965+
uint64_t shards;
1966+
if (safe_strtoull(data, shards)) {
1967+
return shards;
1968+
}
1969+
1970+
auto msg = "Couldn't read shard file or found invalid data";
1971+
EP_LOG_CRITICAL_RAW(msg);
1972+
throw std::logic_error(msg);
1973+
}
1974+
1975+
return {};
1976+
}
1977+
1978+
void EventuallyPersistentEngine::maybeSaveShardCount(WorkLoadPolicy& workload) {
1979+
if (configuration.getBackend() == "magma") {
1980+
// We should have created this directory already
1981+
Expects(boost::filesystem::exists(configuration.getDbname()));
1982+
1983+
const auto shardFilePath = boost::filesystem::path(
1984+
configuration.getDbname().append(magmaShardFile));
1985+
1986+
if (boost::filesystem::exists(shardFilePath)) {
1987+
// File already exists, don't overwrite it (we should have the same
1988+
// of shards, it's just pointless).
1989+
return;
1990+
}
1991+
1992+
auto* file = fopen(shardFilePath.string().c_str(), "w");
1993+
if (!file) {
1994+
throw std::runtime_error(
1995+
"EventuallyPersistentEngine::maybeSaveShardCount: Could "
1996+
"not load magma shard file");
1997+
}
1998+
1999+
auto shardStr = std::to_string(workload.getNumShards());
2000+
2001+
auto count = fwrite(shardStr.data(), shardStr.size(), 1, file);
2002+
if (!count) {
2003+
throw std::runtime_error(
2004+
"EventuallyPersistentEngine::maybeSaveShardCount: Error "
2005+
"writing shard count to file");
2006+
}
2007+
2008+
auto ret = fflush(file);
2009+
if (ret != 0) {
2010+
throw std::runtime_error(
2011+
"EventuallyPersistentEngine::maybeSaveShardCount: Error "
2012+
"flushing shard file: " +
2013+
std::to_string(ret));
2014+
}
2015+
2016+
ret = fclose(file);
2017+
if (ret != 0) {
2018+
throw std::runtime_error(
2019+
"EventuallyPersistentEngine::maybeSaveShardCount: Error "
2020+
"closing shard file: " +
2021+
std::to_string(ret));
2022+
}
2023+
2024+
Ensures(boost::filesystem::exists(shardFilePath));
2025+
}
2026+
}
2027+
19362028
cb::engine_errc EventuallyPersistentEngine::initialize(const char* config) {
19372029
auto switchToEngine = acquireEngine(this);
19382030
resetStats();
@@ -2020,9 +2112,16 @@ cb::engine_errc EventuallyPersistentEngine::initialize(const char* config) {
20202112
"allow_sanitize_value_in_deletion",
20212113
std::make_unique<EpEngineValueChangeListener>(*this));
20222114

2023-
auto numShards = configuration.getMaxNumShards();
2115+
// The number of shards for a magma bucket cannot be changed after the first
2116+
// bucket instantiation. This is because the number of shards determines
2117+
// the on disk structure of the data. To solve this problem we store a file
2118+
// to the data directory on first bucket creation that tells us how many
2119+
// shards are to be used. We read this file here if it exists and use that
2120+
// number, if not, this should be the first bucket creation.
20242121
workload = std::make_unique<WorkLoadPolicy>(
2025-
configuration.getMaxNumWorkers(), numShards);
2122+
configuration.getMaxNumWorkers(), getShardCount());
2123+
2124+
maybeSaveShardCount(*workload);
20262125

20272126
const auto& confResMode = configuration.getConflictResolutionType();
20282127
if (!setConflictResolutionMode(confResMode)) {
@@ -6493,6 +6592,7 @@ cb::engine_errc EventuallyPersistentEngine::deleteVBucketInner(
64936592
break;
64946593

64956594
case cb::engine_errc::not_my_vbucket:
6595+
64966596
EP_LOG_WARN(
64976597
"Deletion of {} failed because the vbucket doesn't exist!!!",
64986598
vbid);

engines/ep/src/ep_engine.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,24 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
12731273
const cb::mcbp::Request& request,
12741274
const AddResponseFn& response);
12751275

1276+
/**
1277+
* Get the configured shard count for the bucket
1278+
* @return shard count
1279+
*/
1280+
size_t getShardCount();
1281+
1282+
/**
1283+
* Attempt to read the shard count from disk
1284+
* @return empty optional if it does not exist
1285+
*/
1286+
std::optional<size_t> getShardCountFromDisk();
1287+
1288+
/**
1289+
* Save the shard count to disk if it does not exist
1290+
* @param workload
1291+
*/
1292+
void maybeSaveShardCount(WorkLoadPolicy& workload);
1293+
12761294
private:
12771295
void doEngineStatsCouchDB(const BucketStatCollector& collector, const EPStats& epstats);
12781296
void doEngineStatsMagma(const BucketStatCollector& collector);

engines/ep/tests/module_tests/evp_engine_test.cc

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <executor/cb3_taskqueue.h>
2626

2727
#include <boost/algorithm/string/join.hpp>
28+
#include <boost/filesystem.hpp>
2829
#include <configuration_impl.h>
2930
#include <platform/dirutils.h>
3031
#include <chrono>
@@ -50,6 +51,10 @@ void EventuallyPersistentEngineTest::SetUp() {
5051
ExecutorPool::create();
5152
}
5253

54+
initializeEngine();
55+
}
56+
57+
void EventuallyPersistentEngineTest::initializeEngine() {
5358
// Setup an engine with a single active vBucket.
5459
EXPECT_EQ(cb::engine_errc::success,
5560
create_ep_engine_instance(get_mock_server_api, &handle))
@@ -90,13 +95,23 @@ void EventuallyPersistentEngineTest::SetUp() {
9095
}
9196

9297
void EventuallyPersistentEngineTest::TearDown() {
98+
shutdownEngine();
99+
ExecutorPool::shutdown();
100+
// Cleanup any files we created - ignore if they don't exist
101+
try {
102+
cb::io::rmrf(test_dbname);
103+
} catch (std::system_error& e) {
104+
if (e.code() != std::error_code(ENOENT, std::system_category())) {
105+
throw e;
106+
}
107+
}
108+
}
109+
110+
void EventuallyPersistentEngineTest::shutdownEngine() {
93111
destroy_mock_cookie(cookie);
94112
// Need to force the destroy (i.e. pass true) because
95113
// NonIO threads may have been disabled (see DCPTest subclass).
96114
engine->destroy(true);
97-
ExecutorPool::shutdown();
98-
// Cleanup any files we created.
99-
cb::io::rmrf(test_dbname);
100115
}
101116

102117
queued_item EventuallyPersistentEngineTest::store_item(
@@ -367,6 +382,92 @@ TEST_P(DurabilityTest, DurabilityStateStats) {
367382
expectStatsForVB(vb);
368383
}
369384

385+
TEST_P(EPEnginePersistentTest, ShardCountsOnSecondBucketInit) {
386+
auto originalShardCount = engine->getWorkLoadPolicy().getNumShards();
387+
auto newShardCount = originalShardCount + 1;
388+
389+
// We populate the config with shards from this value in the initialize fn
390+
numShards = newShardCount;
391+
shutdownEngine();
392+
initializeEngine();
393+
394+
if (bucketType == "persistent_magma") {
395+
EXPECT_EQ(originalShardCount,
396+
engine->getWorkLoadPolicy().getNumShards());
397+
} else {
398+
EXPECT_EQ(newShardCount, engine->getWorkLoadPolicy().getNumShards());
399+
}
400+
}
401+
402+
TEST_P(EPEnginePersistentTest, EngineInitReadOnlyDataDir) {
403+
store_item(vbid, "key", "value");
404+
405+
shutdownEngine();
406+
407+
// As we're modifying the directory we still need execute permissions or we
408+
// can't even look in it.
409+
boost::filesystem::permissions(test_dbname,
410+
boost::filesystem::others_read |
411+
boost::filesystem::owner_read |
412+
boost::filesystem::group_read |
413+
boost::filesystem::others_exe |
414+
boost::filesystem::owner_exe |
415+
boost::filesystem::group_exe);
416+
417+
std::string config = config_string;
418+
config += "dbname=" + test_dbname;
419+
config += ";max_vbuckets=" + std::to_string(numVbuckets) +
420+
";max_num_shards=" + std::to_string(numShards) + ";";
421+
422+
// Set the bucketType
423+
config += generateBucketTypeConfig(bucketType);
424+
425+
EXPECT_EQ(cb::engine_errc::success,
426+
create_ep_engine_instance(get_mock_server_api, &handle))
427+
<< "Failed to create ep engine instance";
428+
engine = reinterpret_cast<EventuallyPersistentEngine*>(handle);
429+
ObjectRegistry::onSwitchThread(engine);
430+
431+
// Should come up fine, but in some sort of read only mode
432+
EXPECT_EQ(cb::engine_errc::success, engine->initialize(config.c_str()));
433+
434+
// Set the filesystem permissions back for the next test
435+
boost::filesystem::permissions(test_dbname, boost::filesystem::all_all);
436+
437+
// Reset our cookie to have a ptr to the new engine which is required when
438+
// we destroy it in TearDown()
439+
cookie = create_mock_cookie(engine);
440+
}
441+
442+
// Tests that engine initializes fine even if the data dir doesn't exist
443+
TEST_P(EPEnginePersistentTest, EngineInitNoDataDir) {
444+
shutdownEngine();
445+
446+
cb::io::rmrf(test_dbname);
447+
448+
std::string config = config_string;
449+
config += "dbname=" + test_dbname + ";";
450+
451+
// Set the bucketType
452+
config += generateBucketTypeConfig(bucketType);
453+
454+
EXPECT_EQ(cb::engine_errc::success,
455+
create_ep_engine_instance(get_mock_server_api, &handle))
456+
<< "Failed to create ep engine instance";
457+
engine = reinterpret_cast<EventuallyPersistentEngine*>(handle);
458+
ObjectRegistry::onSwitchThread(engine);
459+
460+
EXPECT_EQ(cb::engine_errc::success, engine->initialize(config.c_str()));
461+
cookie = create_mock_cookie(engine);
462+
}
463+
464+
INSTANTIATE_TEST_SUITE_P(Persistent,
465+
EPEnginePersistentTest,
466+
EPEngineParamTest::persistentConfigValues(),
467+
[](const ::testing::TestParamInfo<std::string>& info) {
468+
return info.param;
469+
});
470+
370471
INSTANTIATE_TEST_SUITE_P(EphemeralOrPersistent,
371472
DurabilityTest,
372473
EPEngineParamTest::allConfigValues(),

engines/ep/tests/module_tests/evp_engine_test.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ class EventuallyPersistentEngineTest : virtual public ::testing::Test {
3737

3838
void TearDown() override;
3939

40+
/**
41+
* Set up the engine. Does not touch file or executor pool
42+
*/
43+
void initializeEngine();
44+
45+
/**
46+
* Shutdown the engine. Does not touch files or executor pool
47+
*/
48+
void shutdownEngine();
49+
4050
/* Helper methods for tests */
4151

4252
/* Stores an item into the given vbucket. */
@@ -134,6 +144,8 @@ class EPEngineParamTest : virtual public EventuallyPersistentEngineTest,
134144
}
135145
};
136146

147+
class EPEnginePersistentTest : public EPEngineParamTest {};
148+
137149
/*
138150
* EPEngine-level test fixture for Durability.
139151
*/

0 commit comments

Comments
 (0)