From 119bb391736570768aeaf85f4ffd2a3a84473015 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Wed, 24 Sep 2025 07:51:31 -0700 Subject: [PATCH 1/7] more migrations and lots of logging for a test that i'm having issues porting to async --- .../change-streams/change_stream.test.ts | 140 ++++++++++-------- .../change_streams.prose.test.ts | 46 ++---- 2 files changed, 90 insertions(+), 96 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 32da9530831..64e791589d2 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,5 +1,5 @@ import { strict as assert } from 'assert'; -import { UUID } from 'bson'; +import { Document, Long, UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -7,27 +7,19 @@ import * as sinon from 'sinon'; import { PassThrough } from 'stream'; import { setTimeout } from 'timers'; -import { - type ChangeStream, - type ChangeStreamDocument, - type ChangeStreamOptions, - type Collection, - type CommandStartedEvent, - type Db, - isHello, - LEGACY_HELLO_COMMAND, - Long, - MongoAPIError, - MongoChangeStreamError, - type MongoClient, - MongoServerError, - ReadPreference, - type ResumeToken -} from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder'; import { type FailCommandFailPoint, sleep } from '../../tools/utils'; import { delay, filterForCommands } from '../shared'; +import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions, ResumeToken } from '../../../src/change_stream'; +import { MongoClient } from '../../../src/mongo_client'; +import { ReadPreference } from '../../../src/read_preference'; +import { Collection } from '../../../src/collection'; +import { Db } from '../../../src/db'; +import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; +import { isHello } from '../../../src/utils'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error'; const initIteratorMode = async (cs: ChangeStream) => { const initEvent = once(cs.cursor, 'init'); @@ -323,30 +315,30 @@ describe('Change Streams', function () { it('should properly close ChangeStream cursor', { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { const configuration = this.configuration; const client = configuration.newClient(); + let changeStream: ChangeStream>; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - + try { + await client.connect(); const database = client.db('integration_tests'); const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); - this.defer(() => changeStream.close()); assert.equal(changeStream.closed, false); assert.equal(changeStream.cursor.closed, false); - changeStream.close(err => { - expect(err).to.not.exist; + await changeStream.close(); - // Check the cursor is closed - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); - done(); - }); - }); + // Check the cursor is closed + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); + } finally { + await client.close(); + if (changeStream) { + await changeStream.close(); + } + } } }); @@ -355,32 +347,34 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { const configuration = this.configuration; const client = configuration.newClient(); + let changeStream: ChangeStream>; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + await client.connect(); - const forbiddenStage = {}; - const forbiddenStageName = '$alksdjfhlaskdfjh'; - forbiddenStage[forbiddenStageName] = 2; + const forbiddenStage = {}; + const forbiddenStageName = '$alksdjfhlaskdfjh'; + forbiddenStage[forbiddenStageName] = 2; + try { const database = client.db('integration_tests'); - const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); - this.defer(() => changeStream.close()); - - changeStream.next(err => { - assert.ok(err); - assert.ok(err.message); - assert.ok( - err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 - ); + changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); - done(); - }); - }); + await changeStream.next(); + } catch (err) { + assert.ok(err); + assert.ok(err.message); + assert.ok( + err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 + ); + } finally { + await client.close(); + if (changeStream) { + await changeStream.close(); + } + } } } ); @@ -457,32 +451,49 @@ describe('Change Streams', function () { }); }); - it('should error if resume token projected out of change stream document using iterator', { + it.only('should error if resume token projected out of change stream document using iterator', { metadata: { requires: { topology: 'replicaset' } }, test(done) { const configuration = this.configuration; const client = configuration.newClient(); + console.log(`pavel >>> 1 before all`); + client.connect((err, client) => { expect(err).to.not.exist; + console.log(`pavel >>> 2 after connect`); + const database = client.db('integration_tests'); const collection = database.collection('resumetokenProjectedOutCallback'); const changeStream = collection.watch([{ $project: { _id: false } }]); + console.log(`pavel >>> 3 before hasNext`); + changeStream.hasNext(() => { // trigger initialize + console.log(`pavel >>> ? trigger initialize?`); }); + console.log(`pavel >>> 4 after hasNext`); + changeStream.cursor.on('init', () => { + console.log(`pavel >>> 5 init`); collection.insertOne({ b: 2 }, (err, res) => { + console.log(`pavel >>> 6 after insertOne`); expect(err).to.be.undefined; expect(res).to.exist; + console.log(`pavel >>> 7 before next`); changeStream.next(err => { + console.log(`pavel >>> 8 after next`); expect(err).to.exist; + console.log(`pavel >>> 9 before close`); changeStream.close(() => { + console.log(`pavel >>> 10 after close change stream`); client.close(() => { + console.log(`pavel >>> 11 after close client`); + console.log("pavel >>> DONE"); done(); }); }); @@ -1290,7 +1301,7 @@ describe('Change Streams', function () { await mock.cleanup(); }); - it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function (done) { + it('changeStream should close if cursor id for initial aggregate is Long.ZERO', async function () { mockServer.setMessageHandler(req => { const doc = req.document; if (isHello(doc)) { @@ -1319,17 +1330,18 @@ describe('Change Streams', function () { const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, { serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed }); - client.connect(err => { - expect(err).to.not.exist; - const collection = client.db('cs').collection('test'); - const changeStream = collection.watch(); - changeStream.next((err, doc) => { - expect(err).to.exist; - expect(doc).to.not.exist; - expect(err?.message).to.equal('ChangeStream is closed'); - changeStream.close(() => client.close(done)); - }); - }); + await client.connect(); + const collection = client.db('cs').collection('test'); + const changeStream = collection.watch(); + try { + await changeStream.next(); + } catch (err) { + expect(err).to.exist; + expect(err?.message).to.equal('ChangeStream is closed'); + } finally { + await changeStream.close(); + await client.close(); + } }); }); }); diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6b4110f9cc0..b887233c158 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -3,22 +3,15 @@ import { once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; -import { - type ChangeStream, - type Collection, - type CommandFailedEvent, - type CommandStartedEvent, - type CommandSucceededEvent, - type Document, - LEGACY_HELLO_COMMAND, - Long, - type MongoClient, - MongoNetworkError, - ObjectId, - Timestamp -} from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { setupDatabase } from '../shared'; +import { ChangeStream } from '../../../src/change_stream'; +import { MongoNetworkError } from '../../../src/error'; +import { Collection } from '../../../src/collection'; +import { MongoClient } from '../../../src/mongo_client'; +import { Document, Long, ObjectId, Timestamp } from 'bson'; +import { CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent } from '../../../src/cmap/command_monitoring_events'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; /** * Triggers a fake resumable error on a change stream @@ -57,9 +50,7 @@ function triggerResumableError( nextStub.restore(); }); - changeStream.next(() => { - // ignore - }); + changeStream.next(); } if (typeof delay === 'number') { @@ -78,12 +69,6 @@ const initIteratorMode = async (cs: ChangeStream) => { return; }; -/** Waits for a change stream to start */ -async function waitForStarted(changeStream, callback) { - await once(changeStream.cursor, 'init'); - await callback(); -} - describe('Change Stream prose tests', function () { before(async function () { return await setupDatabase(this.configuration, ['integration_tests']); @@ -643,12 +628,12 @@ describe('Change Stream prose tests', function () { // when resuming a change stream. it('$changeStream with results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { + test: async function () { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - changeStream.on('error', done); - this.defer(() => changeStream.close()); + // changeStream.on('error', done); + // this.defer(() => changeStream.close()); changeStream.on('change', change => { events.push({ change: { insert: { x: change.fullDocument.x } } }); @@ -663,16 +648,13 @@ describe('Change Stream prose tests', function () { expect(events[0]).to.equal('error'); expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); - done(); break; } }); - waitForStarted(changeStream, () => - coll - .insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }) - .then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } })) - ); + await once(changeStream.cursor, 'init'); + await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); + await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }); } }); }); From 67cca6f4ea0f92199b5aec99ef19efef1dddea3e Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Wed, 24 Sep 2025 15:34:24 -0700 Subject: [PATCH 2/7] fix: got all tests working --- .../change-streams/change_stream.test.ts | 56 +++++-------------- .../change_streams.prose.test.ts | 5 +- 2 files changed, 17 insertions(+), 44 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 64e791589d2..a6755b0f21d 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -451,56 +451,28 @@ describe('Change Streams', function () { }); }); - it.only('should error if resume token projected out of change stream document using iterator', { + it('should error if resume token projected out of change stream document using iterator', { metadata: { requires: { topology: 'replicaset' } }, - test(done) { + async test() { const configuration = this.configuration; const client = configuration.newClient(); - console.log(`pavel >>> 1 before all`); - - client.connect((err, client) => { - expect(err).to.not.exist; + await client.connect(); - console.log(`pavel >>> 2 after connect`); + const database = client.db('integration_tests'); + const collection = database.collection('resumetokenProjectedOutCallback'); + const changeStream = collection.watch([{ $project: { _id: false } }]); - const database = client.db('integration_tests'); - const collection = database.collection('resumetokenProjectedOutCallback'); - const changeStream = collection.watch([{ $project: { _id: false } }]); + await initIteratorMode(changeStream); - console.log(`pavel >>> 3 before hasNext`); + const res = await collection.insertOne({ b: 2 }); + expect(res).to.exist; - changeStream.hasNext(() => { - // trigger initialize - console.log(`pavel >>> ? trigger initialize?`); - }); - - console.log(`pavel >>> 4 after hasNext`); - - changeStream.cursor.on('init', () => { - console.log(`pavel >>> 5 init`); - collection.insertOne({ b: 2 }, (err, res) => { - console.log(`pavel >>> 6 after insertOne`); - expect(err).to.be.undefined; - expect(res).to.exist; - - console.log(`pavel >>> 7 before next`); - changeStream.next(err => { - console.log(`pavel >>> 8 after next`); - expect(err).to.exist; - console.log(`pavel >>> 9 before close`); - changeStream.close(() => { - console.log(`pavel >>> 10 after close change stream`); - client.close(() => { - console.log(`pavel >>> 11 after close client`); - console.log("pavel >>> DONE"); - done(); - }); - }); - }); - }); - }); - }); + const err = await changeStream.next().catch(e => e); + expect(err).to.exist; + console.log(`pavel >>> ${JSON.stringify(err)}`); + await changeStream.close(); + await client.close(); } }); diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index b887233c158..1bc57a5eb65 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -632,8 +632,9 @@ describe('Change Stream prose tests', function () { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - // changeStream.on('error', done); - // this.defer(() => changeStream.close()); + changeStream.on('error', async () => { + await changeStream.close(); + }); changeStream.on('change', change => { events.push({ change: { insert: { x: change.fullDocument.x } } }); From 0087f5561d629442cd6fba43a272267ead34c88f Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Thu, 25 Sep 2025 13:16:24 -0700 Subject: [PATCH 3/7] lint fix --- .../change-streams/change_stream.test.ts | 31 +++++++++++-------- .../change_streams.prose.test.ts | 18 ++++++----- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a6755b0f21d..177226218c0 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,5 +1,5 @@ import { strict as assert } from 'assert'; -import { Document, Long, UUID } from 'bson'; +import { type Document, Long, UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -7,19 +7,24 @@ import * as sinon from 'sinon'; import { PassThrough } from 'stream'; import { setTimeout } from 'timers'; +import { + type ChangeStream, + type ChangeStreamDocument, + type ChangeStreamOptions, + type ResumeToken +} from '../../../src/change_stream'; +import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; +import { type Collection } from '../../../src/collection'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { type Db } from '../../../src/db'; +import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error'; +import { type MongoClient } from '../../../src/mongo_client'; +import { ReadPreference } from '../../../src/read_preference'; +import { isHello } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder'; import { type FailCommandFailPoint, sleep } from '../../tools/utils'; import { delay, filterForCommands } from '../shared'; -import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions, ResumeToken } from '../../../src/change_stream'; -import { MongoClient } from '../../../src/mongo_client'; -import { ReadPreference } from '../../../src/read_preference'; -import { Collection } from '../../../src/collection'; -import { Db } from '../../../src/db'; -import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; -import { isHello } from '../../../src/utils'; -import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; -import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error'; const initIteratorMode = async (cs: ChangeStream) => { const initEvent = once(cs.cursor, 'init'); @@ -469,8 +474,8 @@ describe('Change Streams', function () { expect(res).to.exist; const err = await changeStream.next().catch(e => e); - expect(err).to.exist; - console.log(`pavel >>> ${JSON.stringify(err)}`); + expect(err).to.exist; + console.log(`pavel >>> ${JSON.stringify(err)}`); await changeStream.close(); await client.close(); } @@ -1306,7 +1311,7 @@ describe('Change Streams', function () { const collection = client.db('cs').collection('test'); const changeStream = collection.watch(); try { - await changeStream.next(); + await changeStream.next(); } catch (err) { expect(err).to.exist; expect(err?.message).to.equal('ChangeStream is closed'); diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 1bc57a5eb65..59e9d9f674d 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,17 +1,21 @@ +import { type Document, Long, ObjectId, Timestamp } from 'bson'; import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { type ChangeStream } from '../../../src/change_stream'; +import { + type CommandFailedEvent, + type CommandStartedEvent, + type CommandSucceededEvent +} from '../../../src/cmap/command_monitoring_events'; +import { type Collection } from '../../../src/collection'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { MongoNetworkError } from '../../../src/error'; +import { type MongoClient } from '../../../src/mongo_client'; import * as mock from '../../tools/mongodb-mock/index'; import { setupDatabase } from '../shared'; -import { ChangeStream } from '../../../src/change_stream'; -import { MongoNetworkError } from '../../../src/error'; -import { Collection } from '../../../src/collection'; -import { MongoClient } from '../../../src/mongo_client'; -import { Document, Long, ObjectId, Timestamp } from 'bson'; -import { CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent } from '../../../src/cmap/command_monitoring_events'; -import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; /** * Triggers a fake resumable error on a change stream From 531f63860995812e4babd862821318352da3682f Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Thu, 25 Sep 2025 13:34:22 -0700 Subject: [PATCH 4/7] minor refactor and lint fixes --- .../change-streams/change_stream.test.ts | 75 ++++++++----------- 1 file changed, 31 insertions(+), 44 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 177226218c0..24776875957 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,5 +1,5 @@ import { strict as assert } from 'assert'; -import { type Document, Long, UUID } from 'bson'; +import { Long, UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -323,27 +323,22 @@ describe('Change Streams', function () { test: async function () { const configuration = this.configuration; const client = configuration.newClient(); - let changeStream: ChangeStream>; - try { - await client.connect(); - const database = client.db('integration_tests'); - const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); + await client.connect(); + const database = client.db('integration_tests'); + const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); - assert.equal(changeStream.closed, false); - assert.equal(changeStream.cursor.closed, false); + assert.equal(changeStream.closed, false); + assert.equal(changeStream.cursor.closed, false); - await changeStream.close(); + await changeStream.close(); - // Check the cursor is closed - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); - } finally { - await client.close(); - if (changeStream) { - await changeStream.close(); - } - } + // Check the cursor is closed + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); + + await changeStream.close(); + await client.close(); } }); @@ -355,7 +350,6 @@ describe('Change Streams', function () { test: async function () { const configuration = this.configuration; const client = configuration.newClient(); - let changeStream: ChangeStream>; await client.connect(); @@ -363,23 +357,18 @@ describe('Change Streams', function () { const forbiddenStageName = '$alksdjfhlaskdfjh'; forbiddenStage[forbiddenStageName] = 2; - try { - const database = client.db('integration_tests'); - changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); + const database = client.db('integration_tests'); + const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); + + const err = await changeStream.next().catch(e => e); + assert.ok(err); + assert.ok(err.message); + assert.ok( + err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 + ); - await changeStream.next(); - } catch (err) { - assert.ok(err); - assert.ok(err.message); - assert.ok( - err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 - ); - } finally { - await client.close(); - if (changeStream) { - await changeStream.close(); - } - } + await changeStream.close(); + await client.close(); } } ); @@ -1310,15 +1299,13 @@ describe('Change Streams', function () { await client.connect(); const collection = client.db('cs').collection('test'); const changeStream = collection.watch(); - try { - await changeStream.next(); - } catch (err) { - expect(err).to.exist; - expect(err?.message).to.equal('ChangeStream is closed'); - } finally { - await changeStream.close(); - await client.close(); - } + + const err = await changeStream.next().catch(e => e); + expect(err).to.exist; + expect(err?.message).to.equal('ChangeStream is closed'); + + await changeStream.close(); + await client.close(); }); }); }); From 742b50cd4c505806f22a59dee9f14c6b6027d393 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Thu, 25 Sep 2025 13:38:43 -0700 Subject: [PATCH 5/7] remove debug statement --- test/integration/change-streams/change_stream.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 24776875957..6da57f4ad9e 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -464,7 +464,6 @@ describe('Change Streams', function () { const err = await changeStream.next().catch(e => e); expect(err).to.exist; - console.log(`pavel >>> ${JSON.stringify(err)}`); await changeStream.close(); await client.close(); } From 6644334c2480670491a6526d595c00b0066eec2f Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Mon, 29 Sep 2025 14:00:06 -0700 Subject: [PATCH 6/7] pr feedback --- .../change_streams.prose.test.ts | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 59e9d9f674d..6fd02fb15b1 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,19 +1,22 @@ -import { type Document, Long, ObjectId, Timestamp } from 'bson'; import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; -import { type ChangeStream } from '../../../src/change_stream'; import { + type ChangeStream, + type Collection, type CommandFailedEvent, type CommandStartedEvent, - type CommandSucceededEvent -} from '../../../src/cmap/command_monitoring_events'; -import { type Collection } from '../../../src/collection'; -import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; -import { MongoNetworkError } from '../../../src/error'; -import { type MongoClient } from '../../../src/mongo_client'; + type CommandSucceededEvent, + type Document, + LEGACY_HELLO_COMMAND, + Long, + type MongoClient, + MongoNetworkError, + ObjectId, + Timestamp +} from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { setupDatabase } from '../shared'; @@ -636,9 +639,6 @@ describe('Change Stream prose tests', function () { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - changeStream.on('error', async () => { - await changeStream.close(); - }); changeStream.on('change', change => { events.push({ change: { insert: { x: change.fullDocument.x } } }); @@ -648,18 +648,20 @@ describe('Change Stream prose tests', function () { events = []; triggerResumableError(changeStream, () => events.push('error')); break; - case 3: - expect(events).to.be.an('array').with.lengthOf(3); - expect(events[0]).to.equal('error'); - expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; - expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); - break; } }); await once(changeStream.cursor, 'init'); await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }); + await once(changeStream, 'change'); + + expect(events).to.be.an('array').with.lengthOf(3); + expect(events[0]).to.equal('error'); + expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; + expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); + + await changeStream.close(); } }); }); From 0728326c3979c74a207572a568926a0d6703eb46 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Wed, 1 Oct 2025 10:15:13 -0700 Subject: [PATCH 7/7] pr feedback --- .../integration/change-streams/change_streams.prose.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6fd02fb15b1..d13acc73d09 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { once } from 'events'; +import { on, once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; @@ -652,9 +652,11 @@ describe('Change Stream prose tests', function () { }); await once(changeStream.cursor, 'init'); + const changes = on(changeStream, 'change'); await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); + await changes.next(); await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }); - await once(changeStream, 'change'); + await changes.next(); expect(events).to.be.an('array').with.lengthOf(3); expect(events[0]).to.equal('error');