Skip to content

Commit 2561bc4

Browse files
committed
test(NODE-6858): add tests for event emmitter based iteration
1 parent c3ee219 commit 2561bc4

File tree

2 files changed

+56
-18
lines changed

2 files changed

+56
-18
lines changed

src/error.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,9 +1551,9 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
15511551
return true;
15521552
}
15531553

1554-
// if (error instanceof MongoServerSelectionError) {
1555-
// return true;
1556-
// }
1554+
if (error instanceof MongoServerSelectionError) {
1555+
return true;
1556+
}
15571557

15581558
if (wireVersion != null && wireVersion >= 9) {
15591559
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable

test/integration/change-streams/change_stream.test.ts

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,23 +2072,19 @@ describe.only('ChangeStream resumability', function () {
20722072

20732073
await collection.insertOne({ a: 1 });
20742074

2075-
// mimic the node termination by closing the connection and failing on heartbeat
20762075
await client.db('admin').command({
20772076
configureFailPoint: 'failCommand',
20782077
mode: 'alwaysOn',
20792078
data: {
20802079
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
20812080
closeConnection: true,
2082-
handshakeCommands: true,
2083-
failInternalCommands: true,
20842081
appName: appName
20852082
}
20862083
} as FailCommandFailPoint);
2087-
// force new election in the cluster
20882084
await client
20892085
.db('admin')
20902086
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2091-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2087+
await client.db('admin').command({ replSetStepDown: 5, force: true });
20922088

20932089
await sleep(500);
20942090

@@ -2426,23 +2422,19 @@ describe.only('ChangeStream resumability', function () {
24262422

24272423
await collection.insertOne({ a: 1 });
24282424

2429-
// mimic the node termination by closing the connection and failing on heartbeat
24302425
await client.db('admin').command({
24312426
configureFailPoint: 'failCommand',
24322427
mode: 'alwaysOn',
24332428
data: {
24342429
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
24352430
closeConnection: true,
2436-
handshakeCommands: true,
2437-
failInternalCommands: true,
24382431
appName: appName
24392432
}
24402433
} as FailCommandFailPoint);
2441-
// force new election in the cluster
24422434
await client
24432435
.db('admin')
24442436
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2445-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2437+
await client.db('admin').command({ replSetStepDown: 5, force: true });
24462438

24472439
await sleep(500);
24482440

@@ -2604,23 +2596,19 @@ describe.only('ChangeStream resumability', function () {
26042596

26052597
await collection.insertOne({ a: 1 });
26062598

2607-
// mimic the node termination by closing the connection and failing on heartbeat
26082599
await client.db('admin').command({
26092600
configureFailPoint: 'failCommand',
26102601
mode: 'alwaysOn',
26112602
data: {
26122603
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
26132604
closeConnection: true,
2614-
handshakeCommands: true,
2615-
failInternalCommands: true,
26162605
appName: appName
26172606
}
26182607
} as FailCommandFailPoint);
2619-
// force new election in the cluster
26202608
await client
26212609
.db('admin')
26222610
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2623-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2611+
await client.db('admin').command({ replSetStepDown: 5, force: true });
26242612

26252613
await sleep(500);
26262614

@@ -2825,6 +2813,56 @@ describe.only('ChangeStream resumability', function () {
28252813
expect(changeStream.closed).to.be.true;
28262814
});
28272815
});
2816+
2817+
context('when the error is not a server error', function () {
2818+
// This test requires a replica set to call replSetFreeze command
2819+
it(
2820+
'should resume on ServerSelectionError',
2821+
{ requires: { topology: ['replicaset'] } },
2822+
async function () {
2823+
changeStream = collection.watch([]);
2824+
2825+
const changes = on(changeStream, 'change');
2826+
await once(changeStream.cursor, 'init');
2827+
2828+
await collection.insertOne({ a: 1 });
2829+
2830+
const change = await changes.next();
2831+
expect(change.value[0]).to.containSubset({
2832+
operationType: 'insert',
2833+
fullDocument: { a: 1 }
2834+
});
2835+
2836+
await client.db('admin').command({
2837+
configureFailPoint: 'failCommand',
2838+
mode: 'alwaysOn',
2839+
data: {
2840+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2841+
closeConnection: true,
2842+
appName: appName
2843+
}
2844+
} as FailCommandFailPoint);
2845+
await client
2846+
.db('admin')
2847+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2848+
await client.db('admin').command({ replSetStepDown: 5, force: true });
2849+
2850+
await sleep(500);
2851+
2852+
await collection.insertOne({ a: 2 });
2853+
2854+
const change2 = await changes.next();
2855+
expect(change2.value[0]).to.containSubset({
2856+
operationType: 'insert',
2857+
fullDocument: { a: 2 }
2858+
});
2859+
2860+
expect(aggregateEvents).to.have.lengthOf(2);
2861+
const [e1, e2] = aggregateEvents;
2862+
expect(e1.address).to.not.equal(e2.address);
2863+
}
2864+
);
2865+
});
28282866
});
28292867

28302868
it(

0 commit comments

Comments
 (0)