Skip to content

Commit f4e672b

Browse files
authored
fix(shell-api): .watch() signature and behavior fixes MONGOSH-976 (#1107)
Modify the `.watch()` signature so that it “officially” allows omitting the first argument. As a drive-by fix, stop applying `.tryNext()` for cases in which the change stream cursor starts from a specific point in time. (Unfortunately, `.hasNext()` is not an option here, unlike discussed with the Node.js driver team, because it is blocking like `.next()` and unlike `.tryNext()`.)
1 parent fb33585 commit f4e672b

File tree

4 files changed

+56
-7
lines changed

4 files changed

+56
-7
lines changed

packages/shell-api/src/collection.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,11 @@ export default class Collection extends ShellApiWithMongoClass {
17151715
@topologies([Topologies.ReplSet, Topologies.Sharded])
17161716
@apiVersions([1])
17171717
@returnsPromise
1718-
async watch(pipeline: Document[] = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
1718+
async watch(pipeline: Document[] | ChangeStreamOptions = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
1719+
if (!Array.isArray(pipeline)) {
1720+
options = pipeline;
1721+
pipeline = [];
1722+
}
17191723
this._emitCollectionApiCall('watch', { pipeline, options });
17201724
const cursor = new ChangeStreamCursor(
17211725
this._mongo._serviceProvider.watch(pipeline, {
@@ -1739,7 +1743,11 @@ export default class Collection extends ShellApiWithMongoClass {
17391743
// .watch() call and before the .tryNext() call could also have been
17401744
// observed before the .watch() call, i.e. there is a race condition
17411745
// here either way and we can use that to our advantage.
1742-
await cursor.tryNext();
1746+
// We only do this for change streams that do not specify from a specified
1747+
// point in time, i.e. start from the current time.
1748+
if (!options.resumeAfter && !options.startAfter && !options.startAtOperationTime) {
1749+
await cursor.tryNext();
1750+
}
17431751
this._mongo._instanceState.currentCursor = cursor;
17441752
return cursor;
17451753
}

packages/shell-api/src/database.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,7 +1436,11 @@ export default class Database extends ShellApiWithMongoClass {
14361436
@topologies([Topologies.ReplSet, Topologies.Sharded])
14371437
@apiVersions([1])
14381438
@returnsPromise
1439-
async watch(pipeline: Document[] = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
1439+
async watch(pipeline: Document[] | ChangeStreamOptions = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
1440+
if (!Array.isArray(pipeline)) {
1441+
options = pipeline;
1442+
pipeline = [];
1443+
}
14401444
this._emitDatabaseApiCall('watch', { pipeline, options });
14411445
const cursor = new ChangeStreamCursor(
14421446
this._mongo._serviceProvider.watch(pipeline, {
@@ -1446,7 +1450,9 @@ export default class Database extends ShellApiWithMongoClass {
14461450
this._name,
14471451
this._mongo
14481452
);
1449-
await cursor.tryNext(); // See comment in coll.watch().
1453+
if (!options.resumeAfter && !options.startAfter && !options.startAtOperationTime) {
1454+
await cursor.tryNext(); // See comment in coll.watch().
1455+
}
14501456
this._mongo._instanceState.currentCursor = cursor;
14511457
return cursor;
14521458
}

packages/shell-api/src/mongo.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,14 +507,20 @@ export default class Mongo extends ShellApiClass {
507507
@topologies([Topologies.ReplSet, Topologies.Sharded])
508508
@apiVersions([1])
509509
@returnsPromise
510-
async watch(pipeline: Document[] = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
510+
async watch(pipeline: Document[] | ChangeStreamOptions = [], options: ChangeStreamOptions = {}): Promise<ChangeStreamCursor> {
511+
if (!Array.isArray(pipeline)) {
512+
options = pipeline;
513+
pipeline = [];
514+
}
511515
this._emitMongoApiCall('watch', { pipeline, options });
512516
const cursor = new ChangeStreamCursor(
513517
this._serviceProvider.watch(pipeline, options),
514518
redactURICredentials(this._uri),
515519
this
516520
);
517-
await cursor.tryNext(); // See comment in coll.watch().
521+
if (!options.resumeAfter && !options.startAfter && !options.startAtOperationTime) {
522+
await cursor.tryNext(); // See comment in coll.watch().
523+
}
518524
this._instanceState.currentCursor = cursor;
519525
return cursor;
520526
}

packages/shell-api/src/replica-set.spec.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,7 @@ describe('ReplicaSet', () => {
857857
let additionalServer: MongodSetup;
858858
let serviceProvider: CliServiceProvider;
859859
let instanceState: ShellInstanceState;
860+
let db: Database;
860861
let rs: ReplicaSet;
861862

862863
before(async function() {
@@ -873,7 +874,8 @@ describe('ReplicaSet', () => {
873874

874875
serviceProvider = await CliServiceProvider.connect(`${await srv0.connectionString()}?directConnection=true`, {}, {}, new EventEmitter());
875876
instanceState = new ShellInstanceState(serviceProvider);
876-
rs = new ReplicaSet(instanceState.currentDb);
877+
db = instanceState.currentDb;
878+
rs = new ReplicaSet(db);
877879

878880
// check replset uninitialized
879881
try {
@@ -923,6 +925,33 @@ describe('ReplicaSet', () => {
923925
expect(Object.keys(result)).to.include('logSizeMB');
924926
});
925927
});
928+
929+
describe('watch', () => {
930+
afterEach(async() => {
931+
await db.dropDatabase();
932+
});
933+
934+
it('allows watching changes as they happen', async() => {
935+
const coll = db.getCollection('cstest');
936+
const cs = await coll.watch();
937+
await coll.insertOne({ i: 42 });
938+
expect((await cs.next()).fullDocument.i).to.equal(42);
939+
});
940+
941+
it('allow to resume watching changes as they happen', async() => {
942+
const coll = db.getCollection('cstest');
943+
const cs = await coll.watch();
944+
await coll.insertOne({ i: 123 });
945+
expect((await cs.next()).fullDocument.i).to.equal(123);
946+
const token = cs.getResumeToken();
947+
await coll.insertOne({ i: 456 });
948+
expect((await cs.next()).fullDocument.i).to.equal(456);
949+
950+
const cs2 = await coll.watch({ resumeAfter: token });
951+
expect((await cs2.next()).fullDocument.i).to.equal(456);
952+
});
953+
});
954+
926955
describe('reconfig', () => {
927956
it('reconfig with one less secondary', async() => {
928957
const newcfg: Partial<ReplSetConfig> = {

0 commit comments

Comments
 (0)