Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ declare global {

interface Context {
configuration: TestConfiguration;
/** @deprecated Please use afterEach hooks instead */
defer(fn: () => Promise<unknown>): void;
}

interface Test {
Expand Down
148 changes: 73 additions & 75 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,75 +209,68 @@ describe('Change Streams', function () {
}
});

it('should support creating multiple simultaneous ChangeStreams', {
metadata: { requires: { topology: 'replicaset' } },

test: function (done) {
it(
'should support creating multiple simultaneous ChangeStreams',
{ requires: { topology: 'replicaset' } },
async function () {
const configuration = this.configuration;
const client = configuration.newClient();

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());
await client.connect();

const database = client.db('integration_tests');
const collection1 = database.collection('simultaneous1');
const collection2 = database.collection('simultaneous2');
this.defer(() => client.close());

const changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]);
this.defer(() => changeStream1.close());
const changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]);
this.defer(() => changeStream2.close());
const changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]);
this.defer(() => changeStream3.close());
const database = client.db('integration_tests');
const collection1 = database.collection('simultaneous1');
const collection2 = database.collection('simultaneous2');

setTimeout(() => {
this.defer(
collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }]))
);
}, 50);

Promise.resolve()
.then(() =>
Promise.all([changeStream1.hasNext(), changeStream2.hasNext(), changeStream3.hasNext()])
)
.then(function (hasNexts) {
// Check all the Change Streams have a next item
assert.ok(hasNexts[0]);
assert.ok(hasNexts[1]);
assert.ok(hasNexts[2]);

return Promise.all([changeStream1.next(), changeStream2.next(), changeStream3.next()]);
})
.then(function (changes) {
// Check the values of the change documents are correct
assert.equal(changes[0].operationType, 'insert');
assert.equal(changes[1].operationType, 'insert');
assert.equal(changes[2].operationType, 'insert');

expect(changes[0]).to.have.nested.property('fullDocument.a', 1);
expect(changes[1]).to.have.nested.property('fullDocument.a', 1);
expect(changes[2]).to.have.nested.property('fullDocument.a', 1);

expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests');

expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1');
expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2');
expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2');

expect(changes[0]).to.have.nested.property('changeStreamNumber', 1);
expect(changes[1]).to.have.nested.property('changeStreamNumber', 2);
expect(changes[2]).to.have.nested.property('changeStreamNumber', 3);
})
.then(
() => done(),
err => done(err)
);
});
const changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]);
this.defer(() => changeStream1.close());
const changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]);
this.defer(() => changeStream2.close());
const changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]);
this.defer(() => changeStream3.close());

setTimeout(() => {
collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }]));
}, 50);

await Promise.resolve()
.then(() =>
Promise.all([changeStream1.hasNext(), changeStream2.hasNext(), changeStream3.hasNext()])
)
.then(function (hasNexts) {
// Check all the Change Streams have a next item
assert.ok(hasNexts[0]);
assert.ok(hasNexts[1]);
assert.ok(hasNexts[2]);

return Promise.all([changeStream1.next(), changeStream2.next(), changeStream3.next()]);
})
.then(function (changes) {
// Check the values of the change documents are correct
assert.equal(changes[0].operationType, 'insert');
assert.equal(changes[1].operationType, 'insert');
assert.equal(changes[2].operationType, 'insert');

expect(changes[0]).to.have.nested.property('fullDocument.a', 1);
expect(changes[1]).to.have.nested.property('fullDocument.a', 1);
expect(changes[2]).to.have.nested.property('fullDocument.a', 1);

expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests');

expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1');
expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2');
expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2');

expect(changes[0]).to.have.nested.property('changeStreamNumber', 1);
expect(changes[1]).to.have.nested.property('changeStreamNumber', 2);
expect(changes[2]).to.have.nested.property('changeStreamNumber', 3);
});
}
});
);

