Skip to content

Commit ea4d8d3

Browse files
CSHARP-2988: Use white list for change stream resumability.
1 parent a5c8294 commit ea4d8d3

23 files changed

+5940
-197
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class Feature
7878
private static readonly Feature __scramSha1Authentication = new Feature("ScramSha1Authentication", new SemanticVersion(3, 0, 0));
7979
private static readonly Feature __scramSha256Authentication = new Feature("ScramSha256Authentication", new SemanticVersion(4, 0, 0, ""));
8080
private static readonly Feature __serverExtractsUsernameFromX509Certificate = new Feature("ServerExtractsUsernameFromX509Certificate", new SemanticVersion(3, 3, 12));
81+
private static readonly Feature __serverReturnsResumableChangeStreamErrorLabel = new Feature("ServerReturnsResumableChangeStreamErrorLabel", new SemanticVersion(4, 3, 0));
8182
private static readonly Feature __serverReturnsRetryableWriteErrorLabel = new Feature("ServerReturnsRetryableWriteErrorLabel", new SemanticVersion(4, 3, 0));
8283
private static readonly Feature __shardedTransactions = new Feature("ShardedTransactions", new SemanticVersion(4, 1, 6));
8384
private static readonly Feature __tailableCursor = new Feature("TailableCursor", new SemanticVersion(3, 2, 0));
@@ -362,6 +363,11 @@ public class Feature
362363
/// </summary>
363364
public static Feature ServerExtractsUsernameFromX509Certificate => __serverExtractsUsernameFromX509Certificate;
364365

366+
/// <summary>
367+
/// Gets the server returns resumableChangeStream label feature.
368+
/// </summary>
369+
public static Feature ServerReturnsResumableChangeStreamErrorLabel => __serverReturnsResumableChangeStreamErrorLabel;
370+
365371
/// <summary>
366372
/// Gets the server returns retryable writeError label feature.
367373
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ internal sealed class ChangeStreamCursor<TDocument> : IChangeStreamCursor<TDocum
4545
private readonly BsonDocument _initialResumeAfter;
4646
private readonly BsonDocument _initialStartAfter;
4747
private readonly BsonTimestamp _initialStartAtOperationTime;
48+
private readonly SemanticVersion _serverVersion;
4849

4950
// public properties
5051
/// <inheritdoc />
@@ -63,6 +64,7 @@ internal sealed class ChangeStreamCursor<TDocument> : IChangeStreamCursor<TDocum
6364
/// <param name="initialStartAfter">The start after value.</param>
6465
/// <param name="initialResumeAfter">The resume after value.</param>
6566
/// <param name="initialStartAtOperationTime">The start at operation time value.</param>
67+
/// <param name="serverVersion">The server version.</param>
6668
public ChangeStreamCursor(
6769
IAsyncCursor<RawBsonDocument> cursor,
6870
IBsonSerializer<TDocument> documentSerializer,
@@ -72,7 +74,8 @@ public ChangeStreamCursor(
7274
BsonTimestamp initialOperationTime,
7375
BsonDocument initialStartAfter,
7476
BsonDocument initialResumeAfter,
75-
BsonTimestamp initialStartAtOperationTime)
77+
BsonTimestamp initialStartAtOperationTime,
78+
SemanticVersion serverVersion)
7679
{
7780
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
7881
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
@@ -84,6 +87,7 @@ public ChangeStreamCursor(
8487
_initialStartAfter = initialStartAfter;
8588
_initialResumeAfter = initialResumeAfter;
8689
_initialStartAtOperationTime = initialStartAtOperationTime;
90+
_serverVersion = Ensure.IsNotNull(serverVersion, nameof(serverVersion));
8791
}
8892

8993
// public methods
@@ -119,7 +123,7 @@ public BsonDocument GetResumeToken()
119123
hasMore = _cursor.MoveNext(cancellationToken);
120124
break;
121125
}
122-
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex))
126+
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex, _serverVersion))
123127
{
124128
var newCursor = Resume(cancellationToken);
125129
_cursor.Dispose();
@@ -142,7 +146,7 @@ public BsonDocument GetResumeToken()
142146
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
143147
break;
144148
}
145-
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex))
149+
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex, _serverVersion))
146150
{
147151
var newCursor = await ResumeAsync(cancellationToken).ConfigureAwait(false);
148152
_cursor.Dispose();

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamOperation.cs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -300,20 +300,21 @@ public IChangeStreamCursor<TResult> Execute(IReadBinding binding, CancellationTo
300300
cursor = ExecuteAggregateOperation(context, cancellationToken);
301301
cursorBatchInfo = (ICursorBatchInfo)cursor;
302302
initialOperationTime = GetInitialOperationTimeIfRequired(context, cursorBatchInfo);
303-
}
304303

305-
var postBatchResumeToken = GetInitialPostBatchResumeTokenIfRequired(cursorBatchInfo);
306-
307-
return new ChangeStreamCursor<TResult>(
308-
cursor,
309-
_resultSerializer,
310-
bindingHandle.Fork(),
311-
this,
312-
postBatchResumeToken,
313-
initialOperationTime,
314-
_startAfter,
315-
_resumeAfter,
316-
_startAtOperationTime);
304+
var postBatchResumeToken = GetInitialPostBatchResumeTokenIfRequired(cursorBatchInfo);
305+
306+
return new ChangeStreamCursor<TResult>(
307+
cursor,
308+
_resultSerializer,
309+
bindingHandle.Fork(),
310+
this,
311+
postBatchResumeToken,
312+
initialOperationTime,
313+
_startAfter,
314+
_resumeAfter,
315+
_startAtOperationTime,
316+
context.Channel.ConnectionDescription.ServerVersion);
317+
}
317318
}
318319

