Skip to content

Commit d148b3d

Browse files
BenHuddlestondaverigby
authored andcommitted
Separate support for SyncWrites and SyncReplication
Consumers should be able to stream Prepares, Commits, and Aborts if they wish without acking. Change-Id: I7eb3df1a214cef5acccaa6ab5284a9f1d311b23e Reviewed-on: http://review.couchbase.org/113037 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent c0864fd commit d148b3d

13 files changed

+87
-38
lines changed

engines/ep/src/connhandler.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919

20+
#include "dcp/dcp-types.h"
2021
#include "utility.h"
2122

2223
#include <memcached/dcp.h>
@@ -266,7 +267,23 @@ class ConnHandler {
266267
// Empty
267268
}
268269

270+
/**
271+
* Does the Connection support SyncReplication (Acking prepares)?
272+
*/
269273
bool isSyncReplicationEnabled() const {
274+
return supportsSyncReplication.load() ==
275+
SyncReplication::SyncReplication;
276+
}
277+
278+
/**
279+
* Does the Connection support SyncWrites (sending and receiving Prepares,
280+
* Commits, and Aborts)?
281+
*/
282+
bool isSyncWritesEnabled() const {
283+
return supportsSyncReplication.load() != SyncReplication::No;
284+
}
285+
286+
SyncReplication getSyncReplSupport() const {
270287
return supportsSyncReplication.load();
271288
}
272289