it('should properly close ChangeStream cursor', {
metadata: { requires: { topology: 'replicaset' } },
Expand Down Expand Up @@ -807,22 +800,27 @@ describe('Change Streams', function () {

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset' } },
test: function () {
const read = () => {
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
this.defer(lastWrite());
const nextP = changeStream.next();
return changeStream.close().then(() => nextP);
});
test: async function () {
const read = async () => {
await changeStream.next();
await changeStream.next();

const write = lastWrite();

const nextP = changeStream.next();

await changeStream.close();

await write;
await nextP;
};

return Promise.all([read(), write()]).then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
const error = await Promise.all([read(), write()]).then(
() => null,
error => error
);

expect(error.message).to.equal('ChangeStream is closed');
}
});

Expand Down
75 changes: 39 additions & 36 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ const initIteratorMode = async (cs: ChangeStream) => {
};

/** Waits for a change stream to start */
function waitForStarted(changeStream, callback) {
changeStream.cursor.once('init', () => {
callback();
});
async function waitForStarted(changeStream, callback) {
await once(changeStream.cursor, 'init');
await callback();
}

// Define the pipeline processing changes
Expand Down Expand Up @@ -844,42 +843,48 @@ describe('Change Stream prose tests', function () {
describe('Change Stream prose 17-18', function () {
let client: MongoClient;
let coll: Collection;
let startAfter;
let startAfter: unknown;

function recordEvent(events, e) {
if (e.commandName !== 'aggregate') return;
events.push({ $changeStream: e.command.pipeline[0].$changeStream });
}

beforeEach(function (done) {
beforeEach('get startAfter token', async function () {
const configuration = this.configuration;
client = configuration.newClient({ monitorCommands: true });
client.connect(err => {
expect(err).to.not.exist;
coll = client.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();
changeStream.on('error', done);
waitForStarted(changeStream, () => {
coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => {
expect(err).to.not.exist;
const utilClient = configuration.newClient();
await utilClient.connect();

coll.drop(err => {
expect(err).to.not.exist;
});
});
});
const coll = utilClient.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();

changeStream.on('change', change => {
if (change.operationType === 'invalidate') {
startAfter = change._id;
changeStream.close(done);
}
});
});
const willInit = once(changeStream.cursor, 'init');

await changeStream.tryNext();
await willInit;

await coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } });
await coll.drop();

for await (const change of changeStream) {
if (change.operationType === 'invalidate') {
startAfter = change._id;
break;
}
}

await changeStream.close();

await utilClient.close();
});

afterEach(function (done) {
client.close(done);
beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
coll = client.db('integration_tests').collection('setupAfterTest');
});

afterEach(async function () {
await client.close();
});

// 17. $changeStream stage for ChangeStream started with startAfter against a server >=4.1.1
Expand All @@ -894,8 +899,8 @@ describe('Change Stream prose tests', function () {
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });

changeStream.on('error', async e => {
await changeStream.close(e);
changeStream.on('error', async () => {
await changeStream.close();
});

const changePromise = once(changeStream, 'change');
Expand Down Expand Up @@ -955,11 +960,9 @@ describe('Change Stream prose tests', function () {
});

waitForStarted(changeStream, () =>
this.defer(
coll
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
)
coll
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
);
}
});
Expand Down
91 changes: 43 additions & 48 deletions test/tools/runner/plugins/deferred.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,52 @@
'use strict';

const kDeferred = Symbol('deferred');
const mocha = require('mocha');

(mocha => {
const Context = mocha.Context;
function makeExecuteDeferred(test) {
return () => {
const deferredActions = test[kDeferred];

// process actions LIFO
const promises = Array.from(deferredActions).reverse();
const result = promises.reduce((p, action) => {
if (action.length > 0) {
// assume these are async methods with provided `done`
const actionPromise = new Promise((resolve, reject) => {
function done(err) {
if (err) return reject(err);
resolve();
}

action(done);
});

return p.then(actionPromise);
}

return p.then(action);
}, Promise.resolve());

return result.then(
() => test[kDeferred].clear(),
err => {
test[kDeferred].clear();
return Promise.reject(err);
}
);
};
}
const { Context } = mocha;

Context.prototype.defer = function (fn) {
const test = this.test;
if (test[kDeferred] == null) {
test[kDeferred] = new Set();
function makeExecuteDeferred(test) {
return async function () {
/** @type {Array<() => Promise<void>>} */
const deferredActions = test[kDeferred];

const parentSuite = test.parent;
const afterEachHooks = parentSuite._afterEach;
if (afterEachHooks[0] == null || afterEachHooks[0].title !== kDeferred) {
const deferredHook = parentSuite._createHook('"deferred" hook', makeExecuteDeferred(test));
// process actions LIFO
const actions = Array.from(deferredActions).reverse();

afterEachHooks.unshift(deferredHook);
try {
for (const fn of actions) {
await fn();
}
} finally {
test[kDeferred].length = 0;
}

test[kDeferred].add(fn);
return this;
};
})(require('mocha'));
}

Context.prototype.defer = function defer(fn) {
const test = this.test;

if (typeof fn !== 'function') {
throw new Error('defer is meant to take a function that returns a promise');
}

if (test[kDeferred] == null) {
test[kDeferred] = [];

const parentSuite = test.parent;
const afterEachHooks = parentSuite._afterEach;
if (afterEachHooks[0] == null || afterEachHooks[0].title !== kDeferred) {
const deferredHook = parentSuite._createHook('"deferred" hook', makeExecuteDeferred(test));

// defer runs after test but before afterEach(s)
afterEachHooks.unshift(deferredHook);
}
}

if (test[kDeferred].includes(fn)) {
throw new Error('registered the same deferred action more than once');
}

test[kDeferred].push(fn);
return this;
};