Skip to content

Commit f8a3345

Browse files
committed
Merge branch 'neo' into master
Commits: dc531f2 MB-47139 magma-kvstore: Adapt to SetMaxOpenFiles API change ac0da7c MB-51513: Merge branch 'couchbase/cheshire-cat' into 'couchbase/neo' cc160cb MB-51513: Revert "MB-34280: Set max DCP name to 200 characters" 5cbcd28 MB-51513: Merge 'couchbase/cheshire-cat' into 'couchbase/neo' 09f70f6 MB-51414: Simplify handling of snapshot ranges in processItems() db53ff0 MB-50874: Merge branch 'mad-hatter' into cheshire-cat Change-Id: Ic6195a595038776f2c7014f9c1241b96f12bfa90
2 parents 285bae8 + dc531f2 commit f8a3345

File tree

14 files changed

+62
-191
lines changed

14 files changed

+62
-191
lines changed

daemon/mcbp_validators.cc

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -576,14 +576,6 @@ static Status dcp_open_validator(Cookie& cookie) {
576576
"consumer_name must be a string");
577577
return Status::Einval;
578578
}
579-
auto nm = kv.value().get<std::string>();
580-
if (nm.size() > cb::limits::MaxDcpName) {
581-
cookie.setErrorContext(
582-
"consumer_name limit is " +
583-
std::to_string(cb::limits::MaxDcpName) +
584-
" characters");
585-
return Status::Einval;
586-
}
587579
} else {
588580
cookie.setErrorContext("Unsupported JSON property " +
589581
kv.key());
@@ -595,14 +587,6 @@ static Status dcp_open_validator(Cookie& cookie) {
595587
return Status::Einval;
596588
}
597589
}
598-
599-
if (size_t(cookie.getHeader().getKeylen()) > cb::limits::MaxDcpName) {
600-
cookie.setErrorContext("Dcp name limit is " +
601-
std::to_string(cb::limits::MaxDcpName) +
602-
" characters");
603-
return Status::Einval;
604-
}
605-
606590
return verify_common_dcp_restrictions(cookie);
607591
}
608592

docs/dcp/documentation/commands/open-connection.md

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,7 @@ Flags are specified as a bitmask in network byte order with the following bits d
4343

4444
When setting the Producer or Consumer flag the sender is telling the server what type of connection will be created. For example, if the Producer type is set then the sender of the Open Connection message will be a Consumer.
4545

46-
The connection name is specified using the key field. When selecting a name the
47-
only requirement is that the name take up no more space than 200 bytes. It is
48-
recommended that the name uses that ASCII character set and uses alpha-numeric
49-
characters. It is highly advantageous for improved supportability Couchbase
50-
Server that the connection names embed as much contextual information as
51-
possible from the client.
46+
The connection name is specified using the key field. When selecting a name the only requirement is that the name take up no more space than 256 bytes. It is recommended that the name uses that ASCII character set and uses alpha-numeric characters. It is highly advantageous for improved supportability Couchbase Server that the connection names embed as much contextual information as possible from the client.
5247

5348
As of version 6.5, the _value_ can be used to specify additional information
5449
about the connection to be opened. If non-empty, the value must be a JSON