@@ -321,8 +338,12 @@ class ConnHandler {
321338
//! The bucketLogger for this connection
322339
std::shared_ptr<BucketLogger> logger;
323340

324-
/// Does this DCP Connection support Synchronous Replication
325-
std::atomic<bool> supportsSyncReplication{false};
341+
/**
342+
* Does this DCP Connection support Synchronous Replication (i.e. acking
343+
* Prepares). A connection should support SyncWrites to support
344+
* SyncReplication.
345+
*/
346+
std::atomic<SyncReplication> supportsSyncReplication{SyncReplication::No};
326347

327348
private:
328349

engines/ep/src/dcp/active_stream.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
7878
forceValueCompression(p->isForceValueCompressionEnabled()
7979
? ForceValueCompression::Yes
8080
: ForceValueCompression::No),
81-
syncReplication(p->isSyncReplicationEnabled() ? SyncReplication::Yes
82-
: SyncReplication::No),
81+
syncReplication(p->getSyncReplSupport()),
8382
filter(std::move(f)),
8483
sid(filter.getStreamId()) {
8584
const char* type = "";
@@ -949,7 +948,7 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
949948
start_seqno_ < static_cast<uint64_t>(item->getPrepareSeqno());
950949

951950
if ((item->getOperation() == queue_op::commit_sync_write) &&
952-
(syncReplication == SyncReplication::Yes) &&
951+
(supportSyncWrites()) &&
953952
(!prepareSeqnoGTRequestedStart ||
954953
sendCommitSyncWriteAs == SendCommitSyncWriteAs::Commit)) {
955954
return std::make_unique<CommitSyncWrite>(opaque_,
@@ -1101,7 +1100,7 @@ void ActiveStream::processItems(OutstandingItemsResult& outstandingItemsResult,
11011100
}
11021101

11031102
bool ActiveStream::shouldProcessItem(const Item& item) {
1104-
if (!item.shouldReplicate(syncReplication == SyncReplication::Yes)) {
1103+
if (!item.shouldReplicate(supportSyncWrites())) {
11051104
return false;
11061105
}
11071106

engines/ep/src/dcp/active_stream.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,11 @@ class ActiveStream : public Stream,
282282
void nextCheckpointItemTask(const LockHolder& streamMutex);
283283

284284
bool supportSyncReplication() const {
285-
return syncReplication == SyncReplication::Yes;
285+
return syncReplication == SyncReplication::SyncReplication;
286+
}
287+
288+
bool supportSyncWrites() const {
289+
return syncReplication != SyncReplication::No;
286290
}
287291

288292
/* Indicates that a backfill has been scheduled and has not yet completed.
@@ -467,7 +471,13 @@ class ActiveStream : public Stream,
467471
/// Should items be forcefully compressed on this stream?
468472
const ForceValueCompression forceValueCompression;
469473

470-
/// Does this stream support synchronous replication?
474+
/// Does this stream support synchronous replication (i.e. acking Prepares)?
475+
/**
476+
* What level of SyncReplication does this stream Support:
477+
* - None
478+
* - SyncWrites: Sending Prepares/Commits/Aborts
479+
* - SyncReplication: SyncWrites + Acking Prepares
480+
*/
471481
const SyncReplication syncReplication;
472482

473483
/**

engines/ep/src/dcp/consumer.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -975,8 +975,9 @@ bool DcpConsumer::handleResponse(const protocol_binary_response_header* resp) {
975975
// the Sync Replication negotiation-key.
976976
if (resp->response.getOpaque() == syncReplNegotiation.opaque) {
977977
syncReplNegotiation.state = SyncReplNegotiation::State::Completed;
978-
supportsSyncReplication.store(resp->response.getStatus() ==
979-
cb::mcbp::Status::Success);
978+
if (resp->response.getStatus() == cb::mcbp::Status::Success) {
979+
supportsSyncReplication.store(SyncReplication::SyncReplication);
980+
}
980981
}
981982
return true;
982983
} else if (opcode == cb::mcbp::ClientOpcode::GetErrorMap) {
@@ -1501,14 +1502,14 @@ ENGINE_ERROR_CODE DcpConsumer::enableExpiryOpcode(
15011502

15021503
ENGINE_ERROR_CODE DcpConsumer::enableSynchronousReplication(
15031504
dcp_message_producers* producers) {
1504-
// enable_synchronous_replication and consumer_name are separated into two
1505+
// enable_sync_writes and consumer_name are separated into two
15051506
// different variables as in the future non-replication consumers may wish
15061507
// to stream prepares and commits.
15071508
switch (syncReplNegotiation.state) {
15081509
case SyncReplNegotiation::State::PendingRequest: {
15091510
uint32_t opaque = ++opaqueCounter;
1510-
ENGINE_ERROR_CODE ret = producers->control(
1511-
opaque, "enable_synchronous_replication", "true");
1511+
ENGINE_ERROR_CODE ret =
1512+
producers->control(opaque, "enable_sync_writes", "true");
15121513
syncReplNegotiation.state = SyncReplNegotiation::State::PendingResponse;
15131514
syncReplNegotiation.opaque = opaque;
15141515
return ret;

engines/ep/src/dcp/dcp-types.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,5 +155,8 @@ enum class MultipleStreamRequests : bool {
155155
No
156156
};
157157

158-
/// Does the stream support synchronous replication?
159-
enum class SyncReplication : bool { Yes, No };
158+
/** Does the stream support synchronous replication (i.e. acking Prepares)?
159+
* A Stream may also support just SyncWrites (receiving Prepares and Commits
160+
* without acking).
161+
*/
162+
enum class SyncReplication : char { No, SyncWrites, SyncReplication };

engines/ep/src/dcp/producer.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -979,13 +979,19 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque,
979979
}
980980
multipleStreamRequests = MultipleStreamRequests::Yes;
981981
return ENGINE_SUCCESS;
982-
} else if (key == "enable_synchronous_replication") {
982+
} else if (key == "enable_sync_writes") {
983983
if (valueStr == "true") {
984-
supportsSyncReplication = true;
984+
supportsSyncReplication = SyncReplication::SyncWrites;
985+
if (!consumerName.empty()) {
986+
supportsSyncReplication = SyncReplication::SyncReplication;
987+
}
985988
return ENGINE_SUCCESS;
986989
}
987-
} else if (key == "consumer_name") {
990+
} else if (key == "consumer_name" && !valueStr.empty()) {
988991
consumerName = valueStr;
992+
if (supportsSyncReplication == SyncReplication::SyncWrites) {
993+
supportsSyncReplication = SyncReplication::SyncReplication;
994+
}
989995
return ENGINE_SUCCESS;
990996
}
991997

@@ -1269,6 +1275,7 @@ void DcpProducer::addStats(const AddStatFn& add_stat, const void* c) {
12691275
add_stat,
12701276
c);
12711277
addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
1278+
addStat("synchronous_writes", isSyncWritesEnabled(), add_stat, c);
12721279

12731280
// Possible that the producer has had its streams closed and hence doesn't
12741281
// have a backfill manager anymore.

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3654,7 +3654,7 @@ static uint32_t add_stream_for_consumer(EngineIface* h,
36543654
dcpStepAndExpectControlMsg("supports_hifi_MFU"s);
36553655
dcpStepAndExpectControlMsg("send_stream_end_on_client_close_stream"s);
36563656
dcpStepAndExpectControlMsg("enable_expiry_opcode"s);
3657-
dcpStepAndExpectControlMsg("enable_synchronous_replication"s);
3657+
dcpStepAndExpectControlMsg("enable_sync_writes"s);
36583658
simulateProdRespAtSyncReplNegotiation(h, cookie, producers);
36593659
dcpStepAndExpectControlMsg("consumer_name"s);
36603660

@@ -4345,7 +4345,7 @@ static void drainDcpControl(EngineIface* engine,
43454345
do {
43464346
dcp_step(engine, cookie, producers);
43474347
// The Sync Repl negotiation introduces a blocking step
4348-
if (producers.last_key == "enable_synchronous_replication") {
4348+
if (producers.last_key == "enable_sync_writes") {
43494349
simulateProdRespAtSyncReplNegotiation(engine, cookie, producers);
43504350
}
43514351
} while (producers.last_op == cb::mcbp::ClientOpcode::DcpControl);

engines/ep/tests/mock/mock_dcp_consumer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,14 @@ class MockDcpConsumer: public DcpConsumer {
113113
* Used for simulating a successful Consumer-Producer Sync Repl handshake.
114114
*/
115115
void enableSyncReplication() {
116-
supportsSyncReplication = true;
116+
supportsSyncReplication = SyncReplication::SyncReplication;
117117
}
118118

119119
/**
120120
* Disable SyncRepl for testing
121121
*/
122122
void disableSyncReplication() {
123-
supportsSyncReplication = false;
123+
supportsSyncReplication = SyncReplication::No;
124124
}
125125

126126
/**

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,10 @@ class MockDcpProducer : public DcpProducer {
8484
return enableExpiryOpcode;
8585
}
8686

87-
void setSyncReplication(bool value) {
87+
void setSyncReplication(SyncReplication value) {
8888
supportsSyncReplication = value;
8989
}
90+
9091
/**
9192
* Create the ActiveStreamCheckpointProcessorTask and assign to
9293
* checkpointCreator->task

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ void DurabilityActiveStreamTest::setUp(bool startCheckpointProcessorTask) {
5050
{{"topology", nlohmann::json::array({{active, replica}})}});
5151

5252
// Enable SyncReplication and flow-control (Producer BufferLog)
53-
setupProducer({{"enable_synchronous_replication", "true"},
53+
setupProducer({{"enable_sync_writes", "true"},
5454
{"connection_buffer_size", "52428800"},
5555
{"consumer_name", "test_consumer"}},
5656
startCheckpointProcessorTask);

0 commit comments

Comments
 (0)