Skip to content

Commit 7131d27

Browse files
committed
MB-47318: [BP] Push UpdateFlowControl through to PassiveStream
Pass the UpdateFlowControl object which owns the correct buffer ack value. If the PassiveStream has to buffer the DcpResponse for later processing it is in charge of releasing the ack bytes and will in the next patch save the value. Change-Id: Ifcc3aade4d53b1780fee3000b11b3f8ef14c6266 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/166227 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 645f02e commit 7131d27

File tree

7 files changed

+84
-64
lines changed

7 files changed

+84
-64
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,8 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
561561
cb::const_byte_buffer meta,
562562
uint32_t deleteTime,
563563
IncludeDeleteTime includeDeleteTime,
564-
DeleteSource deletionCause) {
564+
DeleteSource deletionCause,
565+
UpdateFlowControl& ufc) {
565566
lastMessageTime = ep_current_time();
566567

567568
if (doDisconnect()) {
@@ -646,15 +647,16 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
646647

647648
try {
648649
err = stream->messageReceived(std::make_unique<MutationConsumerMessage>(
649-
item,
650-
opaque,
651-
IncludeValue::Yes,
652-
IncludeXattrs::Yes,
653-
includeDeleteTime,
654-
IncludeDeletedUserXattrs::Yes,
655-
key.getEncoding(),
656-
emd.release(),
657-
cb::mcbp::DcpStreamId{}));
650+
item,
651+
opaque,
652+
IncludeValue::Yes,
653+
IncludeXattrs::Yes,
654+
includeDeleteTime,
655+
IncludeDeletedUserXattrs::Yes,
656+
key.getEncoding(),
657+
emd.release(),
658+
cb::mcbp::DcpStreamId{}),
659+
ufc);
658660
} catch (const std::bad_alloc&) {
659661
err = ENGINE_ENOMEM;
660662
}
@@ -749,12 +751,12 @@ ENGINE_ERROR_CODE DcpConsumer::toMainDeletion(DeleteType origin,
749751
meta,
750752
deleteTime,
751753
includeDeleteTime,
752-
deleteSource);
754+
deleteSource,
755+
ufc);
753756