engines/ep/src/checkpoint_manager.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -971,13 +971,10 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
971971
if (enteredNewCp) {
972972
const auto& checkpoint = **cursor.getCheckpoint();
973973
result.checkpointType = checkpoint.getCheckpointType();
974-
result.ranges.push_back(
975-
{{checkpoint.getSnapshotStartSeqno(),
976-
checkpoint.getSnapshotEndSeqno()},
977-
checkpoint.getHighCompletedSeqno(),
978-
checkpoint.getHighPreparedSeqno(),
979-
checkpoint.getState() ==
980-
checkpoint_state::CHECKPOINT_CLOSED});
974+
result.ranges.push_back({{checkpoint.getSnapshotStartSeqno(),
975+
checkpoint.getSnapshotEndSeqno()},
976+
checkpoint.getHighCompletedSeqno(),
977+
checkpoint.getHighPreparedSeqno()});
981978
enteredNewCp = false;
982979

983980
// As we cross into new checkpoints, update the maxDeletedRevSeqno

engines/ep/src/checkpoint_manager.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ struct CheckpointSnapshotRange {
6060
// HPS that should be flushed when the entire range has been persisted.
6161
// This is the seqno of the latest prepare in this checkpoint.
6262
std::optional<uint64_t> highPreparedSeqno = {};
63-
// Is the checkpoint closed
64-
bool isClosed = false;
6563
};
6664

6765
/**

engines/ep/src/connhandler.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,6 @@ struct ConnCounter {
8181

8282
class ConnHandler : public DcpConnHandlerIface {
8383
public:
84-
/// The maximum length of a DCP stat name
85-
static constexpr size_t MaxDcpStatNameLength = 47;
86-
8784
enum class PausedReason : uint8_t {
8885
BufferLogFull,
8986
Initializing,

engines/ep/src/dcp/active_stream.cc

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,27 +1180,12 @@ void ActiveStream::processItems(
11801180
if (qi->getOperation() == queue_op::checkpoint_end) {
11811181
// At the end of each checkpoint remove its snapshot range, so
11821182
// we don't use it to set nextSnapStart for the next checkpoint.
1183-
// If statement ensures that we have indeed completed the
1184-
// snapshot range by checking that checkpoint_end seqno is
1185-
// greater than the end of the range. Also account for an edge
1186-
// case where we have closed checkpoint who's checkpoint_end is
1187-
// equal to snapshot end seqno. This can happen when snapshots
1188-
// partially received by a replica which is then promoted to
1189-
// active
1183+
// We can just erase the range at the head of ranges as every
1184+
// time as CheckpointManager::getItemsForCursor() will always
1185+
// ensure there is a snapshot range for if there is a
1186+
// queue_op::checkpoint_end in the items it returns.
11901187
auto rangeItr = outstandingItemsResult.ranges.begin();
1191-
auto snapEnd = static_cast<int64_t>(rangeItr->getEnd());
1192-
if (snapEnd < qi->getBySeqno() ||
1193-
(rangeItr->isClosed && snapEnd == qi->getBySeqno())) {
1194-
outstandingItemsResult.ranges.erase(rangeItr);
1195-
} else {
1196-
log(spdlog::level::level_enum::err,
1197-
"ActiveStream::processItems checkpoint_end:{} "
1198-
"should not be in the current snapshot range "
1199-
"s:{}->e:{}",
1200-
qi->getBySeqno(),
1201-
rangeItr->getStart(),
1202-
rangeItr->getEnd());
1203-
}
1188+
outstandingItemsResult.ranges.erase(rangeItr);
12041189
}
12051190

12061191
if (qi->getOperation() == queue_op::checkpoint_start) {

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -597,10 +597,6 @@ MagmaKVStore::MagmaKVStore(MagmaKVStoreConfig& configuration)
597597
configuration.getMagmaMemoryQuotaLowWaterMarkRatio();
598598

599599
configuration.setStore(this);
600-
{
601-
cb::UseArenaMallocSecondaryDomain domainGuard;
602-
magma::SetMaxOpenFiles(configuration.getMaxFileDescriptors());
603-
}
604600

605601
// To save memory only allocate counters for the number of vBuckets that
606602
// this shard will have to deal with
@@ -665,6 +661,7 @@ MagmaKVStore::MagmaKVStore(MagmaKVStoreConfig& configuration)
665661

666662
magma = std::make_unique<MagmaMemoryTrackingProxy>(configuration.magmaCfg);
667663

664+
magma->SetMaxOpenFiles(configuration.getMaxFileDescriptors());
668665
setMaxDataSize(configuration.getBucketQuota());
669666
setMagmaFragmentationPercentage(
670667
configuration.getMagmaFragmentationPercentage());

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,11 @@ void MagmaMemoryTrackingProxy::EnableBlockCache(bool enable) {
363363
magma->EnableBlockCache(enable);
364364
}
365365

366+
void MagmaMemoryTrackingProxy::SetMaxOpenFiles(size_t n, bool blocking) {
367+
cb::UseArenaMallocSecondaryDomain domainGuard;
368+
magma->SetMaxOpenFiles(n, blocking);
369+
}
370+
366371
void MagmaMemoryTrackingProxy::SetMemoryQuota(const size_t quota) {
367372
cb::UseArenaMallocSecondaryDomain domainGuard;
368373
magma->SetMemoryQuota(quota);

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class MagmaMemoryTrackingProxy {
176176
magma::Status Rollback(const magma::Magma::KVStoreID kvID,
177177
magma::Magma::SeqNo rollbackSeqno,
178178
magma::Magma::RollbackCallback callback);
179+
void SetMaxOpenFiles(size_t n, bool blocking = false);
179180
void SetFragmentationRatio(double fragRatio);
180181
void EnableBlockCache(bool enable);
181182
void SetMemoryQuota(const size_t quota);

engines/ep/tests/module_tests/collections/collections_seqno_advance.cc

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,6 @@ class CollectionsSeqnoAdvanced
131131
}
132132

133133
void setupOneOperation(InputType type, ForStream fs) {
134-
switch (fs) {
135-
case ForStream::Yes: {
136-
queueOperation(currentSeqno, type, myCollection);
137-
break;
138-
}
139-
case ForStream::No: {
140-
queueOperation(currentSeqno, type, CollectionEntry::vegetable);
141-
break;
142-
}
143-
}
144-
145134
switch (type) {
146135
case InputType::Mutation:
147136
case InputType::Prepare:
@@ -152,71 +141,89 @@ class CollectionsSeqnoAdvanced
152141
case InputType::CPEnd:
153142
break;
154143
}
144+
145+
switch (fs) {
146+
case ForStream::Yes: {
147+
queueOperation(type, myCollection);
148+
break;
149+
}
150+
case ForStream::No: {
151+
queueOperation(type, CollectionEntry::vegetable);
152+
break;
153+
}
154+
}
155155
}
156156

157-
void queueOperation(uint64_t seqno, InputType type, CollectionID cid) {
157+
void queueOperation(InputType type, CollectionID cid) {
158158
switch (type) {
159159
case InputType::Mutation: {
160-
queueMutation(seqno, cid);
160+
queueMutation(cid);
161161
break;
162162
}
163163
case InputType::Prepare: {
164-
queuePrepare(seqno, cid);
164+
queuePrepare(cid);
165165
break;
166166
}
167167
case InputType::CPEndStart: {
168-
queueCPEnd(seqno);
169-
queueCPStart(seqno);
168+
queueCPEnd();
169+
queueCPStart();
170170
break;
171171
}
172172
case InputType::CPStart: {
173-
queueCPStart(seqno);
173+
queueCPStart();
174174
break;
175175
}
176176
case InputType::CPEnd: {
177-
queueCPEnd(seqno);
177+
queueCPEnd();
178178
break;
179179
}
180180
}
181181
}
182182

183-
void queueMutation(uint64_t seqno, CollectionID cid) {
183+
void queueMutation(CollectionID cid) {
184184
auto item = makeCommittedItem(
185-
makeStoredDocKey(std::to_string(seqno), cid), "value");
186-
item->setBySeqno(seqno);
185+
makeStoredDocKey(std::to_string(currentSeqno), cid), "value");
186+
item->setBySeqno(currentSeqno);
187187
input.items.emplace_back(item);
188188
}
189189

190-
void queuePrepare(uint64_t seqno, CollectionID cid) {
190+
void queuePrepare(CollectionID cid) {
191191
auto item = makePendingItem(
192-
makeStoredDocKey(std::to_string(seqno), cid), "value");
193-
item->setBySeqno(seqno);
192+
makeStoredDocKey(std::to_string(currentSeqno), cid), "value");
193+
item->setBySeqno(currentSeqno);
194194
input.items.emplace_back(item);
195195
}
196196

197-
void queueCPStart(uint64_t seqno) {
197+
void queueCPStart() {
198198
queue_op checkpoint_op = queue_op::checkpoint_start;
199199
StoredDocKey key(to_string(checkpoint_op), CollectionID::System);
200-
queued_item qi(new Item(key, vbid, checkpoint_op, 1, seqno));
201-
202-
input.ranges.push_back(
203-
{{static_cast<uint64_t>(input.items.back()->getBySeqno()),
204-
seqno},
205-
{},
206-
{}});
207-
200+
queued_item qi(new Item(key, vbid, checkpoint_op, 1, currentSeqno + 1));
208201
input.items.emplace_back(qi);
202+
checkStart = currentSeqno + 1;
203+
input.ranges.push_back({{currentSeqno + 1, currentSeqno + 1}, {}, {}});
209204
}
210205

211-
void queueCPEnd(uint64_t seqno) {
206+
void queueCPEnd() {
207+
auto endSeqno = currentSeqno;
208+
if (!input.items.empty()) {
209+
endSeqno = static_cast<uint64_t>(input.items.back()->getBySeqno());
210+
}
211+
if (input.ranges.empty()) {
212+
input.ranges.push_back({{1, endSeqno}, {}, {}});
213+
} else {
214+
input.ranges.back().range.setEnd(endSeqno);
215+
}
216+
212217
queue_op checkpoint_op = queue_op::checkpoint_end;
213218
StoredDocKey key(to_string(checkpoint_op), CollectionID::System);
214-
queued_item qi(new Item(key, vbid, checkpoint_op, 1, seqno));
219+
queued_item qi(new Item(key, vbid, checkpoint_op, 1, currentSeqno + 1));
215220
input.items.emplace_back(qi);
216221
}
217222

218223
// Starting seqno, each operation will increment this
219224
uint64_t currentSeqno{1};
225+
uint64_t checkStart{1};
226+
uint64_t checkEnd{1};
220227

221228
std::shared_ptr<MockDcpProducer> producer;
222229
std::shared_ptr<MockActiveStream> stream;

0 commit comments

Comments
 (0)