Skip to content

Commit ed9bab4

Browse files
committed
MB-43205: Introduce DcpConsumer::allowSanitizeValueInDeletion
We will start using/testing it in follow-up patches. Change-Id: Ied6a6c14e4d96ed5e39c81c82de8a15e7103e47a Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143665 Tested-by: Build Bot <[email protected]> Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 0c62ce0 commit ed9bab4

File tree

4 files changed

+41
-4
lines changed

4 files changed

+41
-4
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine& engine,
201201
// Consumer needs to know if the Producer supports IncludeDeletedUserXattrs
202202
deletedUserXattrsNegotiation.state =
203203
BlockingDcpControlNegotiation::State::PendingRequest;
204+
205+
allowSanitizeValueInDeletion.store(config.isAllowSanitizeValueInDeletion());
204206
}
205207

206208
DcpConsumer::~DcpConsumer() {

engines/ep/src/dcp/consumer.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ class DcpConsumer : public ConnHandler,
328328

329329
void setDisconnect() override;
330330

331+
void setAllowSanitizeValueInDeletion(bool value) {
332+
allowSanitizeValueInDeletion.store(value);
333+
}
334+
331335
protected:
332336
/**
333337
* Records when the consumer last received a message from producer.
@@ -632,6 +636,13 @@ class DcpConsumer : public ConnHandler,
632636
*/
633637
size_t processBufferedMessagesBatchSize;
634638

639+
/**
640+
* Whether this consumer should just sanitize invalid payloads in deletions
641+
* or fail the operation if an invalid payload is detected.
642+
* Non-const as the related configuration param is dynamic.
643+
*/
644+
std::atomic_bool allowSanitizeValueInDeletion;
645+
635646
static const std::string noopCtrlMsg;
636647
static const std::string noopIntervalCtrlMsg;
637648
static const std::string connBufferCtrlMsg;

engines/ep/src/dcp/dcpconnmap.cc

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,22 @@ DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
5353
engine.getConfiguration().getDcpMinCompressionRatio());
5454

5555
// Note: these allocations are deleted by ~Configuration
56-
engine.getConfiguration().addValueChangedListener(
56+
auto& config = engine.getConfiguration();
57+
config.addValueChangedListener(
5758
"dcp_blacklist_fts_connection_logs",
5859
std::make_unique<DcpConfigChangeListener>(*this));
59-
engine.getConfiguration().addValueChangedListener(
60+
config.addValueChangedListener(
6061
"dcp_consumer_process_buffered_messages_yield_limit",
6162
std::make_unique<DcpConfigChangeListener>(*this));
62-
engine.getConfiguration().addValueChangedListener(
63+
config.addValueChangedListener(
6364
"dcp_consumer_process_buffered_messages_batch_size",
6465
std::make_unique<DcpConfigChangeListener>(*this));
65-
engine.getConfiguration().addValueChangedListener(
66+
config.addValueChangedListener(
6667
"dcp_idle_timeout",
6768
std::make_unique<DcpConfigChangeListener>(*this));
69+
config.addValueChangedListener(
70+
"allow_sanitize_value_in_deletion",
71+
std::make_unique<DcpConfigChangeListener>(*this));
6872
}
6973

7074
DcpConnMap::~DcpConnMap() {
@@ -583,6 +587,8 @@ void DcpConnMap::DcpConfigChangeListener::booleanValueChanged(const std::string&
583587
bool value) {
584588
if (key == "dcp_blacklist_fts_connection_logs") {
585589
myConnMap.blacklistFtsConnectionLogsConfigChanged(value);
590+
} else if (key == "allow_sanitize_value_in_deletion") {
591+
myConnMap.consumerAllowSanitizeValueInDeletionConfigChanged(value);
586592
}
587593
}
588594

@@ -636,6 +642,17 @@ void DcpConnMap::blacklistFtsConnectionLogsConfigChanged(bool newValue) {
636642
}
637643
}
638644

645+
void DcpConnMap::consumerAllowSanitizeValueInDeletionConfigChanged(
646+
bool newValue) {
647+
LockHolder lh(connsLock);
648+
for (const auto& cookieToConn : map_) {
649+
auto* consumer = dynamic_cast<DcpConsumer*>(cookieToConn.second.get());
650+
if (consumer) {
651+
consumer->setAllowSanitizeValueInDeletion(newValue);
652+
}
653+
}
654+
}
655+
639656
std::shared_ptr<ConnHandler> DcpConnMap::findByName(const std::string& name) {
640657
LockHolder lh(connsLock);
641658
for (const auto& cookieToConn : map_) {

engines/ep/src/dcp/dcpconnmap.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ class DcpConnMap : public ConnMap, public BackfillTrackingIface {
199199
*/
200200
void blacklistFtsConnectionLogsConfigChanged(bool newValue);
201201

202+
/**
203+
* Reflect the EP configuration change into all existing consumers.
204+
*
205+
* @param newValue
206+
*/
207+
void consumerAllowSanitizeValueInDeletionConfigChanged(bool newValue);
208+
202209
/**
203210
* @param engine The engine
204211
* @param cookie The cookie that identifies the connection

0 commit comments

Comments
 (0)