Skip to content

Commit 21e68b5

Browse files
CSHARP-3156: Treat CursorNotFound as a resumable change stream error
1 parent 579a295 commit 21e68b5

File tree

8 files changed

+227
-50
lines changed

8 files changed

+227
-50
lines changed

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -273,39 +273,55 @@ private BsonDocument CreateKillCursorsCommand()
273273
private CursorBatch<TDocument> ExecuteGetMoreCommand(IChannelHandle channel, CancellationToken cancellationToken)
274274
{
275275
var command = CreateGetMoreCommand();
276-
var result = channel.Command<BsonDocument>(
277-
_channelSource.Session,
278-
null, // readPreference
279-
_collectionNamespace.DatabaseNamespace,
280-
command,
281-
null, // commandPayloads
282-
NoOpElementNameValidator.Instance,
283-
null, // additionalOptions
284-
null, // postWriteAction
285-
CommandResponseHandling.Return,
286-
__getMoreCommandResultSerializer,
287-
_messageEncoderSettings,
288-
cancellationToken);
276+
BsonDocument result;
277+
try
278+
{
279+
result = channel.Command<BsonDocument>(
280+
_channelSource.Session,
281+
null, // readPreference
282+
_collectionNamespace.DatabaseNamespace,
283+
command,
284+
null, // commandPayloads
285+
NoOpElementNameValidator.Instance,
286+
null, // additionalOptions
287+
null, // postWriteAction
288+
CommandResponseHandling.Return,
289+
__getMoreCommandResultSerializer,
290+
_messageEncoderSettings,
291+
cancellationToken);
292+
}
293+
catch (MongoCommandException ex) when (IsMongoCursorNotFoundException(ex))
294+
{
295+
throw new MongoCursorNotFoundException(channel.ConnectionDescription.ConnectionId, _cursorId, command);
296+
}
289297

290298
return CreateCursorBatch(result);
291299
}
292300

293301
private async Task<CursorBatch<TDocument>> ExecuteGetMoreCommandAsync(IChannelHandle channel, CancellationToken cancellationToken)
294302
{
295303
var command = CreateGetMoreCommand();
296-
var result = await channel.CommandAsync<BsonDocument>(
297-
_channelSource.Session,
298-
null, // readPreference
299-
_collectionNamespace.DatabaseNamespace,
300-
command,
301-
null, // commandPayloads
302-
NoOpElementNameValidator.Instance,
303-
null, // additionalOptions
304-
null, // postWriteAction
305-
CommandResponseHandling.Return,
306-
__getMoreCommandResultSerializer,
307-
_messageEncoderSettings,
308-
cancellationToken).ConfigureAwait(false);
304+
BsonDocument result;
305+
try
306+
{
307+
result = await channel.CommandAsync<BsonDocument>(
308+
_channelSource.Session,
309+
null, // readPreference
310+
_collectionNamespace.DatabaseNamespace,
311+
command,
312+
null, // commandPayloads
313+
NoOpElementNameValidator.Instance,
314+
null, // additionalOptions
315+
null, // postWriteAction
316+
CommandResponseHandling.Return,
317+
__getMoreCommandResultSerializer,
318+
_messageEncoderSettings,
319+
cancellationToken).ConfigureAwait(false);
320+
}
321+
catch (MongoCommandException ex) when (IsMongoCursorNotFoundException(ex))
322+
{
323+
throw new MongoCursorNotFoundException(channel.ConnectionDescription.ConnectionId, _cursorId, command);
324+
}
309325

310326
return CreateCursorBatch(result);
311327
}
@@ -515,6 +531,11 @@ private async Task<CursorBatch<TDocument>> GetNextBatchAsync(CancellationToken c
515531
}
516532
}
517533

