diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 749285c8b2..a30ca19ed7 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,5 +1,5 @@ import { strict as assert } from 'assert'; -import { UUID } from 'bson'; +import { Long, UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -11,19 +11,16 @@ import { type ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions, - type Collection, - type CommandStartedEvent, - type Db, - isHello, - LEGACY_HELLO_COMMAND, - Long, - MongoAPIError, - MongoChangeStreamError, - type MongoClient, - MongoServerError, - ReadPreference, type ResumeToken -} from '../../mongodb'; +} from '../../../src/change_stream'; +import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; +import { type Collection } from '../../../src/collection'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { type Db } from '../../../src/db'; +import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error'; +import { type MongoClient } from '../../../src/mongo_client'; +import { ReadPreference } from '../../../src/read_preference'; +import { isHello } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder'; import { type FailCommandFailPoint, sleep } from '../../tools/utils'; @@ -323,30 +320,25 @@ describe('Change Streams', function () { it('should properly close ChangeStream cursor', { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { const configuration = this.configuration; const client = configuration.newClient(); - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + await client.connect(); + const database = client.db('integration_tests'); + const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); - const database = client.db('integration_tests'); - const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); - this.defer(() => changeStream.close()); + assert.equal(changeStream.closed, false); + assert.equal(changeStream.cursor.closed, false); - assert.equal(changeStream.closed, false); - assert.equal(changeStream.cursor.closed, false); + await changeStream.close(); - changeStream.close(err => { - expect(err).to.not.exist; + // Check the cursor is closed + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); - // Check the cursor is closed - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); - done(); - }); - }); + await changeStream.close(); + await client.close(); } }); @@ -355,32 +347,28 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { const configuration = this.configuration; const client = configuration.newClient(); - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + await client.connect(); - const forbiddenStage = {}; - const forbiddenStageName = '$alksdjfhlaskdfjh'; - forbiddenStage[forbiddenStageName] = 2; + const forbiddenStage = {}; + const forbiddenStageName = '$alksdjfhlaskdfjh'; + forbiddenStage[forbiddenStageName] = 2; - const database = client.db('integration_tests'); - const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); - this.defer(() => changeStream.close()); + const database = client.db('integration_tests'); + const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); - changeStream.next(err => { - assert.ok(err); - assert.ok(err.message); - assert.ok( - err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 - ); + const err = await changeStream.next().catch(e => e); + assert.ok(err); + assert.ok(err.message); + assert.ok( + err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 + ); - done(); - }); - }); + await changeStream.close(); + await client.close(); } } ); @@ -459,37 +447,25 @@ describe('Change Streams', function () { it('should error if resume token projected out of change stream document using iterator', { metadata: { requires: { topology: 'replicaset' } }, - test(done) { + async test() { const configuration = this.configuration; const client = configuration.newClient(); - client.connect((err, client) => { - expect(err).to.not.exist; + await client.connect(); - const database = client.db('integration_tests'); - const collection = database.collection('resumetokenProjectedOutCallback'); - const changeStream = collection.watch([{ $project: { _id: false } }]); + const database = client.db('integration_tests'); + const collection = database.collection('resumetokenProjectedOutCallback'); + const changeStream = collection.watch([{ $project: { _id: false } }]); - changeStream.hasNext(() => { - // trigger initialize - }); + await initIteratorMode(changeStream); - changeStream.cursor.on('init', () => { - collection.insertOne({ b: 2 }, (err, res) => { - expect(err).to.be.undefined; - expect(res).to.exist; - - changeStream.next(err => { - expect(err).to.exist; - changeStream.close(() => { - client.close(() => { - done(); - }); - }); - }); - }); - }); - }); + const res = await collection.insertOne({ b: 2 }); + expect(res).to.exist; + + const err = await changeStream.next().catch(e => e); + expect(err).to.exist; + await changeStream.close(); + await client.close(); } }); @@ -1291,7 +1267,7 @@ describe('Change Streams', function () { await mock.cleanup(); }); - it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function (done) { + it('changeStream should close if cursor id for initial aggregate is Long.ZERO', async function () { mockServer.setMessageHandler(req => { const doc = req.document; if (isHello(doc)) { @@ -1320,17 +1296,16 @@ describe('Change Streams', function () { const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, { serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed }); - client.connect(err => { - expect(err).to.not.exist; - const collection = client.db('cs').collection('test'); - const changeStream = collection.watch(); - changeStream.next((err, doc) => { - expect(err).to.exist; - expect(doc).to.not.exist; - expect(err?.message).to.equal('ChangeStream is closed'); - changeStream.close(() => client.close(done)); - }); - }); + await client.connect(); + const collection = client.db('cs').collection('test'); + const changeStream = collection.watch(); + + const err = await changeStream.next().catch(e => e); + expect(err).to.exist; + expect(err?.message).to.equal('ChangeStream is closed'); + + await changeStream.close(); + await client.close(); }); }); }); diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6b4110f9cc..d13acc73d0 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { once } from 'events'; +import { on, once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; @@ -57,9 +57,7 @@ function triggerResumableError( nextStub.restore(); }); - changeStream.next(() => { - // ignore - }); + changeStream.next(); } if (typeof delay === 'number') { @@ -78,12 +76,6 @@ const initIteratorMode = async (cs: ChangeStream) => { return; }; -/** Waits for a change stream to start */ -async function waitForStarted(changeStream, callback) { - await once(changeStream.cursor, 'init'); - await callback(); -} - describe('Change Stream prose tests', function () { before(async function () { return await setupDatabase(this.configuration, ['integration_tests']); @@ -643,12 +635,10 @@ describe('Change Stream prose tests', function () { // when resuming a change stream. it('$changeStream with results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - changeStream.on('error', done); - this.defer(() => changeStream.close()); changeStream.on('change', change => { events.push({ change: { insert: { x: change.fullDocument.x } } }); @@ -658,21 +648,22 @@ describe('Change Stream prose tests', function () { events = []; triggerResumableError(changeStream, () => events.push('error')); break; - case 3: - expect(events).to.be.an('array').with.lengthOf(3); - expect(events[0]).to.equal('error'); - expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; - expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); - done(); - break; } }); - waitForStarted(changeStream, () => - coll - .insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }) - .then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } })) - ); + await once(changeStream.cursor, 'init'); + const changes = on(changeStream, 'change'); + await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); + await changes.next(); + await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }); + await changes.next(); + + expect(events).to.be.an('array').with.lengthOf(3); + expect(events[0]).to.equal('error'); + expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; + expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); + + await changeStream.close(); } }); });