Skip to content

Commit dcbfd6e

Browse files
authored
fix(NODE-4413): set maxTimeMS on getMores when maxAwaitTimeMS is specified (#3319)
1 parent bc70022 commit dcbfd6e

File tree

6 files changed

+426
-144
lines changed

6 files changed

+426
-144
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,20 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
7878
session?: ClientSession;
7979
readPreference?: ReadPreferenceLike;
8080
readConcern?: ReadConcernLike;
81+
/**
82+
* Specifies the number of documents to return in each response from MongoDB
83+
*/
8184
batchSize?: number;
85+
/**
86+
* When applicable `maxTimeMS` controls the amount of time the initial command
87+
* that constructs a cursor should take. (ex. find, aggregate, listCollections)
88+
*/
8289
maxTimeMS?: number;
90+
/**
91+
* When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores
92+
* that a cursor uses to fetch more data should take. (ex. cursor.next())
93+
*/
94+
maxAwaitTimeMS?: number;
8395
/**
8496
* Comment to apply to the operation.
8597
*
@@ -89,7 +101,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
89101
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
90102
*/
91103
comment?: unknown;
104+
/**
105+
* By default, MongoDB will automatically close a cursor when the
106+
* client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections)
107+
* you may use a Tailable Cursor that remains open after the client exhausts
108+
* the results in the initial cursor.
109+
*/
92110
tailable?: boolean;
111+
/**
112+
* If awaitData is set to true, when the cursor reaches the end of the capped collection,
113+
* MongoDB blocks the query thread for a period of time waiting for new data to arrive.
114+
* When new data is inserted into the capped collection, the blocked thread is signaled
115+
* to wake up and return the next batch to the client.
116+
*/
93117
awaitData?: boolean;
94118
noCursorTimeout?: boolean;
95119
}
@@ -155,7 +179,7 @@ export abstract class AbstractCursor<
155179
}
156180
this[kClient] = client;
157181
this[kNamespace] = namespace;
158-
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
182+
this[kDocuments] = [];
159183
this[kInitialized] = false;
160184
this[kClosed] = false;
161185
this[kKilled] = false;
@@ -186,6 +210,10 @@ export abstract class AbstractCursor<
186210
this[kOptions].maxTimeMS = options.maxTimeMS;
187211
}
188212

