Skip to content

Commit d250d9f

Browse files
committed
CSHARP-2229: Added RetryabilityHelper
1 parent d29c782 commit d250d9f

File tree

8 files changed

+195
-145
lines changed

8 files changed

+195
-145
lines changed

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void Dispose()
8888
}
8989
catch (Exception ex)
9090
{
91-
if (CanResumeAfter(ex))
91+
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
9292
{
9393
_cursor = _changeStreamOperation.Resume(_binding, _resumeToken, cancellationToken);
9494
hasMore = _cursor.MoveNext(cancellationToken);
@@ -113,7 +113,7 @@ public void Dispose()
113113
}
114114
catch (Exception ex)
115115
{
116-
if (CanResumeAfter(ex))
116+
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
117117
{
118118
_cursor = await _changeStreamOperation.ResumeAsync(_binding, _resumeToken, cancellationToken).ConfigureAwait(false);
119119
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
@@ -129,36 +129,6 @@ public void Dispose()
129129
}
130130

131131
// private methods
132-
private bool CanResumeAfter(Exception exception)
133-
{
134-
var commandException = exception as MongoCommandException;
135-
if (commandException != null)
136-
{
137-
var code = (ServerErrorCode)commandException.Code;
138-
switch (code)
139-
{
140-
case ServerErrorCode.HostNotFound:
141-
case ServerErrorCode.HostUnreachable:
142-
case ServerErrorCode.InterruptedAtShutdown:
143-
case ServerErrorCode.InterruptedDueToReplStateChange:
144-
case ServerErrorCode.NetworkTimeout:
145-
case ServerErrorCode.NotMaster:
146-
case ServerErrorCode.NotMasterNoSlaveOk:
147-
case ServerErrorCode.NotMasterOrSecondary:
148-
case ServerErrorCode.PrimarySteppedDown:
149-
case ServerErrorCode.ShutdownInProgress:
150-
case ServerErrorCode.SocketException:
151-
case ServerErrorCode.WriteConcernFailed:
152-
return true;
153-
154-
default:
155-
return false;
156-
}
157-
}
158-
159-
return exception is MongoConnectionException || exception is MongoCursorNotFoundException || exception is MongoNotPrimaryException;
160-
}
161-
162132
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
163133
private TDocument DeserializeDocument(RawBsonDocument rawDocument)
164134
{
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/* Copyright 2018-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
19+
namespace MongoDB.Driver.Core.Operations
20+
{
21+
internal static class RetryabilityHelper
22+
{
23+
// private static fields
24+
private static readonly HashSet<Type> __resumableChangeStreamExceptions;
25+
private static readonly HashSet<ServerErrorCode> __resumableChangeStreamErrorCodes;
26+
private static readonly HashSet<Type> __retryableWriteExceptions;
27+
private static readonly HashSet<ServerErrorCode> __retryableWriteErrorCodes;
28+
29+
// static constructor
30+
static RetryabilityHelper()
31+
{
32+
var resumableAndRetryableExceptions = new HashSet<Type>()
33+
{
34+
typeof(MongoConnectionException),
35+
typeof(MongoNotPrimaryException)
36+
};
37+
38+
__resumableChangeStreamExceptions = new HashSet<Type>(resumableAndRetryableExceptions)
39+
{
40+
typeof(MongoCursorNotFoundException)
41+
};
42+
43+
__retryableWriteExceptions = new HashSet<Type>(resumableAndRetryableExceptions)
44+
{
45+
typeof(MongoNodeIsRecoveringException)
46+
};
47+
48+
var resumableAndRetryableErrorCodes = new HashSet<ServerErrorCode>
49+
{
50+
ServerErrorCode.HostNotFound,
51+
ServerErrorCode.HostUnreachable,
52+
ServerErrorCode.InterruptedAtShutdown,
53+
ServerErrorCode.InterruptedDueToReplStateChange,
54+
ServerErrorCode.NetworkTimeout,
55+
ServerErrorCode.NotMaster,
56+
ServerErrorCode.NotMasterNoSlaveOk,
57+
ServerErrorCode.NotMasterOrSecondary,
58+
ServerErrorCode.PrimarySteppedDown,
59+
ServerErrorCode.ShutdownInProgress,
60+
ServerErrorCode.SocketException
61+
};
62+
63+
__resumableChangeStreamErrorCodes = new HashSet<ServerErrorCode>(resumableAndRetryableErrorCodes)
64+
{
65+
};
66+
67+
__retryableWriteErrorCodes = new HashSet<ServerErrorCode>(resumableAndRetryableErrorCodes)
68+
{
69+
ServerErrorCode.WriteConcernFailed
70+
};
71+
}
72+
73+
// public static methods
74+
public static bool IsResumableChangeStreamException(Exception exception)
75+
{
76+
return IsMatch(exception, __resumableChangeStreamExceptions, __resumableChangeStreamErrorCodes);
77+
}
78+
79+
public static bool IsRetryableWriteException(Exception exception)
80+
{
81+
return IsMatch(exception, __retryableWriteExceptions, __retryableWriteErrorCodes);
82+
}
83+
84+
// private static methods
85+
private static bool IsMatch(Exception exception, HashSet<Type> matchingExceptions, HashSet<ServerErrorCode> matchingErrorCodes)
86+
{
87+
var commandException = exception as MongoCommandException;
88+
if (commandException != null)
89+
{
90+
return matchingErrorCodes.Contains((ServerErrorCode)commandException.Code);
91+
}
92+
else
93+
{
94+
return matchingExceptions.Contains(exception.GetType());
95+
}
96+
}
97+
}
98+
}

src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static TResult Execute<TResult>(IRetryableWriteOperation<TResult> operati
4646
{
4747
return operation.ExecuteAttempt(context, 1, transactionNumber, cancellationToken);
4848
}
49-
catch (Exception ex) when (IsRetryableException(ex))
49+
catch (Exception ex) when (RetryabilityHelper.IsRetryableWriteException(ex))
5050
{
5151
originalException = ex;
5252
}
@@ -97,7 +97,7 @@ public static async Task<TResult> ExecuteAsync<TResult>(IRetryableWriteOperation
9797
{
9898
return await operation.ExecuteAttemptAsync(context, 1, transactionNumber, cancellationToken).ConfigureAwait(false);
9999
}
100-
catch (Exception ex) when (IsRetryableException(ex))
100+
catch (Exception ex) when (RetryabilityHelper.IsRetryableWriteException(ex))
101101
{
102102
originalException = ex;
103103
}
@@ -135,39 +135,6 @@ private static bool AreRetryableWritesSupported(ConnectionDescription connection
135135
connectionDescription.IsMasterResult.ServerType != ServerType.Standalone;
136136
}
137137

138-
private static bool IsRetryableException(Exception ex)
139-
{
140-
var commandException = ex as MongoCommandException;
141-
if (commandException != null)
142-
{
143-
var code = (ServerErrorCode)commandException.Code;
144-
switch (code)
145-
{
146-
case ServerErrorCode.HostNotFound:
147-
case ServerErrorCode.HostUnreachable:
148-
case ServerErrorCode.InterruptedAtShutdown:
149-
case ServerErrorCode.InterruptedDueToReplStateChange:
150-
case ServerErrorCode.NetworkTimeout:
151-
case ServerErrorCode.NotMaster:
152-
case ServerErrorCode.NotMasterNoSlaveOk:
153-
case ServerErrorCode.NotMasterOrSecondary:
154-
case ServerErrorCode.PrimarySteppedDown:
155-
case ServerErrorCode.ShutdownInProgress:
156-
case ServerErrorCode.SocketException:
157-
case ServerErrorCode.WriteConcernFailed:
158-
return true;
159-
160-
default:
161-
return false;
162-
}
163-
}
164-
165-
return
166-
ex is MongoConnectionException ||
167-
ex is MongoNotPrimaryException ||
168-
ex is MongoNodeIsRecoveringException;
169-
}
170-
171138
private static bool ShouldThrowOriginalException(Exception retryException)
172139
{
173140
return retryException is MongoException && !(retryException is MongoConnectionException);

src/MongoDB.Driver.Core/MongoDB.Driver.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
<Compile Include="Core\Clusters\IClusterClock.cs" />
128128
<Compile Include="Core\Operations\DelayedEvaluationWriteConcernSerializer.cs" />
129129
<Compile Include="Core\Operations\RetryableDeleteCommandOperation.cs" />
130+
<Compile Include="Core\Operations\RetryabilityHelper.cs" />
130131
<Compile Include="Core\Operations\RetryableInsertCommandOperation.cs" />
131132
<Compile Include="Core\Operations\IOperationClock.cs" />
132133
<Compile Include="Core\Operations\IRetryableOperation.cs" />

tests/MongoDB.Bson.TestHelpers/Reflector.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
* limitations under the License.
1414
*/
1515

16-
using System;
1716
using System.Linq;
1817
using System.Reflection;
1918

@@ -43,14 +42,5 @@ public static object Invoke<T1>(object obj, string name, T1 arg1)
4342
.Single();
4443
return methodInfo.Invoke(obj, new object[] { arg1 });
4544
}
46-
47-
public static object InvokeStatic<T1>(Type type, string name, T1 arg1)
48-
{
49-
var parameterTypes = new[] { typeof(T1) };
50-
var methodInfo = type.GetMethods(BindingFlags.NonPublic | BindingFlags.Static)
51-
.Where(m => m.Name == name && m.GetParameters().Select(p => p.ParameterType).SequenceEqual(parameterTypes))
52-
.Single();
53-
return methodInfo.Invoke(null, new object[] { arg1 });
54-
}
5545
}
5646
}

