Skip to content

Commit 97cc7b5

Browse files
gormanbevergreen authored and committed
SERVER-42723 New shard with new database can be ignored by change streams
1 parent 1daf063 commit 97cc7b5

14 files changed

+259
-91
lines changed

buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ selector:
2424
- jstests/sharding/prepared_txn_metadata_refresh.js
2525
# Enable when 4.4 becomes last stable
2626
- jstests/sharding/bulk_insert.js
27+
- jstests/sharding/change_streams_new_shard_new_database.js
2728
- jstests/sharding/clear_jumbo.js
2829
- jstests/sharding/comment_field.js
2930
- jstests/sharding/covered_shard_key_indexes.js

jstests/change_streams/required_as_first_stage.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ assertErrorCode(
2020
40602);
2121

2222
let error = assert.throws(() => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}]));
23-
assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
23+
assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
2424

2525
error = assert.throws(
2626
() => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}], {allowDiskUse: true}));
27-
assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
27+
assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
2828

2929
error = assert.throws(() => coll.aggregate([{$group: {_id: "$x"}}, {$changeStream: {}}]));
30-
assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
30+
assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
3131

3232
// This one has a different error code because of conflicting host type requirements: the $group
3333
// needs to merge on a shard, but the $changeStream needs to merge on mongos. This doesn't

jstests/libs/override_methods/continuous_stepdown.js

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ ContinuousStepdown.configure = function(stepdownOptions,
176176
* Overrides the ReplSetTest constructor to start the continuous primary stepdown thread.
177177
*/
178178
ReplSetTest = function ReplSetTestWithContinuousPrimaryStepdown() {
179+
// Preserve the original set of nodeOptions passed to the constructor.
180+
const origNodeOpts = Object.assign({}, (arguments[0] && arguments[0].nodeOptions) || {});
181+
179182
// Construct the original object
180183
originalReplSetTest.apply(this, arguments);
181184

@@ -185,24 +188,31 @@ ContinuousStepdown.configure = function(stepdownOptions,
185188
const _originalAwaitLastOpCommitted = this.awaitLastOpCommitted;
186189

187190
/**
188-
* Overrides startSet call to increase logging verbosity.
191+
* Overrides startSet call to increase logging verbosity. Ensure that we only override the
192+
* 'logComponentVerbosity' server parameter, but retain any other parameters that were
193+
* supplied during ReplSetTest construction.
189194
*/
190195
this.startSet = function() {
191-
let options = arguments[0] || {};
192-
193-
if (typeof (options.setParameter) === "string") {
194-
var eqIdx = options.setParameter.indexOf("=");
195-
if (eqIdx != -1) {
196-
var param = options.setParameter.substring(0, eqIdx);
197-
var value = options.setParameter.substring(eqIdx + 1);
198-
options.setParameter = {};
199-
options.setParameter[param] = value;
196+
// Helper function to convert a string representation of setParameter to object form.
197+
function setParamToObj(setParam) {
198+
if (typeof (setParam) === "string") {
199+
var eqIdx = setParam.indexOf("=");
200+
if (eqIdx != -1) {
201+
var param = setParam.substring(0, eqIdx);
202+
var value = setParam.substring(eqIdx + 1);
203+
return {[param]: value};
204+
}
200205
}
206+
return Object.assign({}, setParam || {});
201207
}
208+
209+
const options = arguments[0] || {};
210+
211+
options.setParameter = Object.assign(setParamToObj(origNodeOpts.setParameter),
212+
setParamToObj(options.setParameter),
213+
{logComponentVerbosity: verbositySetting});
202214
arguments[0] = options;
203215

204-
options.setParameter = options.setParameter || {};
205-
options.setParameter.logComponentVerbosity = verbositySetting;
206216
return _originalStartSetFn.apply(this, arguments);
207217
};
208218

jstests/sharding/change_stream_show_migration_events.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Tests the undocumented 'showChunkMigrations' option for change streams.
1+
// Tests the undocumented 'showMigrationEvents' option for change streams.
22
//
33
// This test is connecting directly to a shard, and change streams require the getMore command.
44
// @tags: [requires_find_command, uses_change_streams]
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
/**
 * Verifies that pre-existing single-collection, whole-db and whole-cluster $changeStreams observe
 * events from a shard that joins the cluster after the streams were opened, when a new unsharded
 * collection is created on that shard. Regression test for SERVER-42723.
 *
 * The 'requires_find_command' tag keeps this test out of the legacy-protocol passthroughs, since
 * legacy getMore fails when run against a database or collection which does not yet exist.
 * @tags: [uses_change_streams, requires_sharding, requires_find_command]
 */
(function() {

"use strict";

const rsNodeOptions = {
    setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
};
const st =
    new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});

// One 'test' database plus a second 'other' database are required.
const oldShardDB = st.s.getDB(jsTestName() + "_other");
const newShardDB = st.s.getDB(jsTestName());

const configDB = st.s.getDB("config");
const adminDB = st.s.getDB("admin");

const oldShardColl = oldShardDB.coll;
const newShardColl = newShardDB.test;

// Spins up a new single-node replica set and registers it with the cluster as a shard.
function addShardToCluster(shardName) {
    const rs = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
    rs.startSet({shardsvr: ""});
    rs.initiate();
    assert.commandWorked(st.s.adminCommand({addShard: rs.getURL(), name: shardName}));
    return rs;
}

// Asserts that 'changeStream' produces each document in 'expectedDocs', in order.
function assertAllEventsObserved(changeStream, expectedDocs) {
    expectedDocs.forEach((doc) => {
        assert.soon(() => changeStream.hasNext());
        const event = changeStream.next();
        assert.docEq(event.fullDocument, doc);
    });
}

// Open a whole-db change stream on the as yet non-existent database.
const wholeDBCS = newShardDB.watch();

// Open a single-collection change stream on a namespace within the non-existent database.
const singleCollCS = newShardColl.watch();

// Open a whole-cluster stream on the deployment.
const wholeClusterCS = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);

// Write into the 'other' database on the only existing shard. This should ensure that the
// primary shard of the test database will later be placed on the second shard, once added.
const insertedDocs = [];
for (let i = 0; i < 20; ++i) {
    insertedDocs.push({_id: i});
}
assert.commandWorked(oldShardColl.insert(insertedDocs));

// The whole-cluster stream must see every one of these events...
assertAllEventsObserved(wholeClusterCS, insertedDocs);

// ... while the other two streams see none of the insertions on the 'other' collection.
for (let cursor of [wholeDBCS, singleCollCS]) {
    assert(!cursor.hasNext());
}

// Now add a new shard into the cluster...
const newShard1 = addShardToCluster("newShard1");

// ... create a new database and collection, and verify that they were placed on the new shard....
assert.commandWorked(newShardDB.runCommand({create: newShardColl.getName()}));
assert(configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}));

// ... insert some documents into the new, unsharded collection on the new shard...
assert.commandWorked(newShardColl.insert(insertedDocs));

// ... and confirm that every pre-existing stream observes all of these events.
for (let cursor of [singleCollCS, wholeDBCS, wholeClusterCS]) {
    assertAllEventsObserved(cursor, insertedDocs);
}

// The ShardingTest knows nothing about the manually-added shard, so stop it explicitly.
st.stop();
newShard1.stopSet();
})();