319320
/// <inheritdoc />
@@ -334,20 +335,21 @@ public async Task<IChangeStreamCursor<TResult>> ExecuteAsync(IReadBinding bindin
334335
cursor = await ExecuteAggregateOperationAsync(context, cancellationToken).ConfigureAwait(false);
335336
cursorBatchInfo = (ICursorBatchInfo)cursor;
336337
initialOperationTime = GetInitialOperationTimeIfRequired(context, cursorBatchInfo);
337-
}
338338

339-
var postBatchResumeToken = GetInitialPostBatchResumeTokenIfRequired(cursorBatchInfo);
340-
341-
return new ChangeStreamCursor<TResult>(
342-
cursor,
343-
_resultSerializer,
344-
bindingHandle.Fork(),
345-
this,
346-
postBatchResumeToken,
347-
initialOperationTime,
348-
_startAfter,
349-
_resumeAfter,
350-
_startAtOperationTime);
339+
var postBatchResumeToken = GetInitialPostBatchResumeTokenIfRequired(cursorBatchInfo);
340+
341+
return new ChangeStreamCursor<TResult>(
342+
cursor,
343+
_resultSerializer,
344+
bindingHandle.Fork(),
345+
this,
346+
postBatchResumeToken,
347+
initialOperationTime,
348+
_startAfter,
349+
_resumeAfter,
350+
_startAtOperationTime,
351+
context.Channel.ConnectionDescription.ServerVersion);
352+
}
351353
}
352354

353355
/// <inheritdoc />

src/MongoDB.Driver.Core/Core/Operations/RetryabilityHelper.cs

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18-
using System.Linq;
19-
using MongoDB.Bson;
18+
using MongoDB.Driver.Core.Misc;
2019

