Skip to content

Commit 371fc35

Browse files
committed
MB-32573 [SR]: Include VBucket id in DCP seqno_ack/commit/abort
The vBucket was incorrectly missed out from these new Sync Replication DCP messages. Add the vBucketID in, and wire up correcly in ep-engine. Change-Id: I484236d077eea66ef9e0e58e46cbe7c303b74c5c Reviewed-on: http://review.couchbase.org/103557 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 36563b8 commit 371fc35

File tree

11 files changed

+49
-7
lines changed

11 files changed

+49
-7
lines changed

daemon/connection.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2292,6 +2292,7 @@ ENGINE_ERROR_CODE Connection::prepare(uint32_t opaque,
22922292
}
22932293

22942294
ENGINE_ERROR_CODE Connection::seqno_acknowledged(uint32_t opaque,
2295+
Vbid vbucket,
22952296
uint64_t in_memory_seqno,
22962297
uint64_t on_disk_seqno) {
22972298
cb::mcbp::request::DcpSeqnoAcknowledgedPayload extras(in_memory_seqno,
@@ -2301,11 +2302,13 @@ ENGINE_ERROR_CODE Connection::seqno_acknowledged(uint32_t opaque,
23012302
builder.setMagic(cb::mcbp::Magic::ClientRequest);
23022303
builder.setOpcode(cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
23032304
builder.setOpaque(opaque);
2305+
builder.setVBucket(vbucket);
23042306
builder.setExtras(extras.getBuffer());
23052307
return add_packet_to_send_pipe(builder.getFrame()->getFrame());
23062308
}
23072309

23082310
ENGINE_ERROR_CODE Connection::commit(uint32_t opaque,
2311+
Vbid vbucket,
23092312
const DocKey& key,
23102313
uint64_t commit_seqno) {
23112314
cb::mcbp::request::DcpCommitPayload extras(0, commit_seqno);
@@ -2317,12 +2320,14 @@ ENGINE_ERROR_CODE Connection::commit(uint32_t opaque,
23172320
builder.setMagic(cb::mcbp::Magic::ClientRequest);
23182321
builder.setOpcode(cb::mcbp::ClientOpcode::DcpCommit);
23192322
builder.setOpaque(opaque);
2323+
builder.setVBucket(vbucket);
23202324
builder.setExtras(extras.getBuffer());
23212325
builder.setKey(cb::const_char_buffer(key));
23222326
return add_packet_to_send_pipe(builder.getFrame()->getFrame());
23232327
}
23242328

23252329
ENGINE_ERROR_CODE Connection::abort(uint32_t opaque,
2330+
Vbid vbucket,
23262331
uint64_t prepared_seqno,
23272332
uint64_t abort_seqno) {
23282333
cb::mcbp::request::DcpAbortPayload extras;
@@ -2333,6 +2338,7 @@ ENGINE_ERROR_CODE Connection::abort(uint32_t opaque,
23332338
builder.setMagic(cb::mcbp::Magic::ClientRequest);
23342339
builder.setOpcode(cb::mcbp::ClientOpcode::DcpAbort);
23352340
builder.setOpaque(opaque);
2341+
builder.setVBucket(vbucket);
23362342
builder.setExtras(extras.getBuffer());
23372343
return add_packet_to_send_pipe(builder.getFrame()->getFrame());
23382344
}

daemon/connection.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,14 +932,17 @@ class Connection : public dcp_message_producers {
932932
cb::durability::Requirements durability) override;
933933

934934
ENGINE_ERROR_CODE seqno_acknowledged(uint32_t opaque,
935+
Vbid vbucket,
935936
uint64_t in_memory_seqno,
936937
uint64_t on_disk_seqno) override;
937938

938939
ENGINE_ERROR_CODE commit(uint32_t opaque,
940+
Vbid vbucket,
939941
const DocKey& key,
940942
uint64_t commit_seqno) override;
941943

942944
ENGINE_ERROR_CODE abort(uint32_t opaque,
945+
Vbid vbucket,
943946
uint64_t prepared_seqno,
944947
uint64_t abort_seqno) override;
945948

engines/ep/management/mc_bin_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,13 @@ def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0, collection=None
194194
vbucketId=self.vbucketId, collection=collection)
195195

196196
def _sendAltCmd(self, cmd, flex, key, val, opaque, extras='', cas=0,
197-
dtype=0, vbucketId=0, collection=None):
197+
dtype=0, collection=None):
198198
"""Send a request in the alternative format supporing flex framing extras"""
199199
if collection:
200200
key = self._encodeCollectionId(key, collection)
201201

202202
msg = struct.pack(ALT_REQ_PKT_FMT, ALT_REQ_MAGIC_BYTE, cmd, len(flex),
203-
len(key), len(extras), dtype, vbucketId,
203+
len(key), len(extras), dtype, self.vbucketId,
204204
len(flex) + len(key) + len(extras) + len(val),
205205
opaque, cas)
206206
self.s.sendall(msg + flex + extras + key + val)

engines/ep/src/dcp/active_stream.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -896,8 +896,11 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
896896
// as a Commit message - otherwise it's just sent as a Mutation.
897897
if ((item->getOperation() == queue_op::commit_sync_write) &&
898898
(syncReplication == SyncReplication::Yes)) {
899-
return std::make_unique<CommitSyncWrite>(
900-
opaque_, 0, item->getBySeqno(), item->getKey());
899+
return std::make_unique<CommitSyncWrite>(opaque_,
900+
item->getVBucketId(),
901+
/*preparedSeqno*/ 0,
902+
item->getBySeqno(),
903+
item->getKey());
901904
}
902905

903906
if (item->getOperation() != queue_op::system_event) {

engines/ep/src/dcp/consumer.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,7 @@ ENGINE_ERROR_CODE DcpConsumer::step(struct dcp_message_producers* producers) {
891891
case DcpResponse::Event::SeqnoAcknowledgement: {
892892
auto* ack = static_cast<SeqnoAcknowledgement*>(resp.get());
893893
ret = producers->seqno_acknowledged(ack->getOpaque(),
894+
ack->getVbucket(),
894895
ack->getInMemorySeqno(),
895896
ack->getOnDiskSeqno());
896897
break;
@@ -1663,6 +1664,7 @@ ENGINE_ERROR_CODE DcpConsumer::commit(uint32_t opaque,
16631664
try {
16641665
err = stream->messageReceived(
16651666
std::make_unique<CommitSyncWrite>(opaque,
1667+
vbucket,
16661668
/*prepared_seqno*/ 0,
16671669
commit_seqno,
16681670
key));

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,10 @@ ENGINE_ERROR_CODE PassiveStream::processPrepare(
620620
LockHolder lh(streamMutex);
621621
// @todo-durability add in the correct on-disk seqno.
622622
pushToReadyQ(std::make_unique<SeqnoAcknowledgement>(
623-
opaque_, prepare->getItem()->getBySeqno(), 0));
623+
opaque_,
624+
prepare->getVBucket(),
625+
prepare->getItem()->getBySeqno(),
626+
0));
624627
}
625628
notifyStreamReady();
626629

engines/ep/src/dcp/producer.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,10 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
635635
}
636636
case DcpResponse::Event::Commit: {
637637
CommitSyncWrite* csr = static_cast<CommitSyncWrite*>(resp.get());
638-
ret = producers->commit(
639-
csr->getOpaque(), csr->getKey(), csr->getCommitSeqno());
638+
ret = producers->commit(csr->getOpaque(),
639+
csr->getVbucket(),
640+
csr->getKey(),
641+
csr->getCommitSeqno());
640642
break;
641643
}
642644

engines/ep/src/dcp/response.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,13 @@ std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
123123
}
124124

125125
CommitSyncWrite::CommitSyncWrite(uint32_t opaque,
126+
Vbid vbucket,
126127
uint64_t preparedSeqno,
127128
uint64_t commitSeqno,
128129
const DocKey& key)
129130
/// @todo-durability: Remove key, use pending seqno to identify instead.
130131
: DcpResponse(Event::Commit, opaque, cb::mcbp::DcpStreamId{}),
132+
vbucket(vbucket),
131133
key(key),
132134
payload(preparedSeqno, commitSeqno) {
133135
}

engines/ep/src/dcp/response.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,10 +580,12 @@ class MutationConsumerMessage : public MutationResponse {
580580
class SeqnoAcknowledgement : public DcpResponse {
581581
public:
582582
SeqnoAcknowledgement(uint32_t opaque,
583+
Vbid vbucket,
583584
uint64_t inMemorySeqno,
584585
uint64_t onDiskSeqno)
585586
: DcpResponse(
586587
Event::SeqnoAcknowledgement, opaque, cb::mcbp::DcpStreamId{}),
588+
vbucket(vbucket),
587589
payload(inMemorySeqno, onDiskSeqno) {
588590
}
589591

@@ -592,6 +594,10 @@ class SeqnoAcknowledgement : public DcpResponse {
592594
sizeof(cb::mcbp::request::DcpSeqnoAcknowledgedPayload);
593595
}
594596

597+
Vbid getVbucket() const {
598+
return vbucket;
599+
}
600+
595601
uint64_t getInMemorySeqno() const {
596602
return payload.getInMemorySeqno();
597603
}
@@ -601,6 +607,7 @@ class SeqnoAcknowledgement : public DcpResponse {
601607
}
602608

603609
private:
610+
Vbid vbucket;
604611
cb::mcbp::request::DcpSeqnoAcknowledgedPayload payload;
605612
};
606613

@@ -610,6 +617,7 @@ class SeqnoAcknowledgement : public DcpResponse {
610617
class CommitSyncWrite : public DcpResponse {
611618
public:
612619
CommitSyncWrite(uint32_t opaque,
620+
Vbid vbucket,
613621
uint64_t preparedSeqno,
614622
uint64_t commitSeqno,
615623
const DocKey& key);
@@ -624,6 +632,10 @@ class CommitSyncWrite : public DcpResponse {
624632
return key;
625633
}
626634

635+
Vbid getVbucket() const {
636+
return vbucket;
637+
}
638+
627639
uint64_t getPreparedSeqno() const {
628640
return payload.getPreparedSeqno();
629641
}
@@ -637,6 +649,7 @@ class CommitSyncWrite : public DcpResponse {
637649
sizeof(cb::mcbp::request::DcpCommitPayload);
638650

639651
private:
652+
Vbid vbucket;
640653
StoredDocKey key;
641654
cb::mcbp::request::DcpCommitPayload payload;
642655
};

engines/ep/tests/mock/mock_dcp.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,19 @@ class MockDcpMessageProducers : public dcp_message_producers {
139139
cb::durability::Requirements durability) override;
140140

141141
ENGINE_ERROR_CODE seqno_acknowledged(uint32_t opaque,
142+
Vbid vbucket,
142143
uint64_t in_memory_seqno,
143144
uint64_t on_disk_seqno) override {
144145
return ENGINE_ENOTSUP;
145146
}
146147
ENGINE_ERROR_CODE commit(uint32_t opaque,
148+
Vbid vbucket,
147149
const DocKey& key,
148150
uint64_t commit_seqno) override {
149151
return ENGINE_ENOTSUP;
150152
}
151153
ENGINE_ERROR_CODE abort(uint32_t opaque,
154+
Vbid vbucket,
152155
uint64_t prepared_seqno,
153156
uint64_t abort_seqno) override {
154157
return ENGINE_ENOTSUP;

0 commit comments

Comments
 (0)