Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ declare global {

interface Context {
configuration: TestConfiguration;
/** @deprecated Please use afterEach hooks instead */
defer(fn: () => Promise<unknown>): void;
}

interface Test {
Expand Down
151 changes: 80 additions & 71 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,74 +209,78 @@ describe('Change Streams', function () {
}
});

it('should support creating multiple simultaneous ChangeStreams', {
metadata: { requires: { topology: 'replicaset' } },
describe('when creating multiple simultaneous ChangeStreams', () => {
let client;
let changeStream1;
let changeStream2;
let changeStream3;

test: function (done) {
const configuration = this.configuration;
const client = configuration.newClient();
beforeEach(async function () {
client = this.configuration.newClient();
});

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());
afterEach(async function () {
await changeStream1?.close();
await changeStream2?.close();
await changeStream3?.close();
await client?.close();
});

it(
'supports simultaneous parallel ChangeStream use',
{ requires: { topology: '!single' } },
async function () {
const database = client.db('integration_tests');
const collection1 = database.collection('simultaneous1');
const collection2 = database.collection('simultaneous2');

const changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]);
this.defer(() => changeStream1.close());
const changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]);
this.defer(() => changeStream2.close());
const changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]);
this.defer(() => changeStream3.close());
changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]);
changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]);
changeStream3 = collection2.watch([{ $addFields: { changeStreamNumber: 3 } }]);

setTimeout(() => {
this.defer(
collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }]))
);
collection1.insertMany([{ a: 1 }]).then(() => collection2.insertMany([{ a: 1 }]));
}, 50);

Promise.resolve()
.then(() =>
Promise.all([changeStream1.hasNext(), changeStream2.hasNext(), changeStream3.hasNext()])
)
.then(function (hasNexts) {
// Check all the Change Streams have a next item
assert.ok(hasNexts[0]);
assert.ok(hasNexts[1]);
assert.ok(hasNexts[2]);

return Promise.all([changeStream1.next(), changeStream2.next(), changeStream3.next()]);
})
.then(function (changes) {
// Check the values of the change documents are correct
assert.equal(changes[0].operationType, 'insert');
assert.equal(changes[1].operationType, 'insert');
assert.equal(changes[2].operationType, 'insert');

expect(changes[0]).to.have.nested.property('fullDocument.a', 1);
expect(changes[1]).to.have.nested.property('fullDocument.a', 1);
expect(changes[2]).to.have.nested.property('fullDocument.a', 1);

expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests');

expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1');
expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2');
expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2');

expect(changes[0]).to.have.nested.property('changeStreamNumber', 1);
expect(changes[1]).to.have.nested.property('changeStreamNumber', 2);
expect(changes[2]).to.have.nested.property('changeStreamNumber', 3);
})
.then(
() => done(),
err => done(err)
);
});
}
const hasNexts = await Promise.all([
changeStream1.hasNext(),
changeStream2.hasNext(),
changeStream3.hasNext()
]);

// Check all the Change Streams have a next item
expect(hasNexts[0]).to.be.true;
expect(hasNexts[1]).to.be.true;
expect(hasNexts[2]).to.be.true;

const changes = await Promise.all([
changeStream1.next(),
changeStream2.next(),
changeStream3.next()
]);

// Check the values of the change documents are correct
expect(changes[0].operationType).to.be.equal('insert');
expect(changes[1].operationType).to.be.equal('insert');
expect(changes[2].operationType).to.be.equal('insert');

expect(changes[0]).to.have.nested.property('fullDocument.a', 1);
expect(changes[1]).to.have.nested.property('fullDocument.a', 1);
expect(changes[2]).to.have.nested.property('fullDocument.a', 1);

expect(changes[0]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[1]).to.have.nested.property('ns.db', 'integration_tests');
expect(changes[2]).to.have.nested.property('ns.db', 'integration_tests');

expect(changes[0]).to.have.nested.property('ns.coll', 'simultaneous1');
expect(changes[1]).to.have.nested.property('ns.coll', 'simultaneous2');
expect(changes[2]).to.have.nested.property('ns.coll', 'simultaneous2');

expect(changes[0]).to.have.nested.property('changeStreamNumber', 1);
expect(changes[1]).to.have.nested.property('changeStreamNumber', 2);
expect(changes[2]).to.have.nested.property('changeStreamNumber', 3);
}
);
});

it('should properly close ChangeStream cursor', {
Expand Down Expand Up @@ -806,23 +810,28 @@ describe('Change Streams', function () {
});

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset' } },
test: function () {
const read = () => {
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
this.defer(lastWrite());
const nextP = changeStream.next();
return changeStream.close().then(() => nextP);
});
metadata: { requires: { topology: '!single' } },
test: async function () {
const read = async () => {
await changeStream.next();
await changeStream.next();

const write = lastWrite();

const nextP = changeStream.next();

await changeStream.close();

await write;
await nextP;
};

return Promise.all([read(), write()]).then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
const error = await Promise.all([read(), write()]).then(
() => null,
error => error
);

expect(error.message).to.equal('ChangeStream is closed');
}
});

Expand Down
75 changes: 39 additions & 36 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ const initIteratorMode = async (cs: ChangeStream) => {
};

/** Waits for a change stream to start */
function waitForStarted(changeStream, callback) {
changeStream.cursor.once('init', () => {
callback();
});
async function waitForStarted(changeStream, callback) {
await once(changeStream.cursor, 'init');
await callback();
}

// Define the pipeline processing changes
Expand Down Expand Up @@ -844,42 +843,48 @@ describe('Change Stream prose tests', function () {
describe('Change Stream prose 17-18', function () {
let client: MongoClient;
let coll: Collection;
let startAfter;
let startAfter: unknown;

function recordEvent(events, e) {
if (e.commandName !== 'aggregate') return;
events.push({ $changeStream: e.command.pipeline[0].$changeStream });
}

beforeEach(function (done) {
beforeEach('get startAfter token', async function () {
const configuration = this.configuration;
client = configuration.newClient({ monitorCommands: true });
client.connect(err => {
expect(err).to.not.exist;
coll = client.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();
changeStream.on('error', done);
waitForStarted(changeStream, () => {
coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => {
expect(err).to.not.exist;
const utilClient = configuration.newClient();
await utilClient.connect();

coll.drop(err => {
expect(err).to.not.exist;
});
});
});
const coll = utilClient.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();

changeStream.on('change', change => {
if (change.operationType === 'invalidate') {
startAfter = change._id;
changeStream.close(done);
}
});
});
const willInit = once(changeStream.cursor, 'init');

await changeStream.tryNext();
await willInit;

await coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } });
await coll.drop();

for await (const change of changeStream) {
if (change.operationType === 'invalidate') {
startAfter = change._id;
break;
}
}

await changeStream.close();

await utilClient.close();
});

afterEach(function (done) {
client.close(done);
beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
coll = client.db('integration_tests').collection('setupAfterTest');
});

afterEach(async function () {
await client.close();
});

// 17. $changeStream stage for ChangeStream started with startAfter against a server >=4.1.1
Expand All @@ -894,8 +899,8 @@ describe('Change Stream prose tests', function () {
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });

changeStream.on('error', async e => {
await changeStream.close(e);
changeStream.on('error', async () => {
await changeStream.close();
});

const changePromise = once(changeStream, 'change');
Expand Down Expand Up @@ -955,11 +960,9 @@ describe('Change Stream prose tests', function () {
});

waitForStarted(changeStream, () =>
this.defer(
coll
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
)
coll
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
);
}
});
Expand Down
Loading