534+
private bool IsMongoCursorNotFoundException(MongoCommandException exception)
535+
{
536+
return exception.Code == (int)ServerErrorCode.CursorNotFound;
537+
}
538+
518539
private void KillCursors(CancellationToken cancellationToken)
519540
{
520541
using (EventContext.BeginOperation(_operationId))

src/MongoDB.Driver.Core/Core/Operations/RetryabilityHelper.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ public static bool IsResumableChangeStreamException(Exception exception, Semanti
109109
{
110110
return true;
111111
}
112+
if (exception is MongoCursorNotFoundException)
113+
{
114+
return true;
115+
}
112116

113117
if (Feature.ServerReturnsResumableChangeStreamErrorLabel.IsSupported(serverVersion))
114118
{

src/MongoDB.Driver.Core/ServerErrorCode.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal enum ServerErrorCode
2121
// see: https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml
2222
CappedPositionLost = 136,
2323
CursorKilled = 237,
24+
CursorNotFound = 43,
2425
ElectionInProgress = 216,
2526
ExceededTimeLimit = 262,
2627
FailedToSatisfyReadPreference = 133,

tests/MongoDB.Driver.Core.Tests/Core/Operations/RetryabilityHelperTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void IsCommandRetryable_should_return_expected_result(string command, boo
135135
[InlineData(typeof(MongoConnectionClosedException), false)]
136136
[InlineData(typeof(MongoNodeIsRecoveringException), true)]
137137
[InlineData(typeof(MongoNotPrimaryException), true)]
138-
[InlineData(typeof(MongoCursorNotFoundException), false)]
138+
[InlineData(typeof(MongoCursorNotFoundException), true)]
139139
[InlineData(ServerErrorCode.HostNotFound, true)]
140140
[InlineData(ServerErrorCode.HostUnreachable, true)]
141141
[InlineData(ServerErrorCode.NetworkTimeout, true)]
@@ -193,9 +193,10 @@ public void IsResumableChangeStreamException_should_return_expected_result_for_s
193193
[Theory]
194194
[InlineData(typeof(MongoConnectionException), true)] // network exception
195195
[InlineData(typeof(MongoConnectionClosedException), false)]
196-
public void IsResumableChangeStreamException_should_return_expected_result_for_servers_with_new_behavior_and_connection_errors(Type exceptionType, bool isResumable)
196+
[InlineData(typeof(MongoCursorNotFoundException), true)]
197+
public void IsResumableChangeStreamException_should_return_expected_result_for_servers_with_new_behavior_and_errors(Type exceptionType, bool isResumable)
197198
{
198-
var exception = (MongoConnectionException)CoreExceptionHelper.CreateException(exceptionType);
199+
var exception = (MongoException)CoreExceptionHelper.CreateException(exceptionType);
199200

200201
var result = RetryabilityHelper.IsResumableChangeStreamException(exception, Feature.ServerReturnsResumableChangeStreamErrorLabel.FirstSupportedVersion);
201202

tests/MongoDB.Driver.Tests/Specifications/change-streams/tests/change-streams-errors.json

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,12 @@
102102
],
103103
"result": {
104104
"error": {
105-
"code": 280,
106-
"errorLabels": [
107-
"NonResumableChangeStreamError"
108-
]
105+
"code": 280
109106
}
110107
}
111108
},
112109
{
113-
"description": "change stream errors on MaxTimeMSExpired",
110+
"description": "change stream errors on ElectionInProgress",
114111
"minServerVersion": "4.2",
115112
"failPoint": {
116113
"configureFailPoint": "failCommand",
@@ -121,7 +118,7 @@
121118
"failCommands": [
122119
"getMore"
123120
],
124-
"errorCode": 50,
121+
"errorCode": 216,
125122
"closeConnection": false
126123
}
127124
},
@@ -130,13 +127,7 @@
130127
"replicaset",
131128
"sharded"
132129
],
133-
"changeStreamPipeline": [
134-
{
135-
"$project": {
136-
"_id": 0
137-
}
138-
}
139-
],
130+
"changeStreamPipeline": [],
140131
"changeStreamOptions": {},
141132
"operations": [
142133
{
@@ -152,7 +143,7 @@
152143
],
153144
"result": {
154145
"error": {
155-
"code": 50
146+
"code": 216
156147
}
157148
}
158149
}

tests/MongoDB.Driver.Tests/Specifications/change-streams/tests/change-streams-errors.yml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,24 +72,21 @@ tests:
7272
result:
7373
error:
7474
code: 280
75-
errorLabels: [ "NonResumableChangeStreamError" ]
7675
-
77-
description: change stream errors on MaxTimeMSExpired
76+
description: change stream errors on ElectionInProgress
7877
minServerVersion: "4.2"
7978
failPoint:
8079
configureFailPoint: failCommand
8180
mode: { times: 1 }
8281
data:
8382
failCommands: ["getMore"]
84-
errorCode: 50 # An error code that's not on the old blacklist or whitelist
83+
errorCode: 216 # An error code that's not on the old blacklist or whitelist
8584
closeConnection: false
8685
target: collection
8786
topology:
8887
- replicaset
8988
- sharded
90-
changeStreamPipeline:
91-
-
92-
$project: { _id: 0 }
89+
changeStreamPipeline: []
9390
changeStreamOptions: {}
9491
operations:
9592
-
@@ -101,4 +98,4 @@ tests:
10198
z: 3
10299
result:
103100
error:
104-
code: 50
101+
code: 216

tests/MongoDB.Driver.Tests/Specifications/change-streams/tests/change-streams-resume-whitelist.json

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,6 +1648,102 @@
16481648
}
16491649
]
16501650
}
1651+
},
1652+
{
1653+
"description": "change stream resumes after CursorNotFound",
1654+
"minServerVersion": "4.2",
1655+
"failPoint": {
1656+
"configureFailPoint": "failCommand",
1657+
"mode": {
1658+
"times": 1
1659+
},
1660+
"data": {
1661+
"failCommands": [
1662+
"getMore"
1663+
],
1664+
"errorCode": 43,
1665+
"closeConnection": false
1666+
}
1667+
},
1668+
"target": "collection",
1669+
"topology": [
1670+
"replicaset",
1671+
"sharded"
1672+
],
1673+
"changeStreamPipeline": [],
1674+
"changeStreamOptions": {},
1675+
"operations": [
1676+
{
1677+
"database": "change-stream-tests",
1678+
"collection": "test",
1679+
"name": "insertOne",
1680+
"arguments": {
1681+
"document": {
1682+
"x": 1
1683+
}
1684+
}
1685+
}
1686+
],
1687+
"expectations": [
1688+
{
1689+
"command_started_event": {
1690+
"command": {
1691+
"aggregate": "test",
1692+
"cursor": {},
1693+
"pipeline": [
1694+
{
1695+
"$changeStream": {}
1696+
}
1697+
]
1698+
},
1699+
"command_name": "aggregate",
1700+
"database_name": "change-stream-tests"
1701+
}
1702+
},
1703+
{
1704+
"command_started_event": {
1705+
"command": {
1706+
"getMore": 42,
1707+
"collection": "test"
1708+
},
1709+
"command_name": "getMore",
1710+
"database_name": "change-stream-tests"
1711+
}
1712+
},
1713+
{
1714+
"command_started_event": {
1715+
"command": {
1716+
"aggregate": "test",
1717+
"cursor": {},
1718+
"pipeline": [
1719+
{
1720+
"$changeStream": {}
1721+
}
1722+
]
1723+
},
1724+
"command_name": "aggregate",
1725+
"database_name": "change-stream-tests"
1726+
}
1727+
}
1728+
],
1729+
"result": {
1730+
"success": [
1731+
{
1732+
"_id": "42",
1733+
"documentKey": "42",
1734+
"operationType": "insert",
1735+
"ns": {
1736+
"db": "change-stream-tests",
1737+
"coll": "test"
1738+
},
1739+
"fullDocument": {
1740+
"x": {
1741+
"$numberInt": "1"
1742+
}
1743+
}
1744+
}
1745+
]
1746+
}
16511747
}
16521748
]
16531749
}

