From 96abcd9a3f37fb19fc829f38386a658c1ad21d36 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 11 Sep 2024 13:10:58 -0400 Subject: [PATCH 1/4] test(NODE-5788): rework change stream close rejection test --- .../change-streams/change_stream.test.ts | 123 +++--------------- 1 file changed, 16 insertions(+), 107 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index baabdcb3b23..ea989f72758 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -773,115 +773,24 @@ describe('Change Streams', function () { }); }); - describe('should properly handle a changeStream event being processed mid-close', function () { - let client, coll, changeStream; - - function write() { - return Promise.resolve() - .then(() => coll.insertOne({ a: 1 })) - .then(() => coll.insertOne({ b: 2 })); - } - - function lastWrite() { - return coll.insertOne({ c: 3 }); - } - - beforeEach(function () { - client = this.configuration.newClient(); - return client.connect().then(_client => { - client = _client; - coll = client.db(this.configuration.db).collection('tester'); - changeStream = coll.watch(); - }); - }); - - afterEach(async function () { - await changeStream?.close(); - await client?.close(); - coll = undefined; - changeStream = undefined; - client = undefined; - }); - - it('when invoked with promises', { - metadata: { requires: { topology: 'replicaset' } }, - test: function () { - const read = () => { - return Promise.resolve() - .then(() => changeStream.next()) - .then(() => changeStream.next()) - .then(() => { - this.defer(lastWrite()); - const nextP = changeStream.next(); - return changeStream.close().then(() => nextP); - }); - }; - - return Promise.all([read(), write()]).then( - () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') - ); - } - }); - - it('when invoked with callbacks', { - metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { - const ops = []; - changeStream.next(() => { - changeStream.next(() => { - ops.push(lastWrite()); - - // explicitly close the change stream after the write has begun - ops.push(changeStream.close()); - - changeStream.next(err => { - try { - expect(err) - .property('message') - .to.match(/ChangeStream is closed/); - Promise.all(ops).then(() => done(), done); - } catch (e) { - done(e); - } - }); - }); - }); - - ops.push( - write().catch(() => { - // ignore - }) - ); - } - }); - - it.skip('when invoked using eventEmitter API', { - metadata: { - requires: { topology: 'replicaset' } - }, - async test() { - const changes = on(changeStream, 'change'); - await once(changeStream.cursor, 'init'); - - await write(); - await lastWrite().catch(() => null); - - let counter = 0; - - for await (const _ of changes) { - counter += 1; - if (counter === 2) { - await changeStream.close(); - break; - } + describe('when close is called while changes are pending', function () { + it( + 'rejects promises already returned by next', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + await changeStream.close(); + const results = await Promise.allSettled(changes); + + for (const i of changes.keys()) { + expect(results) + .to.have.nested.property(`[${i}].reason`) + .that.is.instanceOf(MongoAPIError); + const message = /ChangeStream is closed/i; + expect(results).nested.property(`[${i}].reason`).to.match(message); } - - const result = await Promise.race([changes.next(), sleep(800).then(() => 42)]); - expect(result, 'should not have recieved a third event').to.equal(42); } - }).skipReason = - 'This test only worked because of timing, changeStream.close does not remove the change listener'; + ); }); describe('iterator api', function () { From 96c8f496f34ef88495c9f5e70f6962a2da05eff0 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 11 Sep 2024 15:50:28 -0400 Subject: [PATCH 2/4] chore: add more tests --- .../change-streams/change_stream.test.ts | 88 ++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index ea989f72758..09a2b6fd8ed 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -4,7 +4,7 @@ import { on, once } from 'events'; import { gte, lt } from 'semver'; import * as sinon from 'sinon'; import { PassThrough } from 'stream'; -import { setTimeout } from 'timers'; +import { clearTimeout, setTimeout } from 'timers'; import { type ChangeStream, @@ -773,7 +773,50 @@ describe('Change Streams', function () { }); }); - describe('when close is called while changes are pending', function () { + describe.only('when close is called while changes are pending', function () { + let client; + let db; + let collection: Collection<{ insertCount: number }>; + let changeStream: ChangeStream<{ insertCount: number }>; + let insertInterval = undefined; + let insertCount = 0; + + /** insertOne every 300ms without running the next insert before the previous one completes */ + function setInsertInterval() { + // start an insert + // if first one, create a timeout and refresh + // if NOT first one, just refresh + collection?.insertOne({ insertCount: insertCount++ }).then(() => { + insertInterval ??= setTimeout(setInsertInterval, 300); + insertInterval.refresh(); + }); + } + + beforeEach(async function () { + client = this.configuration.newClient(); + await client.connect(); + db = client.db('test'); + collection = db.collection('test_close'); + await collection.drop().catch(() => null); + changeStream = collection.watch(); + + insertCount = 0; + setInsertInterval(); + }); + + afterEach(async function () { + clearTimeout(insertInterval); + await collection.drop().catch(() => null); + await client.close(); + + db = undefined; + client = undefined; + collection = undefined; + changeStream = undefined; + insertInterval = undefined; + insertCount = 0; + }); + it( 'rejects promises already returned by next', { requires: { topology: 'replicaset' } }, @@ -791,6 +834,47 @@ describe('Change Streams', function () { } } ); + + it( + 'rejects promises already returned by next after awaiting the first one', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + await changes[0]; + const allChanges = Promise.allSettled(changes); + + await changeStream.close(); + + const results = await allChanges; + + const statuses = results.map(({ status }) => status); + expect(statuses).to.deep.equal([ + 'fulfilled', + ...Array.from({ length: 19 }, () => 'rejected') + ]); + } + ); + + it( + 'rejects promises already returned by next after awaiting half of them', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + const allChanges = Promise.allSettled(changes); + + await Promise.allSettled(changes.slice(10)); + + await changeStream.close(); + + const results = await allChanges; + + const statuses = results.map(({ status }) => status); + expect(statuses).to.deep.equal([ + ...Array.from({ length: 10 }, () => 'fulfilled'), + ...Array.from({ length: 10 }, () => 'rejected') + ]); + } + ); }); describe('iterator api', function () { From a971fffe5c05dc02c22a1413b025ecbee983d32e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 11 Sep 2024 16:19:16 -0400 Subject: [PATCH 3/4] chore: make assertions clearer and skip parallel tests --- .../change-streams/change_stream.test.ts | 64 +++++++++++++------ 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 09a2b6fd8ed..761a97e18f6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -773,7 +773,7 @@ describe('Change Streams', function () { }); }); - describe.only('when close is called while changes are pending', function () { + describe('when close is called while changes are pending', function () { let client; let db; let collection: Collection<{ insertCount: number }>; @@ -825,17 +825,23 @@ describe('Change Streams', function () { await changeStream.close(); const results = await Promise.allSettled(changes); - for (const i of changes.keys()) { - expect(results) - .to.have.nested.property(`[${i}].reason`) - .that.is.instanceOf(MongoAPIError); - const message = /ChangeStream is closed/i; - expect(results).nested.property(`[${i}].reason`).to.match(message); - } + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; + }); + + expect(statuses).to.deep.equal( + Array.from({ length: 20 }, () => 'rejected:ChangeStream is closed') + ); } ); - it( + it.skip( 'rejects promises already returned by next after awaiting the first one', { requires: { topology: 'replicaset' } }, async function () { @@ -847,15 +853,26 @@ describe('Change Streams', function () { const results = await allChanges; - const statuses = results.map(({ status }) => status); + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; + }); + + console.log(statuses); + expect(statuses).to.deep.equal([ - 'fulfilled', - ...Array.from({ length: 19 }, () => 'rejected') + 'fulfilled:insert count = 1', + ...Array.from({ length: 19 }, () => 'rejected:ChangeStream is closed') ]); } - ); + ).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic'; - it( + it.skip( 'rejects promises already returned by next after awaiting half of them', { requires: { topology: 'replicaset' } }, async function () { @@ -868,13 +885,24 @@ describe('Change Streams', function () { const results = await allChanges; - const statuses = results.map(({ status }) => status); + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; + }); + + console.log(statuses); + expect(statuses).to.deep.equal([ - ...Array.from({ length: 10 }, () => 'fulfilled'), - ...Array.from({ length: 10 }, () => 'rejected') + ...Array.from({ length: 1 }, () => 'fulfilled:insert count = 0'), + ...Array.from({ length: 19 }, () => 'fulfilled:insert count = 1') ]); } - ); + ).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic'; }); describe('iterator api', function () { From ffcf61e2e7bcad9f84509cb5f7bfff8c878383b1 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 11 Sep 2024 17:02:15 -0400 Subject: [PATCH 4/4] chore: lint --- .../integration/change-streams/change_stream.test.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 761a97e18f6..5b402ca1803 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -830,8 +830,8 @@ describe('Change Streams', function () { status === 'rejected' ? reason.message : value.operationType === 'insert' - ? `insert count = ${value.fullDocument.insertCount}` - : null; + ? `insert count = ${value.fullDocument.insertCount}` + : null; return `${status}:${res}`; }); @@ -858,8 +858,8 @@ describe('Change Streams', function () { status === 'rejected' ? reason.message : value.operationType === 'insert' - ? `insert count = ${value.fullDocument.insertCount}` - : null; + ? `insert count = ${value.fullDocument.insertCount}` + : null; return `${status}:${res}`; }); @@ -890,8 +890,8 @@ describe('Change Streams', function () { status === 'rejected' ? reason.message : value.operationType === 'insert' - ? `insert count = ${value.fullDocument.insertCount}` - : null; + ? `insert count = ${value.fullDocument.insertCount}` + : null; return `${status}:${res}`; });