2120
namespace MongoDB.Driver.Core.Operations
2221
{
2322
internal static class RetryabilityHelper
2423
{
2524
// private constants
25+
private const string ResumableChangeStreamErrorLabel = "ResumableChangeStreamError";
2626
private const string RetryableWriteErrorLabel = "RetryableWriteError";
2727

2828
// private static fields
29-
private static readonly HashSet<ServerErrorCode> __notResumableChangeStreamErrorCodes;
30-
private static readonly HashSet<string> __notResumableChangeStreamErrorLabels;
29+
private static readonly HashSet<ServerErrorCode> __resumableChangeStreamErrorCodes;
3130
private static readonly HashSet<Type> __resumableChangeStreamExceptions;
3231
private static readonly HashSet<Type> __retryableReadExceptions;
3332
private static readonly HashSet<Type> __retryableWriteExceptions;
@@ -39,15 +38,11 @@ static RetryabilityHelper()
3938
{
4039
var resumableAndRetryableExceptions = new HashSet<Type>()
4140
{
42-
typeof(MongoConnectionException),
4341
typeof(MongoNotPrimaryException),
4442
typeof(MongoNodeIsRecoveringException)
4543
};
4644

47-
__resumableChangeStreamExceptions = new HashSet<Type>(resumableAndRetryableExceptions)
48-
{
49-
typeof(MongoCursorNotFoundException)
50-
};
45+
__resumableChangeStreamExceptions = new HashSet<Type>(resumableAndRetryableExceptions);
5146

5247
__retryableReadExceptions = new HashSet<Type>(resumableAndRetryableExceptions);
5348

@@ -68,16 +63,25 @@ static RetryabilityHelper()
6863
ServerErrorCode.ExceededTimeLimit
6964
};
7065

71-
__notResumableChangeStreamErrorCodes = new HashSet<ServerErrorCode>()
72-
{
73-
ServerErrorCode.CappedPositionLost,
74-
ServerErrorCode.CursorKilled,
75-
ServerErrorCode.Interrupted
76-
};
77-
78-
__notResumableChangeStreamErrorLabels = new HashSet<string>()
66+
__resumableChangeStreamErrorCodes = new HashSet<ServerErrorCode>()
7967
{
80-
"NonResumableChangeStreamError"
68+
ServerErrorCode.HostUnreachable,
69+
ServerErrorCode.HostNotFound,
70+
ServerErrorCode.NetworkTimeout,
71+
ServerErrorCode.ShutdownInProgress,
72+
ServerErrorCode.PrimarySteppedDown,
73+
ServerErrorCode.ExceededTimeLimit,
74+
ServerErrorCode.SocketException,
75+
ServerErrorCode.NotMaster,
76+
ServerErrorCode.InterruptedAtShutdown,
77+
ServerErrorCode.InterruptedDueToReplStateChange,
78+
ServerErrorCode.NotMasterNoSlaveOk,
79+
ServerErrorCode.NotMasterOrSecondary,
80+
ServerErrorCode.StaleShardVersion,
81+
ServerErrorCode.StaleEpoch,
82+
ServerErrorCode.StaleConfig,
83+
ServerErrorCode.RetryChangeStream,
84+
ServerErrorCode.FailedToSatisfyReadPreference
8185
};
8286
}
8387

@@ -90,26 +94,41 @@ public static void AddRetryableWriteErrorLabelIfRequired(MongoException exceptio
9094
}
9195
}
9296

93-
public static bool IsResumableChangeStreamException(Exception exception)
97+
public static bool IsNetworkException(Exception exception)
9498
{
95-
var commandException = exception as MongoCommandException;
96-
if (commandException != null)
99+
return exception is MongoConnectionException mongoConnectionException && mongoConnectionException.IsNetworkException;
100+
}
101+
102+
public static bool IsResumableChangeStreamException(Exception exception, SemanticVersion serverVersion)
103+
{
104+
if (IsNetworkException(exception))
97105
{
98-
var code = (ServerErrorCode)commandException.Code;
99-
var isNonResumable =
100-
__notResumableChangeStreamErrorCodes.Contains(code) ||
101-
__notResumableChangeStreamErrorLabels.Any(c => commandException.HasErrorLabel(c));
102-
return !isNonResumable;
106+
return true;
107+
}
108+
109+
if (Feature.ServerReturnsResumableChangeStreamErrorLabel.IsSupported(serverVersion))
110+
{
111+
return exception is MongoException mongoException ? mongoException.HasErrorLabel(ResumableChangeStreamErrorLabel) : false;
103112
}
104113
else
105114
{
115+
var commandException = exception as MongoCommandException;
116+
if (commandException != null)
117+
{
118+
var code = (ServerErrorCode)commandException.Code;
119+
if (__resumableChangeStreamErrorCodes.Contains(code))
120+
{
121+
return true;
122+
}
123+
}
124+
106125
return __resumableChangeStreamExceptions.Contains(exception.GetType());
107126
}
108127
}
109128