tests/MongoDB.Driver.Tests/Specifications/change-streams/tests/change-streams-resume-whitelist.yml

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,3 +1105,69 @@ tests:
11051105
fullDocument:
11061106
x:
11071107
$numberInt: "1"
1108+
-
1109+
# CursorNotFound is special-cased to be resumable regardless of server versions or error labels, so this test has
1110+
# no maxWireVersion.
1111+
description: "change stream resumes after CursorNotFound"
1112+
minServerVersion: "4.2"
1113+
failPoint:
1114+
configureFailPoint: failCommand
1115+
mode: { times: 1 }
1116+
data:
1117+
failCommands: ["getMore"]
1118+
errorCode: 43
1119+
closeConnection: false
1120+
target: collection
1121+
topology:
1122+
- replicaset
1123+
- sharded
1124+
changeStreamPipeline: []
1125+
changeStreamOptions: {}
1126+
operations:
1127+
-
1128+
database: *database_name
1129+
collection: *collection_name
1130+
name: insertOne
1131+
arguments:
1132+
document:
1133+
x: 1
1134+
expectations:
1135+
-
1136+
command_started_event:
1137+
command:
1138+
aggregate: *collection_name
1139+
cursor: {}
1140+
pipeline:
1141+
-
1142+
$changeStream: {}
1143+
command_name: aggregate
1144+
database_name: *database_name
1145+
-
1146+
command_started_event:
1147+
command:
1148+
getMore: 42
1149+
collection: *collection_name
1150+
command_name: getMore
1151+
database_name: *database_name
1152+
-
1153+
command_started_event:
1154+
command:
1155+
aggregate: *collection_name
1156+
cursor: {}
1157+
pipeline:
1158+
-
1159+
$changeStream: {}
1160+
command_name: aggregate
1161+
database_name: *database_name
1162+
result:
1163+
success:
1164+
-
1165+
_id: "42"
1166+
documentKey: "42"
1167+
operationType: insert
1168+
ns:
1169+
db: *database_name
1170+
coll: *collection_name
1171+
fullDocument:
1172+
x:
1173+
$numberInt: "1"

0 commit comments

Comments
 (0)