jstests/sharding/change_streams_unsharded_becomes_sharded.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ function testUnshardedBecomesSharded(collToWatch) {
9797
];
9898

9999
// Verify that the cursor on the original shard is still valid and sees new inserted
100-
// documents. The 'documentKey' field should now include the shard key, even before a
101-
// 'kNewShardDetected' operation has been generated by the migration of a chunk to a new
102-
// shard.
100+
// documents. The 'documentKey' field should now include the shard key.
103101
assert.commandWorked(mongosColl.insert({_id: 1, x: 1}));
104102
assert.commandWorked(mongosCollOther.insert({_id: 1, y: 1}));
105103
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});

src/mongo/db/pipeline/document_source_change_stream.cpp

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
244244

245245
BSONObj DocumentSourceChangeStream::buildMatchFilter(
246246
const boost::intrusive_ptr<ExpressionContext>& expCtx,
247-
Timestamp startFrom,
248-
bool startFromInclusive,
247+
Timestamp startFromInclusive,
249248
bool showMigrationEvents) {
250249
auto nss = expCtx->ns;
251250

@@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
297296
// 2.1) Normal CRUD ops.
298297
auto normalOpTypeMatch = BSON("op" << NE << "n");
299298

299+
// TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
300+
// with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove
301+
// this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a
302+
// subset of shards. See SERVER-44039 for details.
303+
300304
// 2.2) A chunk gets migrated to a new shard that doesn't have any chunks.
301305
auto chunkMigratedNewShardMatch = BSON("op"
302306
<< "n"
@@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
326330
// Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is
327331
// set - exempt all other operations and commands with that tag. Include the resume token, if
328332
// resuming, so we can verify it was still present in the oplog.
329-
return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
333+
return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive)
330334
<< BSON(OR(opMatch, commandAndApplyOpsMatch))));
331335
}
332336

@@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
388392
}
389393
}
390394

395+
// If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'.
391396
if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
392397
uassert(40674,
393398
"Only one type of resume option is allowed, but multiple were found.",
@@ -396,33 +401,37 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
396401
resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
397402
}
398403

