Skip to content

Commit b6425f5

Browse files
committed
MB-54666: Producer doesn't enable ChangeStreams if not magma
CDC requires a magma backend. Change-Id: If8d3f33d7e0809b4a9403d07635359200c88aed7 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/183745 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 2989b63 commit b6425f5

File tree

5 files changed

+48
-8
lines changed

5 files changed

+48
-8
lines changed

engines/ep/src/dcp/producer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,10 @@ cb::engine_errc DcpProducer::control(uint32_t opaque,
11121112
flatBuffersSystemEventsEnabled = true;
11131113
return cb::engine_errc::success;
11141114
} else if (key == DcpControlKeys::ChangeStreams && valueStr == "true") {
1115+
if (!engine_.getKVBucket()->getStorageProperties().canRetainHistory()) {
1116+
return cb::engine_errc::not_supported;
1117+
}
1118+
11151119
changeStreams = true;
11161120
return cb::engine_errc::success;
11171121
}

engines/ep/tests/mock/mock_magma_kvstore.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
#include "kvstore/magma-kvstore/magma-memory-tracking-proxy.h"
1414

1515
MockMagmaKVStore::MockMagmaKVStore(MagmaKVStoreConfig& config)
16-
: MagmaKVStore(config) {
16+
: MagmaKVStore(config),
17+
storageProperties(StorageProperties::ByIdScan::Yes,
18+
StorageProperties::AutomaticDeduplication::No,
19+
StorageProperties::PrepareCounting::No,
20+
StorageProperties::CompactionStaleItemCallbacks::Yes,
21+
StorageProperties::HistoryRetentionAvailable::Yes) {
1722
}
1823

1924
KVStoreIface::ReadVBStateResult MockMagmaKVStore::readVBStateFromDisk(

engines/ep/tests/mock/mock_magma_kvstore.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,16 @@ class MockMagmaKVStore : public MagmaKVStore {
6565
fileHandleSyncStatusHook = hook;
6666
}
6767

68+
StorageProperties getStorageProperties() const override {
69+
return storageProperties;
70+
}
71+
6872
TestingHook<> readVBStateFromDiskHook;
6973

7074
std::function<int(VB::Commit&, kvstats_ctx&)> saveDocsErrorInjector;
7175
std::function<bool()> snapshotVBucketErrorInjector;
76+
77+
StorageProperties storageProperties;
7278
};
7379

7480
#endif

engines/ep/tests/module_tests/dcp_single_threaded_test.cc

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,15 +1132,19 @@ TEST_P(STDcpTest, ProducerNegotiatesFlatBuffers) {
11321132
}
11331133

11341134
TEST_P(STDcpTest, ProducerNegotiatesChangeStreams) {
1135-
auto* cookie = create_mock_cookie();
1135+
if (!isMagma()) {
1136+
GTEST_SKIP();
1137+
}
1138+
1139+
// @todo CDC: Can remove when magma has enabled history support
1140+
replaceMagmaKVStore();
11361141

1142+
auto* cookie = create_mock_cookie();
11371143
const auto producer = std::make_shared<MockDcpProducer>(
11381144
*engine, cookie, "test_producer", 0);
1139-
1140-
// Disables by default
11411145
ASSERT_FALSE(producer->areChangeStreamsEnabled());
11421146

1143-
// DCP_CONTROL validation
1147+
// Value validation
11441148
EXPECT_EQ(cb::engine_errc::invalid_arguments,
11451149
producer->control(0, DcpControlKeys::ChangeStreams, "not-true"));
11461150
EXPECT_FALSE(producer->areChangeStreamsEnabled());
@@ -1153,6 +1157,23 @@ TEST_P(STDcpTest, ProducerNegotiatesChangeStreams) {
11531157
destroy_mock_cookie(cookie);
11541158
}
11551159

1160+
TEST_P(STDcpTest, ProducerNegotiatesChangeStreams_NotMagma) {
1161+
if (isMagma()) {
1162+
GTEST_SKIP();
1163+
}
1164+
1165+
auto* cookie = create_mock_cookie();
1166+
const auto producer = std::make_shared<MockDcpProducer>(
1167+
*engine, cookie, "test_producer", 0);
1168+
ASSERT_FALSE(producer->areChangeStreamsEnabled());
1169+
1170+
EXPECT_EQ(cb::engine_errc::not_supported,
1171+
producer->control(0, DcpControlKeys::ChangeStreams, "true"));
1172+
EXPECT_FALSE(producer->areChangeStreamsEnabled());
1173+
1174+
destroy_mock_cookie(cookie);
1175+
}
1176+
11561177
void STDcpTest::testConsumerNegotiatesChangeStreams(bool enabled) {
11571178
MockDcpConnMap connMap(*engine);
11581179
connMap.initialize();

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5110,11 +5110,13 @@ INSTANTIATE_TEST_SUITE_P(Persistent,
51105110
STPassiveStreamMagmaTest,
51115111
STParameterizedBucketTest::magmaConfigValues(),
51125112
STParameterizedBucketTest::PrintToStringParamName);
5113-
#endif /*EP_USE_MAGMA*/
51145113

51155114
void CDCActiveStreamTest::SetUp() {
51165115
STActiveStreamPersistentTest::SetUp();
51175116

5117+
// @todo CDC: Can remove as soon as magma enables history
5118+
replaceMagmaKVStore();
5119+
51185120
CollectionsManifest manifest;
51195121
manifest.add(CollectionEntry::historical,
51205122
cb::NoExpiryLimit,
@@ -5231,7 +5233,7 @@ TEST_P(CDCActiveStreamTest, CollectionNotDeduped_InMemory) {
52315233

52325234
INSTANTIATE_TEST_SUITE_P(Persistent,
52335235
CDCActiveStreamTest,
5234-
STParameterizedBucketTest::persistentConfigValues(),
5236+
STParameterizedBucketTest::magmaConfigValues(),
52355237
STParameterizedBucketTest::PrintToStringParamName);
52365238

52375239
TEST_P(CDCPassiveStreamTest, HistorySnapshotReceived) {
@@ -5277,5 +5279,7 @@ TEST_P(CDCPassiveStreamTest, HistorySnapshotReceived) {
52775279

52785280
INSTANTIATE_TEST_SUITE_P(Persistent,
52795281
CDCPassiveStreamTest,
5280-
STParameterizedBucketTest::persistentConfigValues(),
5282+
STParameterizedBucketTest::magmaConfigValues(),
52815283
STParameterizedBucketTest::PrintToStringParamName);
5284+
5285+
#endif /*EP_USE_MAGMA*/

0 commit comments

Comments
 (0)