Skip to content

Commit 255f8b4

Browse files
authored
CSHARP-3537: CSOT: retryable reads and writes (#1717)
1 parent c4cc5e3 commit 255f8b4

18 files changed

+476
-416
lines changed

src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,9 @@ public WriteConcern WriteConcern
139139
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
140140
{
141141
using (BeginOperation())
142-
using (var context = RetryableWriteContext.Create(operationContext, binding, _retryRequested))
142+
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
143143
{
144144
EnsureHintIsSupportedIfAnyRequestHasHint();
145-
context.DisableRetriesIfAnyWriteRequestIsNotRetryable(_requests);
146145
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);
147146
foreach (var batch in helper.GetBatches())
148147
{
@@ -155,10 +154,9 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, IWrit
155154
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
156155
{
157156
using (BeginOperation())
158-
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
157+
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
159158
{
160159
EnsureHintIsSupportedIfAnyRequestHasHint();
161-
context.DisableRetriesIfAnyWriteRequestIsNotRetryable(_requests);
162160
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);
163161
foreach (var batch in helper.GetBatches())
164162
{
@@ -168,6 +166,9 @@ public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operat
168166
}
169167
}
170168

169+
private bool IsOperationRetryable()
170+
=> _retryRequested && _requests.All(r => r.IsRetryable());
171+
171172
private IDisposable BeginOperation() =>
172173
// Execution starts with the first request
173174
EventContext.BeginOperation(null, _requests.FirstOrDefault()?.RequestType.ToString().ToLower());

src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,8 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, Retry
129129
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
130130
{
131131
using (BeginOperation())
132-
using (var context = RetryableWriteContext.Create(operationContext, binding, _retryRequested))
132+
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
133133
{
134-
context.DisableRetriesIfAnyWriteRequestIsNotRetryable(_requests);
135134
return Execute(operationContext, context);
136135
}
137136
}
@@ -146,9 +145,8 @@ public Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationCon
146145
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
147146
{
148147
using (BeginOperation())
149-
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
148+
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
150149
{
151-
context.DisableRetriesIfAnyWriteRequestIsNotRetryable(_requests);
152150
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
153151
}
154152
}
@@ -159,6 +157,9 @@ public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operat
159157
protected abstract bool RequestHasHint(TWriteRequest request);
160158

161159
// private methods
160+
private bool IsOperationRetryable()
161+
=> _retryRequested && _requests.All(r => r.IsRetryable());
162+
162163
private IDisposable BeginOperation() =>
163164
EventContext.BeginOperation(null, _requests.FirstOrDefault()?.RequestType.ToString().ToLower());
164165

src/MongoDB.Driver/Core/Operations/DeleteRequest.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
*/
1515

1616
using MongoDB.Bson;
17-
using MongoDB.Driver.Core.Connections;
1817
using MongoDB.Driver.Core.Misc;
1918

2019
namespace MongoDB.Driver.Core.Operations
@@ -36,6 +35,6 @@ public DeleteRequest(BsonDocument filter)
3635
public int Limit { get; init; }
3736

3837
// public methods
39-
public override bool IsRetryable(ConnectionDescription connectionDescription) => Limit != 0;
38+
public override bool IsRetryable() => Limit != 0;
4039
}
4140
}

src/MongoDB.Driver/Core/Operations/InsertRequest.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
*/
1515

1616
using MongoDB.Bson;
17-
using MongoDB.Driver.Core.Connections;
1817
using MongoDB.Driver.Core.Misc;
1918

2019
namespace MongoDB.Driver.Core.Operations
@@ -32,6 +31,6 @@ public InsertRequest(BsonDocument document)
3231
public BsonDocument Document { get; }
3332

3433
// public methods
35-
public override bool IsRetryable(ConnectionDescription connectionDescription) => true;
34+
public override bool IsRetryable() => true;
3635
}
3736
}

src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2018-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -135,20 +135,17 @@ public static bool IsResumableChangeStreamException(Exception exception, int max
135135
{
136136
return exception is MongoException mongoException ? mongoException.HasErrorLabel(ResumableChangeStreamErrorLabel) : false;
137137
}
138-
else
138+
139+
if (exception is MongoCommandException commandException)
139140
{
140-
var commandException = exception as MongoCommandException;
141-
if (commandException != null)
141+
var code = (ServerErrorCode)commandException.Code;
142+
if (__resumableChangeStreamErrorCodes.Contains(code))
142143
{
143-
var code = (ServerErrorCode)commandException.Code;
144-
if (__resumableChangeStreamErrorCodes.Contains(code))
145-
{
146-
return true;
147-
}
144+
return true;
148145
}
149-
150-
return __resumableChangeStreamExceptions.Contains(exception.GetType());
151146
}
147+
148+
return __resumableChangeStreamExceptions.Contains(exception.GetType());
152149
}
153150