399-
// There might not be a starting point if we're on mongos, otherwise we should either have a
400-
// 'resumeAfter' starting point, or should start from the latest majority committed operation.
404+
// We can only run on a replica set, or through mongoS. Confirm that this is the case.
401405
auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
402406
uassert(
403407
40573,
404408
"The $changeStream stage is only supported on replica sets",
405409
expCtx->inMongos ||
406410
(replCoord &&
407411
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet));
408-
if (!startFrom && !expCtx->inMongos) {
409-
startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
412+
413+
// If we do not have an explicit starting point, we should start from the latest majority
414+
// committed operation. If we are on mongoS and do not have a starting point, set it to the
415+
// current clusterTime so that all shards start in sync. We always start one tick beyond the
416+
// most recent operation, to ensure that the stream does not return it.
417+
if (!startFrom) {
418+
const auto currentTime = !expCtx->inMongos
419+
? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
420+
: LogicalClock::get(expCtx->opCtx)->getClusterTime();
421+
startFrom = currentTime.addTicks(1).asTimestamp();
410422
}
411423

412-
if (startFrom) {
413-
const bool startFromInclusive = (resumeStage != nullptr);
414-
stages.push_back(DocumentSourceOplogMatch::create(
415-
DocumentSourceChangeStream::buildMatchFilter(
416-
expCtx, *startFrom, startFromInclusive, showMigrationEvents),
417-
expCtx));
418-
419-
// If we haven't already populated the initial PBRT, then we are starting from a specific
420-
// timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
421-
if (expCtx->initialPostBatchResumeToken.isEmpty()) {
422-
Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
423-
expCtx->initialPostBatchResumeToken =
424-
ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson();
425-
}
424+
// We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
425+
// upon the fact that it is always the first stage in the pipeline.
426+
stages.push_back(DocumentSourceOplogMatch::create(
427+
DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents),
428+
expCtx));
429+
430+
// If we haven't already populated the initial PBRT, then we are starting from a specific
431+
// timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
432+
if (expCtx->initialPostBatchResumeToken.isEmpty()) {
433+
expCtx->initialPostBatchResumeToken =
434+
ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson();
426435
}
427436

428437
// Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage.
@@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
516525
(expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()));
517526

518527
// Prevent $changeStream from running on internal databases. A stream may run against the
519-
// 'admin' database iff 'allChangesForCluster' is true.
528+
// 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config'
529+
// database iff 'allowToRunOnConfigDB' is true.
530+
const bool isNotBannedInternalDB =
531+
!expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB());
520532
uassert(ErrorCodes::InvalidNamespace,
521533
str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
522534
<< " database",
523-
expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster()
524-
: (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB()));
535+
expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB);
525536

526537
// Prevent $changeStream from running on internal collections in any database.
527538
uassert(ErrorCodes::InvalidNamespace,

src/mongo/db/pipeline/document_source_change_stream.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ class DocumentSourceChangeStream final {
166166
*/
167167
static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
168168
Timestamp startFrom,
169-
bool startFromInclusive,
170169
bool showMigrationEvents);
171170

172171
/**

src/mongo/db/pipeline/document_source_change_stream.idl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,11 @@ structs:
109109
deletes may appear that do not reflect actual deletions or insertions
110110
of data. Instead they reflect this data moving from one shard to
111111
another.
112+
allowToRunOnConfigDB:
113+
cpp_name: allowToRunOnConfigDB
114+
type: bool
115+
default: false
116+
description: A flag indicating whether the change stream may be opened on the
117+
'config' database, which is usually banned. This flag is used
118+
internally to allow mongoS to open a stream on 'config.shards', in
119+
order to monitor for the addition of new shards to the cluster.

src/mongo/db/pipeline/document_source_change_stream_transform.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,14 +362,11 @@ Value DocumentSourceChangeStreamTransform::serialize(
362362
changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAfterFieldName].missing()) {
363363
MutableDocument newChangeStreamOptions(changeStreamOptions);
364364

365-
// Use the current cluster time plus 1 tick since the oplog query will include all
366-
// operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In
367-
// particular, avoid including the last operation that went through mongos in an attempt to
368-
// match the behavior of a replica set more closely.
369-
auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime();
370-
clusterTime.addTicks(1);
371-
newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] =
372-
Value(clusterTime.asTimestamp());
365+
// Configure the serialized $changeStream to start from the initial high-watermark
366+
// postBatchResumeToken which we generated while parsing the $changeStream pipeline.
367+
invariant(!pExpCtx->initialPostBatchResumeToken.isEmpty());
368+
newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] =
369+
Value(pExpCtx->initialPostBatchResumeToken);
373370
changeStreamOptions = newChangeStreamOptions.freeze();
374371
}
375372
return Value(Document{{getSourceName(), changeStreamOptions}});

0 commit comments

Comments (0)