tests/MongoDB.Driver.Core.Tests/Core/Operations/ChangeStreamCursorTests.cs

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@
1313
* limitations under the License.
1414
*/
1515

16-
using System;
17-
using System.Collections.Generic;
18-
using System.IO;
19-
using System.Reflection;
20-
using System.Threading;
21-
using System.Threading.Tasks;
2216
using FluentAssertions;
2317
using MongoDB.Bson;
2418
using MongoDB.Bson.IO;
2519
using MongoDB.Bson.Serialization;
2620
using MongoDB.Bson.Serialization.Serializers;
27-
using MongoDB.Bson.TestHelpers;
2821
using MongoDB.Bson.TestHelpers.XunitExtensions;
2922
using MongoDB.Driver.Core.Bindings;
23+
using MongoDB.Driver.Core.Clusters;
24+
using MongoDB.Driver.Core.Connections;
25+
using MongoDB.Driver.Core.Servers;
3026
using MongoDB.Driver.Core.TestHelpers;
3127
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
3228
using Moq;
29+
using System;
30+
using System.Collections.Generic;
31+
using System.IO;
32+
using System.Net;
33+
using System.Reflection;
34+
using System.Threading;
35+
using System.Threading.Tasks;
3336
using Xunit;
3437

3538
namespace MongoDB.Driver.Core.Operations
@@ -369,43 +372,6 @@ void ProcessBatch_should_throw_when_resume_token_is_missing(
369372
exception.Message.Should().Be("Cannot provide resume functionality when the resume token is missing.");
370373
}
371374

