Skip to content

Commit 8535a7c

Browse files
committed
test(NODE-6858): add tests for all iterator methods
1 parent 3f65dd7 commit 8535a7c

File tree

2 files changed

+114
-35
lines changed

2 files changed

+114
-35
lines changed

src/error.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
15511551
return true;
15521552
}
15531553

1554+
// if (error instanceof MongoServerSelectionError) {
1555+
// return true;
1556+
// }
1557+
15541558
if (wireVersion != null && wireVersion >= 9) {
15551559
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable
15561560
if (error.code === MONGODB_ERROR_CODES.CursorNotFound) {

test/integration/change-streams/change_stream.test.ts

Lines changed: 110 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import * as mock from '../../tools/mongodb-mock/index';
2727
import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder';
2828
import { type FailCommandFailPoint, sleep } from '../../tools/utils';
2929
import { delay, filterForCommands } from '../shared';
30+
import { UUID } from 'bson';
3031

3132
const initIteratorMode = async (cs: ChangeStream) => {
3233
const initEvent = once(cs.cursor, 'init');
@@ -2007,6 +2008,7 @@ describe.only('ChangeStream resumability', function () {
20072008
let collection: Collection;
20082009
let changeStream: ChangeStream;
20092010
let aggregateEvents: CommandStartedEvent[] = [];
2011+
let appName: string;
20102012

20112013
const changeStreamResumeOptions: ChangeStreamOptions = {
20122014
fullDocument: 'updateLookup',
@@ -2065,7 +2067,15 @@ describe.only('ChangeStream resumability', function () {
20652067
await utilClient.db(dbName).createCollection(collectionName);
20662068
await utilClient.close();
20672069

2068-
client = this.configuration.newClient({ monitorCommands: true });
2070+
// we are going to switch primary in tests and cleanup of failpoints is difficult,
2071+
// so generating unique appname instead of cleaning for each test is an easier solution
2072+
appName = new UUID().toString();
2073+
2074+
client = this.configuration.newClient({
2075+
monitorCommands: true,
2076+
serverSelectionTimeoutMS: 5_000,
2077+
appName: appName
2078+
});
20692079
client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
20702080
collection = client.db(dbName).collection(collectionName);
20712081
});
@@ -2230,61 +2240,42 @@ describe.only('ChangeStream resumability', function () {
22302240
});
22312241
});
22322242

2233-
context.only('when the error is not a server error', function () {
2234-
let client1: MongoClient;
2235-
let client2: MongoClient;
2236-
2237-
beforeEach(async function () {
2238-
client1 = this.configuration.newClient(
2239-
{},
2240-
{ serverSelectionTimeoutMS: 1000, appName: 'client-errors' }
2241-
);
2242-
client2 = this.configuration.newClient();
2243-
2244-
collection = client1.db('client-errors').collection('test');
2245-
});
2246-
2247-
afterEach(async function () {
2248-
await client2.db('admin').command({
2249-
configureFailPoint: 'failCommand',
2250-
mode: 'off',
2251-
data: { appName: 'client-errors' }
2252-
} as FailCommandFailPoint);
2253-
2254-
await client1?.close();
2255-
await client2?.close();
2256-
});
2257-
2243+
context('when the error is not a server error', function () {
2244+
// This test requires a replica set to call replSetFreeze command
22582245
it(
22592246
'should resume on ServerSelectionError',
2260-
{ requires: { topology: '!single' } },
2247+
{ requires: { topology: ['replicaset'] } },
22612248
async function () {
22622249
changeStream = collection.watch([]);
22632250
await initIteratorMode(changeStream);
22642251

22652252
await collection.insertOne({ a: 1 });
22662253

2267-
await client2.db('admin').command({
2254+
// mimic the node termination by closing the connection and failing on heartbeat
2255+
await client.db('admin').command({
22682256
configureFailPoint: 'failCommand',
22692257
mode: 'alwaysOn',
22702258
data: {
22712259
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
22722260
closeConnection: true,
22732261
handshakeCommands: true,
22742262
failInternalCommands: true,
2275-
appName: 'client-errors'
2263+
appName: appName
22762264
}
22772265
} as FailCommandFailPoint);
2278-
await client2
2279-
.db('admin')
2280-
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.secondary });
2281-
await client2
2266+
// force new election in the cluster
2267+
await client
22822268
.db('admin')
2283-
.command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true });
2284-
// await sleep(15_000);
2269+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2270+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2271+
await sleep(1500);
22852272

22862273
const change = await changeStream.next();
22872274
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2275+
2276+
expect(aggregateEvents).to.have.lengthOf(2);
2277+
const [e1, e2] = aggregateEvents;
2278+
expect(e1.address).to.not.equal(e2.address);
22882279
}
22892280
);
22902281
});
@@ -2601,6 +2592,46 @@ describe.only('ChangeStream resumability', function () {
26012592
expect(changeStream.closed).to.be.true;
26022593
});
26032594
});
2595+
2596+
context('when the error is not a server error', function () {
2597+
// This test requires a replica set to call replSetFreeze command
2598+
it(
2599+
'should resume on ServerSelectionError',
2600+
{ requires: { topology: ['replicaset'] } },
2601+
async function () {
2602+
changeStream = collection.watch([]);
2603+
await initIteratorMode(changeStream);
2604+
2605+
await collection.insertOne({ a: 1 });
2606+
2607+
// mimic the node termination by closing the connection and failing on heartbeat
2608+
await client.db('admin').command({
2609+
configureFailPoint: 'failCommand',
2610+
mode: 'alwaysOn',
2611+
data: {
2612+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2613+
closeConnection: true,
2614+
handshakeCommands: true,
2615+
failInternalCommands: true,
2616+
appName: appName
2617+
}
2618+
} as FailCommandFailPoint);
2619+
// force new election in the cluster
2620+
await client
2621+
.db('admin')
2622+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2623+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2624+
await sleep(1500);
2625+
2626+
const change = await changeStream.tryNext();
2627+
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2628+
2629+
expect(aggregateEvents).to.have.lengthOf(2);
2630+
const [e1, e2] = aggregateEvents;
2631+
expect(e1.address).to.not.equal(e2.address);
2632+
}
2633+
);
2634+
});
26042635
});
26052636