213+
if (typeof options.maxAwaitTimeMS === 'number') {
214+
this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
215+
}
216+
189217
if (options.session instanceof ClientSession) {
190218
this[kSession] = options.session;
191219
} else {
@@ -617,21 +645,8 @@ export abstract class AbstractCursor<
617645

618646
/** @internal */
619647
_getMore(batchSize: number, callback: Callback<Document>): void {
620-
const cursorId = this[kId];
621-
const cursorNs = this[kNamespace];
622-
const server = this[kServer];
623-
624-
if (cursorId == null) {
625-
callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
626-
return;
627-
}
628-
629-
if (server == null) {
630-
callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
631-
return;
632-
}
633-
634-
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
648+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
649+
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
635650
...this[kOptions],
636651
session: this[kSession],
637652
batchSize

src/operations/get_more.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation {
3939
cursorId: Long;
4040
override options: GetMoreOptions;
4141

42-
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
42+
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
4343
super(options);
4444

4545
this.options = options;
@@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation {
6363
);
6464
}
6565

66+
if (this.cursorId == null || this.cursorId.isZero()) {
67+
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
68+
}
69+
6670
const collection = this.ns.collection;
6771
if (collection == null) {
6872
// Cursors should have adopted the namespace returned by MongoDB

test/integration/change-streams/change_stream.test.ts

Lines changed: 123 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ describe('Change Streams', function () {
11111111
changeStream.next((err, doc) => {
11121112
expect(err).to.exist;
11131113
expect(doc).to.not.exist;
1114-
expect(err.message).to.equal('ChangeStream is closed');
1114+
expect(err?.message).to.equal('ChangeStream is closed');
11151115
changeStream.close(() => client.close(done));
11161116
});
11171117
});
@@ -1372,23 +1372,139 @@ describe('Change Streams', function () {
13721372
)
13731373
.run();
13741374

1375+
UnifiedTestSuiteBuilder.describe('entity.watch() server-side options')
1376+
.runOnRequirement({
1377+
topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'],
1378+
minServerVersion: '4.4.0'
1379+
})
1380+
.createEntities([
1381+
{ client: { id: 'client0', observeEvents: ['commandStartedEvent'] } },
1382+
{ database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } },
1383+
{ collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } }
1384+
])
1385+
.test(
1386+
TestBuilder.it(
1387+
'should use maxAwaitTimeMS option to set maxTimeMS on getMore and should not set maxTimeMS on aggregate'
1388+
)
1389+
.operation({
1390+
object: 'collection0',
1391+
name: 'createChangeStream',
1392+
saveResultAsEntity: 'changeStreamOnClient',
1393+
arguments: { maxAwaitTimeMS: 5000 }
1394+
})
1395+
.operation({
1396+
name: 'insertOne',
1397+
object: 'collection0',
1398+
arguments: { document: { a: 1 } },
1399+
ignoreResultAndError: true
1400+
})
1401+
.operation({
1402+
object: 'changeStreamOnClient',
1403+
name: 'iterateUntilDocumentOrError',
1404+
ignoreResultAndError: true
1405+
})
1406+
.expectEvents({
1407+
client: 'client0',
1408+
events: [
1409+
{
1410+
commandStartedEvent: {
1411+
commandName: 'aggregate',
1412+
command: { maxTimeMS: { $$exists: false } }
1413+
}
1414+
},
1415+
{ commandStartedEvent: { commandName: 'insert' } },
1416+
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
1417+
]
1418+
})
1419+
.toJSON()
1420+
)
1421+
.test(
1422+
TestBuilder.it(
1423+
'should use maxTimeMS option to set maxTimeMS on aggregate and not set maxTimeMS on getMore'
1424+
)
1425+
.operation({
1426+
object: 'collection0',
1427+
name: 'createChangeStream',
1428+
saveResultAsEntity: 'changeStreamOnClient',
1429+
arguments: { maxTimeMS: 5000 }
1430+
})
1431+
.operation({
1432+
name: 'insertOne',
1433+
object: 'collection0',
1434+
arguments: { document: { a: 1 } },
1435+
ignoreResultAndError: true
1436+
})
1437+
.operation({
1438+
object: 'changeStreamOnClient',
1439+
name: 'iterateUntilDocumentOrError',
1440+
ignoreResultAndError: true
1441+
})
1442+
.expectEvents({
1443+
client: 'client0',
1444+
ignoreExtraEvents: true, // Sharded clusters have extra getMores
1445+
events: [
1446+
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
1447+
{ commandStartedEvent: { commandName: 'insert' } },
1448+
{
1449+
commandStartedEvent: {
1450+
commandName: 'getMore',
1451+
command: { maxTimeMS: { $$exists: false } }
1452+
}
1453+
}
1454+
]
1455+
})
1456+
.toJSON()
1457+
)
1458+
.test(
1459+
TestBuilder.it(
1460+
'should use maxTimeMS option to set maxTimeMS on aggregate and maxAwaitTimeMS option to set maxTimeMS on getMore'
1461+
)
1462+
.operation({
1463+
object: 'collection0',
1464+
name: 'createChangeStream',
1465+
saveResultAsEntity: 'changeStreamOnClient',
1466+
arguments: { maxTimeMS: 5000, maxAwaitTimeMS: 6000 }
1467+
})
1468+
.operation({
1469+
name: 'insertOne',
1470+
object: 'collection0',
1471+
arguments: { document: { a: 1 } },
1472+
ignoreResultAndError: true
1473+
})
1474+
.operation({
1475+
object: 'changeStreamOnClient',
1476+
name: 'iterateUntilDocumentOrError',
1477+
ignoreResultAndError: true
1478+
})
1479+
.expectEvents({
1480+
client: 'client0',
1481+
ignoreExtraEvents: true, // Sharded clusters have extra getMores
1482+
events: [
1483+
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
1484+
{ commandStartedEvent: { commandName: 'insert' } },
1485+
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 6000 } } }
1486+
]
1487+
})
1488+
.toJSON()
1489+
)
1490+
.run();
1491+
13751492
describe('BSON Options', function () {
13761493
let client: MongoClient;
13771494
let db: Db;
13781495
let collection: Collection;
13791496
let cs: ChangeStream;
1497+
13801498
beforeEach(async function () {
13811499
client = await this.configuration.newClient({ monitorCommands: true }).connect();
13821500
db = client.db('db');
13831501
collection = await db.createCollection('collection');
13841502
});
1503+
13851504
afterEach(async function () {
13861505
await db.dropCollection('collection');
13871506
await cs.close();
13881507
await client.close();
1389-
client = undefined;
1390-
db = undefined;
1391-
collection = undefined;
13921508
});
13931509

13941510
context('promoteLongs', () => {
@@ -1452,7 +1568,7 @@ describe('Change Streams', function () {
14521568
it('does not send invalid options on the aggregate command', {
14531569
metadata: { requires: { topology: '!single' } },
14541570
test: async function () {
1455-
const started = [];
1571+
const started: CommandStartedEvent[] = [];
14561572

14571573
client.on('commandStarted', filterForCommands(['aggregate'], started));
14581574
const doc = { invalidBSONOption: true };
@@ -1473,7 +1589,7 @@ describe('Change Streams', function () {
14731589
it('does not send invalid options on the getMore command', {
14741590
metadata: { requires: { topology: '!single' } },
14751591
test: async function () {
1476-
const started = [];
1592+
const started: CommandStartedEvent[] = [];
14771593

14781594
client.on('commandStarted', filterForCommands(['aggregate'], started));
14791595
const doc = { invalidBSONOption: true };
@@ -1503,7 +1619,7 @@ describe('ChangeStream resumability', function () {
15031619
const changeStreamResumeOptions: ChangeStreamOptions = {
15041620
fullDocument: 'updateLookup',
15051621
collation: { locale: 'en', maxVariable: 'punct' },
1506-
maxAwaitTimeMS: 20000,
1622+
maxAwaitTimeMS: 2000,
15071623
batchSize: 200
15081624
};
15091625

0 commit comments

Comments
 (0)