110129
public static bool IsRetryableReadException(Exception exception)
111130
{
112-
if (__retryableReadExceptions.Contains(exception.GetType()))
131+
if (__retryableReadExceptions.Contains(exception.GetType()) || IsNetworkException(exception))
113132
{
114133
return true;
115134
}
@@ -135,7 +154,7 @@ public static bool IsRetryableWriteException(Exception exception)
135154
// private static methods
136155
private static bool ShouldRetryableWriteExceptionLabelBeAdded(Exception exception)
137156
{
138-
if (__retryableWriteExceptions.Contains(exception.GetType()))
157+
if (__retryableWriteExceptions.Contains(exception.GetType()) || IsNetworkException(exception))
139158
{
140159
return true;
141160
}

src/MongoDB.Driver.Core/MongoAuthenticationException.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,8 @@ public MongoAuthenticationException(SerializationInfo info, StreamingContext con
6262
{
6363
}
6464
#endif
65+
66+
/// <inheritdoc/>
67+
public override bool IsNetworkException => false;
6568
}
6669
}

src/MongoDB.Driver.Core/MongoCommandException.cs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,32 +30,6 @@ namespace MongoDB.Driver
3030
#endif
3131
public class MongoCommandException : MongoServerException
3232
{
33-
#region static
34-
/// <summary>
35-
/// Adds error labels from a command result document into the top-level label list.
36-
/// </summary>
37-
/// <param name="exception">The exception.</param>
38-
/// <param name="result">The result document.</param>
39-
protected static void AddErrorLabelsFromCommandResult(MongoCommandException exception, BsonDocument result)
40-
{
41-
// note: make a best effort to extract the error labels from the result, but never throw an exception
42-
if (result != null)
43-
{
44-
BsonValue errorLabels;
45-
if (result.TryGetValue("errorLabels", out errorLabels) && errorLabels.IsBsonArray)
46-
{
47-
foreach (var errorLabel in errorLabels.AsBsonArray)
48-
{
49-
if (errorLabel.IsString)
50-
{
51-
exception.AddErrorLabel(errorLabel.AsString);
52-
}
53-
}
54-
}
55-
}
56-
}
57-
#endregion
58-
5933
// fields
6034
private readonly BsonDocument _command;
6135
private readonly BsonDocument _result;

src/MongoDB.Driver.Core/MongoConnectionClosedException.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,8 @@ protected MongoConnectionClosedException(SerializationInfo info, StreamingContex
5050
{
5151
}
5252
#endif
53+
54+
/// <inheritdoc/>
55+
public override bool IsNetworkException => false;
5356
}
5457
}

src/MongoDB.Driver.Core/MongoConnectionException.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public ConnectionId ConnectionId
7878
get { return _connectionId; }
7979
}
8080

81+
/// <summary>
82+
/// Determines whether the exception is network error or no.
83+
/// </summary>
84+
public virtual bool IsNetworkException => true; // true in subclasses, only if they can be considered as a network error
85+
8186
// methods
8287
#if NET452
8388
/// <inheritdoc/>

src/MongoDB.Driver.Core/MongoExecutionTimeoutException.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public MongoExecutionTimeoutException(ConnectionId connectionId, string message,
7676
: base(connectionId, message, innerException)
7777
{
7878
_result = result;
79+
AddErrorLabelsFromCommandResult(this, result);
7980
}
8081

8182
#if NET452

src/MongoDB.Driver.Core/MongoServerException.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using MongoDB.Bson;
1718
#if NET452
1819
using System.Runtime.Serialization;
1920
#endif
@@ -30,6 +31,32 @@ namespace MongoDB.Driver
3031
#endif
3132
public class MongoServerException : MongoException
3233
{
34+
#region static
35+
/// <summary>
36+
/// Adds error labels from a command result document into the top-level label list.
37+
/// </summary>
38+
/// <param name="exception">The exception.</param>
39+
/// <param name="result">The result document.</param>
40+
protected static void AddErrorLabelsFromCommandResult(MongoServerException exception, BsonDocument result)
41+
{
42+
// note: make a best effort to extract the error labels from the result, but never throw an exception
43+
if (result != null)
44+
{
45+
BsonValue errorLabels;
46+
if (result.TryGetValue("errorLabels", out errorLabels) && errorLabels.IsBsonArray)
47+
{
48+
foreach (var errorLabel in errorLabels.AsBsonArray)
49+
{
50+
if (errorLabel.IsString)
51+
{
52+
exception.AddErrorLabel(errorLabel.AsString);
53+
}
54+
}
55+
}
56+
}
57+
}
58+
#endregion
59+
3360
// fields
3461
private readonly ConnectionId _connectionId;
3562

0 commit comments

Comments
 (0)