Skip to content

Commit f8b9a0d

Browse files
committed
test(NODE-6858): add tests for event emmitter based iteration
1 parent 535b29c commit f8b9a0d

File tree

2 files changed

+56
-18
lines changed

2 files changed

+56
-18
lines changed

src/error.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,9 +1551,9 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
15511551
return true;
15521552
}
15531553

1554-
// if (error instanceof MongoServerSelectionError) {
1555-
// return true;
1556-
// }
1554+
if (error instanceof MongoServerSelectionError) {
1555+
return true;
1556+
}
15571557

15581558
if (wireVersion != null && wireVersion >= 9) {
15591559
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable

test/integration/change-streams/change_stream.test.ts

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2255,23 +2255,19 @@ describe.only('ChangeStream resumability', function () {
22552255

22562256
await collection.insertOne({ a: 1 });
22572257

2258-
// mimic the node termination by closing the connection and failing on heartbeat
22592258
await client.db('admin').command({
22602259
configureFailPoint: 'failCommand',
22612260
mode: 'alwaysOn',
22622261
data: {
22632262
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
22642263
closeConnection: true,
2265-
handshakeCommands: true,
2266-
failInternalCommands: true,
22672264
appName: appName
22682265
}
22692266
} as FailCommandFailPoint);
2270-
// force new election in the cluster
22712267
await client
22722268
.db('admin')
22732269
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2274-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2270+
await client.db('admin').command({ replSetStepDown: 5, force: true });
22752271

22762272
await sleep(500);
22772273

@@ -2609,23 +2605,19 @@ describe.only('ChangeStream resumability', function () {
26092605

26102606
await collection.insertOne({ a: 1 });
26112607

2612-
// mimic the node termination by closing the connection and failing on heartbeat
26132608
await client.db('admin').command({
26142609
configureFailPoint: 'failCommand',
26152610
mode: 'alwaysOn',
26162611
data: {
26172612
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
26182613
closeConnection: true,
2619-
handshakeCommands: true,
2620-
failInternalCommands: true,
26212614
appName: appName
26222615
}
26232616
} as FailCommandFailPoint);
2624-
// force new election in the cluster
26252617
await client
26262618
.db('admin')
26272619
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2628-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2620+
await client.db('admin').command({ replSetStepDown: 5, force: true });
26292621

26302622
await sleep(500);
26312623

@@ -2787,23 +2779,19 @@ describe.only('ChangeStream resumability', function () {
27872779

27882780
await collection.insertOne({ a: 1 });
27892781

2790-
// mimic the node termination by closing the connection and failing on heartbeat
27912782
await client.db('admin').command({
27922783
configureFailPoint: 'failCommand',
27932784
mode: 'alwaysOn',
27942785
data: {
27952786
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
27962787
closeConnection: true,
2797-
handshakeCommands: true,
2798-
failInternalCommands: true,
27992788
appName: appName
28002789
}
28012790
} as FailCommandFailPoint);
2802-
// force new election in the cluster
28032791
await client
28042792
.db('admin')
28052793
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2806-
await client.db('admin').command({ replSetStepDown: 30, force: true });
2794+
await client.db('admin').command({ replSetStepDown: 5, force: true });
28072795

28082796
await sleep(500);
28092797

@@ -3008,6 +2996,56 @@ describe.only('ChangeStream resumability', function () {
30082996
expect(changeStream.closed).to.be.true;
30092997
});
30102998
});
2999+
3000+
context('when the error is not a server error', function () {
3001+
// This test requires a replica set to call replSetFreeze command
3002+
it(
3003+
'should resume on ServerSelectionError',
3004+
{ requires: { topology: ['replicaset'] } },
3005+
async function () {
3006+
changeStream = collection.watch([]);
3007+
3008+
const changes = on(changeStream, 'change');
3009+
await once(changeStream.cursor, 'init');
3010+
3011+
await collection.insertOne({ a: 1 });
3012+
3013+
const change = await changes.next();
3014+
expect(change.value[0]).to.containSubset({
3015+
operationType: 'insert',
3016+
fullDocument: { a: 1 }
3017+
});
3018+
3019+
await client.db('admin').command({
3020+
configureFailPoint: 'failCommand',
3021+
mode: 'alwaysOn',
3022+
data: {
3023+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
3024+
closeConnection: true,
3025+
appName: appName
3026+
}
3027+
} as FailCommandFailPoint);
3028+
await client
3029+
.db('admin')
3030+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
3031+
await client.db('admin').command({ replSetStepDown: 5, force: true });
3032+
3033+
await sleep(500);
3034+
3035+
await collection.insertOne({ a: 2 });
3036+
3037+
const change2 = await changes.next();
3038+
expect(change2.value[0]).to.containSubset({
3039+
operationType: 'insert',
3040+
fullDocument: { a: 2 }
3041+
});
3042+
3043+
expect(aggregateEvents).to.have.lengthOf(2);
3044+
const [e1, e2] = aggregateEvents;
3045+
expect(e1.address).to.not.equal(e2.address);
3046+
}
3047+
);
3048+
});
30113049
});
30123050

30133051
it(

0 commit comments

Comments
 (0)