26062637
context('#asyncIterator', function () {
@@ -2737,6 +2768,50 @@ describe.only('ChangeStream resumability', function () {
27372768
}
27382769
});
27392770
});
2771+
2772+
context('when the error is not a server error', function () {
2773+
// This test requires a replica set to call replSetFreeze command
2774+
it(
2775+
'should resume on ServerSelectionError',
2776+
{ requires: { topology: ['replicaset'] } },
2777+
async function () {
2778+
changeStream = collection.watch([]);
2779+
await initIteratorMode(changeStream);
2780+
const changeStreamIterator = changeStream[Symbol.asyncIterator]();
2781+
2782+
await collection.insertOne({ a: 1 });
2783+
2784+
// mimic the node termination by closing the connection and failing on heartbeat
2785+
await client.db('admin').command({
2786+
configureFailPoint: 'failCommand',
2787+
mode: 'alwaysOn',
2788+
data: {
2789+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2790+
closeConnection: true,
2791+
handshakeCommands: true,
2792+
failInternalCommands: true,
2793+
appName: appName
2794+
}
2795+
} as FailCommandFailPoint);
2796+
// force new election in the cluster
2797+
await client
2798+
.db('admin')
2799+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2800+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2801+
await sleep(1500);
2802+
2803+
const change = await changeStreamIterator.next();
2804+
expect(change.value).to.containSubset({
2805+
operationType: 'insert',
2806+
fullDocument: { a: 1 }
2807+
});
2808+
2809+
expect(aggregateEvents).to.have.lengthOf(2);
2810+
const [e1, e2] = aggregateEvents;
2811+
expect(e1.address).to.not.equal(e2.address);
2812+
}
2813+
);
2814+
});
27402815
});
27412816
});
27422817

0 commit comments

Comments
 (0)