Skip to content

Commit 3b641b0

Browse files
committed
MB-58531: Allow the config to define collections per OSO snapshot
Add a new configuration value, dcp_oso_max_collections_per_backfill (default 1). Read the new config parameter when ActiveStream decides whether an OSO snapshot can be used (replacing the singleCollection() test). Add a unit test which configures more than one collection per snapshot and checks that two collections are delivered. Change-Id: Ic8d56e08a033b448cc7e494ba0b334b48a204b64 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/197004 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent f070b4a commit 3b641b0

File tree

9 files changed

+134
-15
lines changed

9 files changed

+134
-15
lines changed

engines/ep/configuration.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,12 @@
11141114
"dynamic": true,
11151115
"type": "size_t"
11161116
},
1117+
"dcp_oso_max_collections_per_backfill": {
1118+
"default": "1",
1119+
"descr": "This is the maximum number of collections that a DCP stream can be filtering to be eligible for OSO",
1120+
"dynamic": true,
1121+
"type": "size_t"
1122+
},
11171123
"dcp_scan_byte_limit": {
11181124
"default": {
11191125
"on-prem": "4 * 1024 * 1024",

engines/ep/src/collections/vbucket_filter.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,15 @@ cb::engine_errc Filter::checkPrivileges(
503503
return cb::engine_errc::success;
504504
}
505505

506+
bool Filter::isOsoSuitable(size_t limit) const {
507+
// Note: passthrough means all collections match, we could probably do OSO
508+
// for passthrough, setting the key range to match every collection, i.e.
509+
// namespace \00 to \01 and \08 to \ff, but for now just consider the
510+
// explicit filter case, where the filter size is acceptable for the
511+
// given limit.
512+
return !passthrough && filter.size() <= limit;
513+
}
514+
506515
void Filter::dump() const {
507516
std::cerr << *this << std::endl;
508517
}

engines/ep/src/collections/vbucket_filter.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,6 @@ class Filter {
194194
return filter.end();
195195
}
196196

197-
/// @return true if this filter represents a single collection
198-
bool singleCollection() const {
199-
return !passthrough && filter.size() == 1;
200-
}
201-
202197
/**
203198
* Method to check if the filter does not filter collections
204199
* @return true if the filter is a pass-through filter
@@ -218,6 +213,15 @@ class Filter {
218213
return !isPassThroughFilter() && !isLegacyFilter();
219214
}
220215

216+
/**
217+
* Check if the filter is suitable for an OSO snapshot given the limit.
218+
* Currently a !passthrough filter with a size that does not exceed the
219+
* limit would result in true.
220+
*
221+
* @return true if an OSO snapshot is ok for this filter
222+
*/
223+
bool isOsoSuitable(size_t limit) const;
224+
221225
/**
222226
* Dump this to std::cerr
223227
*/

engines/ep/src/dcp/active_stream.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2030,18 +2030,19 @@ void ActiveStream::scheduleBackfill_UNLOCKED(DcpProducer& producer,
20302030
bool ActiveStream::tryAndScheduleOSOBackfill(DcpProducer& producer,
20312031
VBucket& vb) {
20322032
// OSO only _allowed_ (but may not be chosen):
2033-
// if the filter is set to a single collection.
2034-
// if this is the initial backfill request
20352033
// if the client has enabled OSO
2036-
if (producer.isOutOfOrderSnapshotsEnabled() && filter.singleCollection() &&
2034+
// if the size of the collection filter fits the current configured max
2035+
// if this is the initial backfill request (diskonly or not)
2036+
const auto& config = engine->getConfiguration();
2037+
2038+
if (producer.isOutOfOrderSnapshotsEnabled() &&
2039+
filter.isOsoSuitable(config.getDcpOsoMaxCollectionsPerBackfill()) &&
20372040
lastReadSeqno.load() == 0 &&
20382041
((curChkSeqno.load() > lastReadSeqno.load() + 1) || (isDiskOnly()))) {
2039-
20402042
// however OSO is only _used_ if:
20412043
// - dcp_oso_backfill is set to enabled,
20422044
// - dcp_oso_backfill is set to "auto", and OSO is predicted to be
20432045
// faster for this backfill.
2044-
const auto& config = engine->getConfiguration();
20452046
const auto osoBackfill = config.getDcpOsoBackfill();
20462047
if (osoBackfill == "disabled") {
20472048
return false;

engines/ep/src/ep_engine.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,9 @@ cb::engine_errc EventuallyPersistentEngine::setDcpParam(const std::string& key,
957957
getConfiguration().setDcpTakeoverMaxTime(std::stoull(val));
958958
} else if (key == "dcp_backfill_byte_limit") {
959959
getConfiguration().setDcpBackfillByteLimit(std::stoull(val));
960+
} else if (key == "dcp_oso_max_collections_per_backfill") {
961+
getConfiguration().setDcpOsoMaxCollectionsPerBackfill(
962+
std::stoull(val));
960963
} else {
961964
msg = "Unknown config param";
962965
rv = cb::engine_errc::no_such_key;

engines/ep/tests/ep_testsuite.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6547,6 +6547,7 @@ static enum test_result test_mb19687_fixed(EngineIface* h) {
65476547
"ep_dcp_oso_backfill_large_value_ratio",
65486548
"ep_dcp_oso_backfill_small_value_ratio",
65496549
"ep_dcp_oso_backfill_small_item_size_threshold",
6550+
"ep_dcp_oso_max_collections_per_backfill",
65506551
"ep_dcp_scan_byte_limit",
65516552
"ep_dcp_scan_item_limit",
65526553
"ep_dcp_takeover_max_time",
@@ -6808,6 +6809,7 @@ static enum test_result test_mb19687_fixed(EngineIface* h) {
68086809
"ep_dcp_oso_backfill_large_value_ratio",
68096810
"ep_dcp_oso_backfill_small_value_ratio",
68106811
"ep_dcp_oso_backfill_small_item_size_threshold",
6812+
"ep_dcp_oso_max_collections_per_backfill",
68116813
"ep_dcp_producer_snapshot_marker_yield_limit",
68126814
"ep_dcp_scan_byte_limit",
68136815
"ep_dcp_scan_item_limit",

engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "tests/module_tests/test_helpers.h"
2222
#include "vbucket.h"
2323

24+
#include <folly/portability/GMock.h>
25+
#include <folly/portability/GTest.h>
2426
#include <spdlog/fmt/fmt.h>
2527

2628
class CollectionsOSODcpTest : public CollectionsDcpParameterizedTest {
@@ -29,10 +31,12 @@ class CollectionsOSODcpTest : public CollectionsDcpParameterizedTest {
2931
}
3032

3133
void SetUp() override {
32-
config_string += "collections_enabled=true";
3334
// Disable OSO backfill auto-selection to simplify most of the
3435
// functional tests - set to always.
35-
config_string += ";dcp_oso_backfill=enabled";
36+
if (!config_string.empty()) {
37+
config_string += ";";
38+
}
39+
config_string += "dcp_oso_backfill=enabled";
3640

3741
CollectionsDcpParameterizedTest::SetUp();
3842
producers = std::make_unique<CollectionsDcpTestProducers>();
@@ -997,6 +1001,90 @@ TEST_P(CollectionsOSODcpAutoSelectTest, SmallThresholdDynamic) {
9971001
testDcpOsoBackfillAutomaticMode(0.004, 0.006, 300);
9981002
}
9991003

1004+
class CollectionsOSOMultiTest : public CollectionsOSODcpTest {
1005+
public:
1006+
void SetUp() override {
1007+
CollectionsOSODcpTest::SetUp();
1008+
std::string msg;
1009+
ASSERT_EQ(cb::engine_errc::success,
1010+
engine->setDcpParam(
1011+
"dcp_oso_max_collections_per_backfill", "100", msg));
1012+
}
1013+
};
1014+
1015+
// The test uses three collections and filters for two and expects the two
1016+
// filtered collections to be sent in the OSO snapshot
1017+
TEST_P(CollectionsOSOMultiTest, multi) {
1018+
using namespace cb::mcbp;
1019+
1020+
CollectionsManifest cm;
1021+
setCollections(cookie,
1022+
cm.add(CollectionEntry::vegetable)
1023+
.add(CollectionEntry::fruit)
1024+
.add(CollectionEntry::dairy));
1025+
flush_vbucket_to_disk(vbid, 3);
1026+
1027+
// 3 collections
1028+
std::array<CollectionID, 3> collections = {CollectionUid::fruit,
1029+
CollectionUid::dairy,
1030+
CollectionUid::vegetable};
1031+
// 4 keys
1032+
std::array<std::string, 4> keys = {{"a", "b", "c", "d"}};
1033+
1034+
// combine!
1035+
for (auto cid : collections) {
1036+
for (const auto& key : keys) {
1037+
store_item(vbid, makeStoredDocKey(key, cid), "value" + key);
1038+
}
1039+
flush_vbucket_to_disk(vbid, keys.size());
1040+
}
1041+
1042+
ensureDcpWillBackfill();
1043+
1044+
// filter on collections 1 and 2 of the 3 that have been written to.
1045+
nlohmann::json filter = {{"collections",
1046+
{collections.at(1).to_string(false),
1047+
collections.at(2).to_string(false)}}};
1048+
createDcpObjects(filter.dump(), OutOfOrderSnapshots::Yes, 0);
1049+
1050+
runBackfill();
1051+
1052+
stepAndExpect(ClientOpcode::DcpOsoSnapshot);
1053+
EXPECT_EQ(uint32_t(request::DcpOsoSnapshotFlags::Start),
1054+
producers->last_oso_snapshot_flags);
1055+
1056+
std::unordered_set<std::string> keys1, keys2;
1057+
1058+
// This test is written to not assume any ordering of the snapshot.
1059+
// This loop will collect all keys and check they are for the correct
1060+
// collections and the correct keys
1061+
while (producer->stepWithBorderGuard(*producers) ==
1062+
cb::engine_errc::success &&
1063+
producers->last_op != ClientOpcode::DcpOsoSnapshot) {
1064+
EXPECT_THAT(producers->last_op,
1065+
testing::AnyOf(ClientOpcode::DcpSystemEvent,
1066+
ClientOpcode::DcpMutation));
1067+
EXPECT_THAT(producers->last_collection_id,
1068+
testing::AnyOf(collections.at(1), collections.at(2)));
1069+
1070+
if (producers->last_op == ClientOpcode::DcpMutation) {
1071+
EXPECT_THAT(producers->last_key, testing::AnyOfArray(keys));
1072+
1073+
if (producers->last_collection_id == collections.at(1)) {
1074+
keys1.emplace(producers->last_key);
1075+
} else {
1076+
keys2.emplace(producers->last_key);
1077+
}
1078+
}
1079+
}
1080+
1081+
EXPECT_EQ(uint32_t(request::DcpOsoSnapshotFlags::End),
1082+
producers->last_oso_snapshot_flags);
1083+
// All keys from each collection must have been observed
1084+
EXPECT_EQ(keys.size(), keys1.size());
1085+
EXPECT_EQ(keys.size(), keys2.size());
1086+
}
1087+
10001088
INSTANTIATE_TEST_SUITE_P(CollectionsOSOEphemeralTests,
10011089
CollectionsOSOEphemeralTest,
10021090
STParameterizedBucketTest::ephConfigValues(),
@@ -1011,3 +1099,8 @@ INSTANTIATE_TEST_SUITE_P(CollectionsOSODcpAutoSelectTests,
10111099
CollectionsOSODcpAutoSelectTest,
10121100
STParameterizedBucketTest::persistentConfigValues(),
10131101
STParameterizedBucketTest::PrintToStringParamName);
1102+
1103+
INSTANTIATE_TEST_SUITE_P(CollectionsOSOMultiTests,
1104+
CollectionsOSOMultiTest,
1105+
STParameterizedBucketTest::persistentConfigValues(),
1106+
STParameterizedBucketTest::PrintToStringParamName);

include/memcached/dockey.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ class CollectionID {
128128
/// Get network byte order of the value
129129
CollectionIDNetworkOrder to_network() const;
130130

131-
std::string to_string() const;
131+
/// @param xPrefix true if the result should be 0x prefixed
132+
std::string to_string(bool xPrefix = true) const;
132133

133134
/**
134135
* A number of interfaces pass collection-ID as a string, e.g. the Manifest

utilities/dockey.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ std::ostream& operator<<(std::ostream& os, const CollectionID& cid) {
8080
return os << cid.to_string();
8181
}
8282

83-
std::string CollectionID::to_string() const {
84-
return fmt::format("{:#x}", value);
83+
std::string CollectionID::to_string(bool xPrefix) const {
84+
return fmt::format(xPrefix ? "{:#x}" : "{:x}", value);
8585
}
8686

8787
bool operator==(ScopeIDType lhs, const ScopeID& rhs) {

0 commit comments

Comments
 (0)