372-
[Theory]
373-
[InlineData(1, false)]
374-
[InlineData(ServerErrorCode.HostNotFound, true)]
375-
[InlineData(ServerErrorCode.HostUnreachable, true)]
376-
[InlineData(ServerErrorCode.InterruptedAtShutdown, true)]
377-
[InlineData(ServerErrorCode.InterruptedDueToReplStateChange, true)]
378-
[InlineData(ServerErrorCode.NetworkTimeout, true)]
379-
[InlineData(ServerErrorCode.NotMaster, true)]
380-
[InlineData(ServerErrorCode.NotMasterNoSlaveOk, true)]
381-
[InlineData(ServerErrorCode.NotMasterOrSecondary, true)]
382-
[InlineData(ServerErrorCode.PrimarySteppedDown, true)]
383-
[InlineData(ServerErrorCode.ShutdownInProgress, true)]
384-
[InlineData(ServerErrorCode.SocketException, true)]
385-
[InlineData(ServerErrorCode.WriteConcernFailed, true)]
386-
public void CanResumeAfter_should_return_expected_result_for_MongoCommandExceptions(int code, bool expectedResult)
387-
{
388-
var subject = CreateSubject();
389-
var exception = CoreExceptionHelper.CreateMongoCommandException(code);
390-
var result = subject.CanResumeAfter(exception);
391-
392-
result.Should().Be(expectedResult);
393-
}
394-
395-
[Theory]
396-
[InlineData(typeof(IOException), false)]
397-
[InlineData(typeof(MongoConnectionException), true)]
398-
[InlineData(typeof(MongoCursorNotFoundException), true)]
399-
[InlineData(typeof(MongoNotPrimaryException), true)]
400-
public void CanResumeAfter_should_return_expected_result_for_other_exceptions(Type exceptionType, bool expectedResult)
401-
{
402-
var subject = CreateSubject();
403-
var exception = CoreExceptionHelper.CreateException(exceptionType);
404-
var result = subject.CanResumeAfter(exception);
405-
406-
result.Should().Be(expectedResult);
407-
}
408-
409375
// private methods
410376
private ChangeStreamOperation<BsonDocument> CreateChangeStreamOperation()
411377
{
@@ -448,14 +414,46 @@ private RawBsonDocument ToRawDocument(BsonDocument document)
448414

449415
internal static class ChangeStreamCursorReflector
450416
{
451-
public static IReadBinding _binding(this ChangeStreamCursor<BsonDocument> cursor) => (IReadBinding)Reflector.GetFieldValue(cursor, nameof(_binding));
452-
public static ChangeStreamOperation<BsonDocument> _changeStreamOperation(this ChangeStreamCursor<BsonDocument> cursor) => (ChangeStreamOperation<BsonDocument>)Reflector.GetFieldValue(cursor, nameof(_changeStreamOperation));
453-
public static IEnumerable<BsonDocument> _current(this ChangeStreamCursor<BsonDocument> cursor) => (IEnumerable<BsonDocument>)Reflector.GetFieldValue(cursor, nameof(_current));
454-
public static IAsyncCursor<RawBsonDocument> _cursor(this ChangeStreamCursor<BsonDocument> cursor) => (IAsyncCursor<RawBsonDocument>)Reflector.GetFieldValue(cursor, nameof(_cursor));
455-
public static bool _disposed(this ChangeStreamCursor<BsonDocument> cursor) => (bool)Reflector.GetFieldValue(cursor, nameof(_disposed));
456-
public static IBsonSerializer<BsonDocument> _documentSerializer(this ChangeStreamCursor<BsonDocument> cursor) => (IBsonSerializer<BsonDocument>)Reflector.GetFieldValue(cursor, nameof(_documentSerializer));
457-
public static BsonDocument _resumeToken(this ChangeStreamCursor<BsonDocument> cursor) => (BsonDocument)Reflector.GetFieldValue(cursor, nameof(_resumeToken));
458-
459-
public static bool CanResumeAfter(this ChangeStreamCursor<BsonDocument> cursor, Exception exception) => (bool)Reflector.Invoke(cursor, nameof(CanResumeAfter), exception);
417+
public static IReadBinding _binding(this ChangeStreamCursor<BsonDocument> cursor)
418+
{
419+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_binding", BindingFlags.NonPublic | BindingFlags.Instance);
420+
return (IReadBinding)fieldInfo.GetValue(cursor);
421+
}
422+
423+
public static ChangeStreamOperation<BsonDocument> _changeStreamOperation(this ChangeStreamCursor<BsonDocument> cursor)
424+
{
425+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_changeStreamOperation", BindingFlags.NonPublic | BindingFlags.Instance);
426+
return (ChangeStreamOperation<BsonDocument>)fieldInfo.GetValue(cursor);
427+
}
428+
429+
public static IEnumerable<BsonDocument> _current(this ChangeStreamCursor<BsonDocument> cursor)
430+
{
431+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_current", BindingFlags.NonPublic | BindingFlags.Instance);
432+
return (IEnumerable<BsonDocument>)fieldInfo.GetValue(cursor);
433+
}
434+
435+
public static IAsyncCursor<RawBsonDocument> _cursor(this ChangeStreamCursor<BsonDocument> cursor)
436+
{
437+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_cursor", BindingFlags.NonPublic | BindingFlags.Instance);
438+
return (IAsyncCursor<RawBsonDocument>)fieldInfo.GetValue(cursor);
439+
}
440+
441+
public static bool _disposed(this ChangeStreamCursor<BsonDocument> cursor)
442+
{
443+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_disposed", BindingFlags.NonPublic | BindingFlags.Instance);
444+
return (bool)fieldInfo.GetValue(cursor);
445+
}
446+
447+
public static IBsonSerializer<BsonDocument> _documentSerializer(this ChangeStreamCursor<BsonDocument> cursor)
448+
{
449+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_documentSerializer", BindingFlags.NonPublic | BindingFlags.Instance);
450+
return (IBsonSerializer<BsonDocument>)fieldInfo.GetValue(cursor);
451+
}
452+
453+
public static BsonDocument _resumeToken(this ChangeStreamCursor<BsonDocument> cursor)
454+
{
455+
var fieldInfo = typeof(ChangeStreamCursor<BsonDocument>).GetField("_resumeToken", BindingFlags.NonPublic | BindingFlags.Instance);
456+
return (BsonDocument)fieldInfo.GetValue(cursor);
457+
}
460458
}
461459
}

0 commit comments

Comments
 (0)