diff --git a/global.d.ts b/global.d.ts index 14a69aae68b..09c35deea6e 100644 --- a/global.d.ts +++ b/global.d.ts @@ -84,6 +84,8 @@ declare global { interface Context { configuration: TestConfiguration; + /** @deprecated Please use afterEach hooks instead */ + defer(fn: () => Promise): void; } interface Test { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 000d7ba41b1..a115caf29e6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -209,74 +209,78 @@ describe('Change Streams', function () { } }); - it('should support creating multiple simultaneous ChangeStreams', { - metadata: { requires: { topology: 'replicaset' } }, + describe('when creating multiple simultaneous ChangeStreams', () => { + let client; + let changeStream1; + let changeStream2; + let changeStream3; - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient(); + beforeEach(async function () { + client = this.configuration.newClient(); + }); - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + afterEach(async function () { + await changeStream1?.close(); + await changeStream2?.close(); + await changeStream3?.close(); + await client?.close(); + }); + it( + 'supports simultaneous parallel ChangeStream use', + { requires: { topology: '!single' } }, + async function () { const database = client.db('integration_tests'); const collection1 = database.collection('simultaneous1'); const collection2 = database.collection('simultaneous2'); - const changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]); - this.defer(() => changeStream1.close()); - const changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]); - this.defer(() => changeStream2.close()); - const changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]); - this.defer(() => changeStream3.close()); + changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]); + changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]); + changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]); setTimeout(() => { - this.defer( - collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }])) - ); + collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }])); }, 50); - Promise.resolve() - .then(() => - Promise.all([changeStream1.hasNext(), changeStream2.hasNext(), changeStream3.hasNext()]) - ) - .then(function (hasNexts) { - // Check all the Change Streams have a next item - assert.ok(hasNexts[0]); - assert.ok(hasNexts[1]); - assert.ok(hasNexts[2]); - - return Promise.all([changeStream1.next(), changeStream2.next(), changeStream3.next()]); - }) - .then(function (changes) { - // Check the values of the change documents are correct - assert.equal(changes[0].operationType, 'insert'); - assert.equal(changes[1].operationType, 'insert'); - assert.equal(changes[2].operationType, 'insert'); - - expect(changes[0]).to.have.nested.property('fullDocument.a', 1); - expect(changes[1]).to.have.nested.property('fullDocument.a', 1); - expect(changes[2]).to.have.nested.property('fullDocument.a', 1); - - expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests'); - expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests'); - expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests'); - - expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1'); - expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2'); - expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2'); - - expect(changes[0]).to.have.nested.property('changeStreamNumber', 1); - expect(changes[1]).to.have.nested.property('changeStreamNumber', 2); - expect(changes[2]).to.have.nested.property('changeStreamNumber', 3); - }) - .then( - () => done(), - err => done(err) - ); - }); - } + const hasNexts = await Promise.all([ + changeStream1.hasNext(), + changeStream2.hasNext(), + changeStream3.hasNext() + ]); + + // Check all the Change Streams have a next item + expect(hasNexts[0]).to.be.true; + expect(hasNexts[1]).to.be.true; + expect(hasNexts[2]).to.be.true; + + const changes = await Promise.all([ + changeStream1.next(), + changeStream2.next(), + changeStream3.next() + ]); + + // Check the values of the change documents are correct + expect(changes[0].operationType).to.be.equal('insert'); + expect(changes[1].operationType).to.be.equal('insert'); + expect(changes[2].operationType).to.be.equal('insert'); + + expect(changes[0]).to.have.nested.property('fullDocument.a', 1); + expect(changes[1]).to.have.nested.property('fullDocument.a', 1); + expect(changes[2]).to.have.nested.property('fullDocument.a', 1); + + expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests'); + expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests'); + expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests'); + + expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1'); + expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2'); + expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2'); + + expect(changes[0]).to.have.nested.property('changeStreamNumber', 1); + expect(changes[1]).to.have.nested.property('changeStreamNumber', 2); + expect(changes[2]).to.have.nested.property('changeStreamNumber', 3); + } + ); }); it('should properly close ChangeStream cursor', { @@ -806,23 +810,28 @@ describe('Change Streams', function () { }); it('when invoked with promises', { - metadata: { requires: { topology: 'replicaset' } }, - test: function () { - const read = () => { - return Promise.resolve() - .then(() => changeStream.next()) - .then(() => changeStream.next()) - .then(() => { - this.defer(lastWrite()); - const nextP = changeStream.next(); - return changeStream.close().then(() => nextP); - }); + metadata: { requires: { topology: '!single' } }, + test: async function () { + const read = async () => { + await changeStream.next(); + await changeStream.next(); + + const write = lastWrite(); + + const nextP = changeStream.next(); + + await changeStream.close(); + + await write; + await nextP; }; - return Promise.all([read(), write()]).then( - () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') + const error = await Promise.all([read(), write()]).then( + () => null, + error => error ); + + expect(error.message).to.equal('ChangeStream is closed'); } }); diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index cd2c424072a..b4def6532fc 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -80,10 +80,9 @@ const initIteratorMode = async (cs: ChangeStream) => { }; /** Waits for a change stream to start */ -function waitForStarted(changeStream, callback) { - changeStream.cursor.once('init', () => { - callback(); - }); +async function waitForStarted(changeStream, callback) { + await once(changeStream.cursor, 'init'); + await callback(); } // Define the pipeline processing changes @@ -844,42 +843,48 @@ describe('Change Stream prose tests', function () { describe('Change Stream prose 17-18', function () { let client: MongoClient; let coll: Collection; - let startAfter; + let startAfter: unknown; function recordEvent(events, e) { if (e.commandName !== 'aggregate') return; events.push({ $changeStream: e.command.pipeline[0].$changeStream }); } - beforeEach(function (done) { + beforeEach('get startAfter token', async function () { const configuration = this.configuration; - client = configuration.newClient({ monitorCommands: true }); - client.connect(err => { - expect(err).to.not.exist; - coll = client.db('integration_tests').collection('setupAfterTest'); - const changeStream = coll.watch(); - changeStream.on('error', done); - waitForStarted(changeStream, () => { - coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => { - expect(err).to.not.exist; + const utilClient = configuration.newClient(); + await utilClient.connect(); - coll.drop(err => { - expect(err).to.not.exist; - }); - }); - }); + const coll = utilClient.db('integration_tests').collection('setupAfterTest'); + const changeStream = coll.watch(); - changeStream.on('change', change => { - if (change.operationType === 'invalidate') { - startAfter = change._id; - changeStream.close(done); - } - }); - }); + const willInit = once(changeStream.cursor, 'init'); + + await changeStream.tryNext(); + await willInit; + + await coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }); + await coll.drop(); + + for await (const change of changeStream) { + if (change.operationType === 'invalidate') { + startAfter = change._id; + break; + } + } + + await changeStream.close(); + + await utilClient.close(); }); - afterEach(function (done) { - client.close(done); + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + coll = client.db('integration_tests').collection('setupAfterTest'); + }); + + afterEach(async function () { + await client.close(); }); // 17. $changeStream stage for ChangeStream started with startAfter against a server >=4.1.1 @@ -894,8 +899,8 @@ describe('Change Stream prose tests', function () { client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - changeStream.on('error', async e => { - await changeStream.close(e); + changeStream.on('error', async () => { + await changeStream.close(); }); const changePromise = once(changeStream, 'change'); @@ -955,11 +960,9 @@ describe('Change Stream prose tests', function () { }); waitForStarted(changeStream, () => - this.defer( - coll - .insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }) - .then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } })) - ) + coll + .insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }) + .then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } })) ); } }); diff --git a/test/tools/runner/plugins/deferred.js b/test/tools/runner/plugins/deferred.js index 6ceef03fac7..f8219aaad67 100644 --- a/test/tools/runner/plugins/deferred.js +++ b/test/tools/runner/plugins/deferred.js @@ -1,57 +1,52 @@ 'use strict'; + const kDeferred = Symbol('deferred'); +const mocha = require('mocha'); -(mocha => { - const Context = mocha.Context; - function makeExecuteDeferred(test) { - return () => { - const deferredActions = test[kDeferred]; - - // process actions LIFO - const promises = Array.from(deferredActions).reverse(); - const result = promises.reduce((p, action) => { - if (action.length > 0) { - // assume these are async methods with provided `done` - const actionPromise = new Promise((resolve, reject) => { - function done(err) { - if (err) return reject(err); - resolve(); - } - - action(done); - }); - - return p.then(actionPromise); - } - - return p.then(action); - }, Promise.resolve()); - - return result.then( - () => test[kDeferred].clear(), - err => { - test[kDeferred].clear(); - return Promise.reject(err); - } - ); - }; - } +const { Context } = mocha; - Context.prototype.defer = function (fn) { - const test = this.test; - if (test[kDeferred] == null) { - test[kDeferred] = new Set(); +function makeExecuteDeferred(test) { + return async function () { + /** @type {Array<() => Promise>} */ + const deferredActions = test[kDeferred]; - const parentSuite = test.parent; - const afterEachHooks = parentSuite._afterEach; - if (afterEachHooks[0] == null || afterEachHooks[0].title !== kDeferred) { - const deferredHook = parentSuite._createHook('"deferred" hook', makeExecuteDeferred(test)); + // process actions LIFO + const actions = Array.from(deferredActions).reverse(); - afterEachHooks.unshift(deferredHook); + try { + for (const fn of actions) { + await fn(); } + } finally { + test[kDeferred].length = 0; } - - test[kDeferred].add(fn); - return this; }; -})(require('mocha')); +} + +Context.prototype.defer = function defer(fn) { + const test = this.test; + + if (typeof fn !== 'function') { + throw new Error('defer is meant to take a function that returns a promise'); + } + + if (test[kDeferred] == null) { + test[kDeferred] = []; + + const parentSuite = test.parent; + const afterEachHooks = parentSuite._afterEach; + if (afterEachHooks[0] == null || afterEachHooks[0].title !== kDeferred) { + const deferredHook = parentSuite._createHook('"deferred" hook', makeExecuteDeferred(test)); + + // defer runs after test but before afterEach(s) + afterEachHooks.unshift(deferredHook); + } + } + + if (test[kDeferred].includes(fn)) { + throw new Error('registered the same deferred action more than once'); + } + + test[kDeferred].push(fn); + return this; +};