154151
/// <summary>
Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2019-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -14,9 +14,11 @@
1414
*/
1515

1616
using System;
17+
using System.Collections.Generic;
1718
using System.Threading.Tasks;
1819
using MongoDB.Driver.Core.Bindings;
1920
using MongoDB.Driver.Core.Misc;
21+
using MongoDB.Driver.Core.Servers;
2022

2123
namespace MongoDB.Driver.Core.Operations
2224
{
@@ -29,41 +31,33 @@ public static RetryableReadContext Create(OperationContext operationContext, IRe
2931
var context = new RetryableReadContext(binding, retryRequested);
3032
try
3133
{
32-
context.Initialize(operationContext);
33-
34-
ChannelPinningHelper.PinChannellIfRequired(
35-
context.ChannelSource,
36-
context.Channel,
37-
context.Binding.Session);
38-
39-
return context;
34+
context.AcquireOrReplaceChannel(operationContext, null);
4035
}
4136
catch
4237
{
4338
context.Dispose();
4439
throw;
4540
}
41+
42+
ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session);
43+
return context;
4644
}
4745

4846
public static async Task<RetryableReadContext> CreateAsync(OperationContext operationContext, IReadBinding binding, bool retryRequested)
4947
{
5048
var context = new RetryableReadContext(binding, retryRequested);
5149
try
5250
{
53-
await context.InitializeAsync(operationContext).ConfigureAwait(false);
54-
55-
ChannelPinningHelper.PinChannellIfRequired(
56-
context.ChannelSource,
57-
context.Channel,
58-
context.Binding.Session);
59-
60-
return context;
51+
await context.AcquireOrReplaceChannelAsync(operationContext, null).ConfigureAwait(false);
6152
}
6253
catch
6354
{
6455
context.Dispose();
6556
throw;
6657
}
58+
59+
ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session);
60+
return context;
6761
}
6862
#endregion
6963

@@ -96,50 +90,58 @@ public void Dispose()
9690
}
9791
}
9892

99-
public void ReplaceChannel(IChannelHandle channel)
93+
public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
94+
{
95+
var attempt = 1;
96+
while (true)
97+
{
98+
operationContext.ThrowIfTimedOutOrCanceled();
99+
ReplaceChannelSource(Binding.GetReadChannelSource(operationContext, deprioritizedServers));
100+
try
101+
{
102+
ReplaceChannel(ChannelSource.GetChannel(operationContext));
103+
return;
104+
}
105+
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, ex, attempt))
106+
{
107+
attempt++;
108+
}
109+
}
110+
}
111+
112+
public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
113+
{
114+
var attempt = 1;
115+
while (true)
116+
{
117+
operationContext.ThrowIfTimedOutOrCanceled();
118+
ReplaceChannelSource(await Binding.GetReadChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false));
119+
try
120+
{
121+
ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false));
122+
return;
123+
}
124+
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, ex, attempt))
125+
{
126+
attempt++;
127+
}
128+
}
129+
}
130+
131+
private void ReplaceChannel(IChannelHandle channel)
100132
{
101133
Ensure.IsNotNull(channel, nameof(channel));
102134
_channel?.Dispose();
103135
_channel = channel;
104136
}
105137

106-
public void ReplaceChannelSource(IChannelSourceHandle channelSource)
138+
private void ReplaceChannelSource(IChannelSourceHandle channelSource)
107139
{
108140
Ensure.IsNotNull(channelSource, nameof(channelSource));
109141
_channelSource?.Dispose();
110142
_channel?.Dispose();
111143
_channelSource = channelSource;
112144
_channel = null;
113145
}
114-
115-
private void Initialize(OperationContext operationContext)
116-
{
117-
_channelSource = _binding.GetReadChannelSource(operationContext);
118-
119-
try
120-
{
121-
_channel = _channelSource.GetChannel(operationContext);
122-
}
123-
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(this, ex))
124-
{
125-
ReplaceChannelSource(_binding.GetReadChannelSource(operationContext));
126-
ReplaceChannel(_channelSource.GetChannel(operationContext));
127-
}
128-
}
129-
130-
private async Task InitializeAsync(OperationContext operationContext)
131-
{
132-
_channelSource = await _binding.GetReadChannelSourceAsync(operationContext).ConfigureAwait(false);
133-
134-
try
135-
{
136-
_channel = await _channelSource.GetChannelAsync(operationContext).ConfigureAwait(false);
137-
}
138-
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(this, ex))
139-
{
140-
ReplaceChannelSource(await _binding.GetReadChannelSourceAsync(operationContext).ConfigureAwait(false));
141-
ReplaceChannel(await _channelSource.GetChannelAsync(operationContext).ConfigureAwait(false));
142-
}
143-
}
144146
}
145147
}

0 commit comments

Comments
 (0)