Skip to content

Commit 3010db4

Browse files
jimwwalkerdaverigby
authored andcommitted
MB-47318: [BP] Add PassiveStream always buffered mode
Add a DcpControl that can force consumer to always process messages on the AUXIO task. Change-Id: Ie96930982efc43ee1ec56e997ca29c658ae0b3e6 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/165016 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Well-Formed: Restriction Checker
1 parent caac463 commit 3010db4

File tree

4 files changed

+26
-2
lines changed

4 files changed

+26
-2
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,6 +1907,15 @@ ENGINE_ERROR_CODE DcpConsumer::control(uint32_t opaque,
19071907
setFlowControlBufSize(result);
19081908
return ENGINE_SUCCESS;
19091909
}
1910+
} else if (key == "always_buffer_operations") {
1911+
alwaysBufferOperations = value == "true";
1912+
// Warn about this because only tests should be here
1913+
logger->warn(
1914+
"always_buffer_operations:{} results in "
1915+
"alwaysBufferOperations:{}",
1916+
value,
1917+
alwaysBufferOperations ? "true" : "false");
1918+
return ENGINE_SUCCESS;
19101919
}
19111920

19121921
logger->warn("Invalid ctrl parameter {} {}", key, value);
@@ -1915,3 +1924,7 @@ ENGINE_ERROR_CODE DcpConsumer::control(uint32_t opaque,
19151924

19161925
return ConnHandler::control(opaque, key, value);
19171926
}
1927+
1928+
bool DcpConsumer::shouldBufferOperations() const {
1929+
return alwaysBufferOperations;
1930+
}

engines/ep/src/dcp/consumer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,12 @@ class DcpConsumer : public ConnHandler,
340340
return allowSanitizeValueInDeletion.load();
341341
}
342342

343+
/**
344+
* Force streams to buffer?
345+
* @return true if streams should always buffer operations
346+
*/
347+
bool shouldBufferOperations() const;
348+
343349
protected:
344350
/**
345351
* Records when the consumer last received a message from producer.
@@ -651,6 +657,8 @@ class DcpConsumer : public ConnHandler,
651657
*/
652658
std::atomic_bool allowSanitizeValueInDeletion;
653659

660+
bool alwaysBufferOperations{false};
661+
654662
static const std::string noopCtrlMsg;
655663
static const std::string noopIntervalCtrlMsg;
656664
static const std::string connBufferCtrlMsg;

engines/ep/src/dcp/passive_stream.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
6464
cur_snapshot_type(Snapshot::None),
6565
cur_snapshot_ack(false),
6666
cur_snapshot_prepare(false),
67-
vb_manifest_uid(vb_manifest_uid) {
67+
vb_manifest_uid(vb_manifest_uid),
68+
alwaysBufferOperations(c->shouldBufferOperations()) {
6869
LockHolder lh(streamMutex);
6970
streamRequest_UNLOCKED(vb_uuid);
7071
itemsReady.store(true);
@@ -327,7 +328,7 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(
327328
vb_);
328329
return ENGINE_DISCONNECT;
329330
case ReplicationThrottle::Status::Process:
330-
if (buffer.empty()) {
331+
if (buffer.empty() && !alwaysBufferOperations) {
331332
/* Process the response here itself rather than buffering it */
332333
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
333334
switch (dcpResponse->getEvent()) {

engines/ep/src/dcp/passive_stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,4 +339,6 @@ class PassiveStream : public Stream {
339339
// Flag indicating if the most recent call to processMessage
340340
// backed off due to ENOMEM. Only used for limiting logging
341341
std::atomic<bool> isNoMemory{false};
342+
343+
bool alwaysBufferOperations{false};
342344
};

0 commit comments

Comments
 (0)