754757
// TMPFAIL means the stream has buffered the message for later processing
755758
// so skip flowControl, success or any other error, we still need to ack
756759
if (err == ENGINE_TMPFAIL) {
757-
ufc.release();
758760
// Mask the TMPFAIL
759761
return ENGINE_SUCCESS;
760762
}
@@ -1818,15 +1820,14 @@ ENGINE_ERROR_CODE DcpConsumer::lookupStreamAndDispatchMessage(
18181820
// Pass the message to the associated stream.
18191821
ENGINE_ERROR_CODE err;
18201822
try {
1821-
err = stream->messageReceived(std::move(msg));
1823+
err = stream->messageReceived(std::move(msg), ufc);
18221824
} catch (const std::bad_alloc&) {
18231825
return ENGINE_ENOMEM;
18241826
}
18251827

18261828
// The item was buffered and will be processed later
18271829
if (err == ENGINE_TMPFAIL) {
18281830
notifyVbucketReady(vbucket);
1829-
ufc.release();
18301831
return ENGINE_SUCCESS;
18311832
}
18321833

engines/ep/src/dcp/consumer.h

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
class DcpResponse;
3737
class PassiveStream;
3838
class StreamEndResponse;
39+
class UpdateFlowControl;
3940

4041
/**
4142
* A DCP Consumer object represents a DCP connection which receives streams
@@ -461,7 +462,8 @@ class DcpConsumer : public ConnHandler,
461462
cb::const_byte_buffer meta,
462463
uint32_t deleteTime,
463464
IncludeDeleteTime includeDeleteTime,
464-
DeleteSource deletionCause);
465+
DeleteSource deletionCause,
466+
UpdateFlowControl& ufc);
465467

466468
/**
467469
* Helper function for mutation() and prepare() messages as they are handled
@@ -515,39 +517,6 @@ class DcpConsumer : public ConnHandler,
515517
*/
516518
std::shared_ptr<PassiveStream> removeStream(Vbid vbid);
517519

518-
/**
519-
* RAII helper class to update the flowControl object with the number of
520-
* bytes to free and trigger the consumer notify
521-
*/
522-
class UpdateFlowControl {
523-
public:
524-
UpdateFlowControl(DcpConsumer& consumer, uint32_t bytes)
525-
: consumer(consumer), bytes(bytes) {
526-
if (bytes == 0) {
527-
throw std::invalid_argument("UpdateFlowControl given 0 bytes");
528-
}
529-
}
530-
531-
~UpdateFlowControl() {
532-
if (bytes) {
533-
consumer.flowControl.incrFreedBytes(bytes);
534-
consumer.scheduleNotifyIfNecessary();
535-
}
536-
}
537-
538-
/**
539-
* If the user no longer wants this instance to perform the update
540-
* calling release() means this instance will skip the update.
541-
*/
542-
void release() {
543-
bytes = 0;
544-
}
545-
546-
private:
547-
DcpConsumer& consumer;
548-
uint32_t bytes;
549-
};
550-
551520
/**
552521
* Helper method to lookup the correct stream for the given
553522
* vbid / opaque pair, and then dispatch the message to that stream.
@@ -659,6 +628,8 @@ class DcpConsumer : public ConnHandler,
659628

660629
bool alwaysBufferOperations{false};
661630

631+
friend UpdateFlowControl;
632+
662633
static const std::string noopCtrlMsg;
663634
static const std::string noopIntervalCtrlMsg;
664635
static const std::string connBufferCtrlMsg;
@@ -671,6 +642,39 @@ class DcpConsumer : public ConnHandler,
671642
static const std::string enableOpcodeExpiryCtrlMsg;
672643
};
673644

645+
/**
646+
* RAII helper class to update the flowControl object with the number of
647+
* bytes to free and trigger the consumer notify
648+
*/
649+
class UpdateFlowControl {
650+
public:
651+
UpdateFlowControl(DcpConsumer& consumer, uint32_t bytes)
652+
: consumer(consumer), bytes(bytes) {
653+
if (bytes == 0) {
654+
throw std::invalid_argument("UpdateFlowControl given 0 bytes");
655+
}
656+
}
657+
658+
~UpdateFlowControl() {
659+
if (bytes) {
660+
consumer.flowControl.incrFreedBytes(bytes);
661+
consumer.scheduleNotifyIfNecessary();
662+
}
663+
}
664+
665+
/**
666+
* Calling release means that this object will not update the FlowControl
667+
* instance when destructed.
668+
*/
669+
void release() {
670+
bytes = 0;
671+
}
672+
673+
private:
674+
DcpConsumer& consumer;
675+
uint32_t bytes{0};
676+
};
677+
674678
/*
675679
* Task that orchestrates rollback on Consumer,
676680
* runs in background.

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ void PassiveStream::reconnectStream(VBucketPtr& vb,
271271
}
272272

273273
ENGINE_ERROR_CODE PassiveStream::messageReceived(
274-
std::unique_ptr<DcpResponse> dcpResponse) {
274+
std::unique_ptr<DcpResponse> dcpResponse, UpdateFlowControl& ufc) {
275275
if (!dcpResponse) {
276276
return ENGINE_EINVAL;
277277
}
@@ -406,6 +406,7 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(
406406

407407
// Only buffer if the stream is not dead
408408
if (isActive()) {
409+
ufc.release(); // @todo save the value along side the buffered response
409410
buffer.push(std::move(dcpResponse));
410411
}
411412
return ENGINE_TMPFAIL;
@@ -1303,7 +1304,7 @@ bool PassiveStream::Buffer::empty() const {
13031304
return messages.empty();
13041305
}
13051306

1306-
void PassiveStream::Buffer::push(std::unique_ptr<DcpResponse> message) {
1307+
void PassiveStream::Buffer::push(PassiveStream::Buffer::BufferType message) {
13071308
std::lock_guard<std::mutex> lg(bufMutex);
13081309
bytes += message->getMessageSize();
13091310
messages.push_back(std::move(message));
@@ -1318,7 +1319,7 @@ void PassiveStream::Buffer::pop_front(std::unique_lock<std::mutex>& lh,
13181319
bytes -= bytesPopped;
13191320
}
13201321

1321-
std::unique_ptr<DcpResponse>& PassiveStream::Buffer::front(
1322+
PassiveStream::Buffer::BufferType& PassiveStream::Buffer::front(
13221323
std::unique_lock<std::mutex>& lh) {
13231324
return messages.front();
13241325
}

engines/ep/src/dcp/passive_stream.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class DropScopeEvent;
3434
class EventuallyPersistentEngine;
3535
class MutationConsumerMessage;
3636
class SystemEventMessage;
37+
class UpdateFlowControl;
3738

3839
class PassiveStream : public Stream {
3940
public:
@@ -94,11 +95,14 @@ class PassiveStream : public Stream {
9495
/*
9596
* Calls the appropriate function to process the message.
9697
*
97-
* @params response The dcp message that needs to be processed.
98+
* @param response The dcp message that needs to be processed.
99+
* @param ackSize the value to use when DCP acking - this may differ from
100+
* DcpResponse::getMessageSize if for example an Item value was
101+
* decompressed
98102
* @returns the error code from processing the message.
99103
*/
100-
virtual ENGINE_ERROR_CODE messageReceived(
101-
std::unique_ptr<DcpResponse> response);
104+
ENGINE_ERROR_CODE messageReceived(
105+
std::unique_ptr<DcpResponse> response, UpdateFlowControl& ackSize);
102106

103107
void addStats(const AddStatFn& add_stat, const void* c) override;
104108

@@ -306,7 +310,9 @@ class PassiveStream : public Stream {
306310

307311
bool empty() const;
308312

309-
void push(std::unique_ptr<DcpResponse> message);
313+
using BufferType = std::unique_ptr<DcpResponse>;
314+
315+
void push(BufferType message);
310316

311317
/*
312318
* Caller must of locked bufMutex and pass as lh (not asserted)
@@ -317,13 +323,13 @@ class PassiveStream : public Stream {
317323
* Return a reference to the item at the front.
318324
* The user must pass a lock to bufMutex.
319325
*/
320-
std::unique_ptr<DcpResponse>& front(std::unique_lock<std::mutex>& lh);
326+
BufferType& front(std::unique_lock<std::mutex>& lh);
321327

322-
size_t bytes;
328+
size_t bytes{0};
323329
/* Lock ordering w.r.t to streamMutex:
324330
First acquire bufMutex and then streamMutex */
325331
mutable std::mutex bufMutex;
326-
std::deque<std::unique_ptr<DcpResponse> > messages;
332+
std::deque<BufferType> messages;
327333
} buffer;
328334

329335
/*

engines/ep/tests/mock/mock_stream.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "mock_stream.h"
1919
#include "checkpoint_manager.h"
20+
#include "dcp/consumer.h"
2021
#include "dcp/response.h"
2122
#include "mock_dcp_producer.h"
2223
#include "vbucket.h"
@@ -91,7 +92,14 @@ void MockActiveStream::consumeAllBackfillItems() {
9192

9293
ENGINE_ERROR_CODE MockPassiveStream::messageReceived(
9394
std::unique_ptr<DcpResponse> dcpResponse) {
94-
return PassiveStream::messageReceived(std::move(dcpResponse));
95+
auto consumer = consumerPtr.lock();
96+
if (!consumer) {
97+
throw std::logic_error(
98+
"MockPassiveStream::messageReceived cannot proceed without the "
99+
"DcpConsumer");
100+
}
101+
UpdateFlowControl ufc(*consumer, dcpResponse->getMessageSize());
102+
return PassiveStream::messageReceived(std::move(dcpResponse), ufc);
95103
}
96104

97105
std::unique_ptr<DcpResponse> MockPassiveStream::public_popFromReadyQ() {

engines/ep/tests/mock/mock_stream.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,7 @@ class MockPassiveStream : public PassiveStream {
288288
transitionState(StreamState::Dead);
289289
}
290290

291-
ENGINE_ERROR_CODE messageReceived(
292-
std::unique_ptr<DcpResponse> dcpResponse) override;
291+
ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> dcpResponse);
293292

294293
void processMarker(SnapshotMarker* marker) override {
295294
PassiveStream::processMarker(marker);

engines/ep/tests/module_tests/dcp_reflection_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
152152

153153
void transferResponseMessage();
154154

155-
std::pair<ActiveStream*, PassiveStream*> getStreams();
155+
std::pair<ActiveStream*, MockPassiveStream*> getStreams();
156156

157157
Vbid vbid;
158158
EventuallyPersistentEngine* producerNode;
@@ -400,11 +400,12 @@ DCPLoopbackStreamTest::DcpRoute::getNextProducerMsg(ActiveStream* stream) {
400400
return producerMsg;
401401
}
402402

403-
std::pair<ActiveStream*, PassiveStream*>
403+
std::pair<ActiveStream*, MockPassiveStream*>
404404
DCPLoopbackStreamTest::DcpRoute::getStreams() {
405405
auto* pStream =
406406
dynamic_cast<ActiveStream*>(producer->findStream(vbid).get());
407-
auto* cStream = consumer->getVbucketStream(vbid).get();
407+
auto* cStream = dynamic_cast<MockPassiveStream*>(
408+
consumer->getVbucketStream(vbid).get());
408409
EXPECT_TRUE(pStream);
409410
EXPECT_TRUE(cStream);
410411
return {pStream, cStream};

0 commit comments

Comments
 (0)