Skip to content

Commit 5c569d1

Browse files
committed
CSHARP-2221: Implement transactions.
1 parent a01b836 commit 5c569d1

File tree

204 files changed

+30251
-353
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

204 files changed

+30251
-353
lines changed

evergreen/evergreen.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,10 @@ axes:
349349
- id: version
350350
display_name: MongoDB Version
351351
values:
352-
# - id: "latest"
353-
# display_name: "latest"
354-
# variables:
355-
# VERSION: "latest"
352+
- id: "latest"
353+
display_name: "latest"
354+
variables:
355+
VERSION: "latest"
356356
- id: "3.6"
357357
display_name: "3.6"
358358
variables:

src/MongoDB.Bson/ObjectModel/RawBsonArray.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections;
1818
using System.Collections.Generic;
19+
using System.IO;
1920
using System.Linq;
2021
using MongoDB.Bson.IO;
2122
using MongoDB.Bson.Serialization;
@@ -521,6 +522,28 @@ public override void Insert(int index, BsonValue value)
521522
throw new NotSupportedException("RawBsonArray instances are immutable.");
522523
}
523524

525+
/// <summary>
526+
/// Materializes the RawBsonArray into a regular BsonArray.
527+
/// </summary>
528+
/// <param name="binaryReaderSettings">The binary reader settings.</param>
529+
/// <returns>A BsonArray.</returns>
530+
public BsonArray Materialize(BsonBinaryReaderSettings binaryReaderSettings)
531+
{
532+
ThrowIfDisposed();
533+
534+
// because BsonBinaryReader can only read documents at the top level we have to wrap the RawBsonArray in a document
535+
var document = new BsonDocument("array", this);
536+
var bytes = document.ToBson();
537+
538+
using (var stream = new MemoryStream(bytes))
539+
using (var reader = new BsonBinaryReader(stream, binaryReaderSettings))
540+
{
541+
var context = BsonDeserializationContext.CreateRoot(reader);
542+
var materializedDocument = BsonDocumentSerializer.Instance.Deserialize(context);
543+
return materializedDocument["array"].AsBsonArray;
544+
}
545+
}
546+
524547
/// <summary>
525548
/// Removes the first occurrence of a value from the array.
526549
/// </summary>

src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
* limitations under the License.
1414
*/
1515

16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
1619
using MongoDB.Bson;
1720
using MongoDB.Driver.Core.Clusters;
1821
using MongoDB.Driver.Core.Misc;
@@ -27,7 +30,9 @@ namespace MongoDB.Driver.Core.Bindings
2730
public sealed class CoreSession : ICoreSession
2831
{
2932
// private fields
33+
private readonly ICluster _cluster;
3034
private readonly IClusterClock _clusterClock = new ClusterClock();
35+
private CoreTransaction _currentTransaction;
3136
private bool _disposed;
3237
private readonly IOperationClock _operationClock = new OperationClock();
3338
private readonly CoreSessionOptions _options;
@@ -37,20 +42,29 @@ public sealed class CoreSession : ICoreSession
3742
/// <summary>
3843
/// Initializes a new instance of the <see cref="CoreSession" /> class.
3944
/// </summary>
45+
/// <param name="cluster">The cluster.</param>
4046
/// <param name="serverSession">The server session.</param>
4147
/// <param name="options">The options.</param>
4248
public CoreSession(
49+
ICluster cluster,
4350
ICoreServerSession serverSession,
4451
CoreSessionOptions options)
4552
{
53+
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
4654
_serverSession = Ensure.IsNotNull(serverSession, nameof(serverSession));
4755
_options = Ensure.IsNotNull(options, nameof(options));
4856
}
4957

5058
// public properties
59+
/// <inheritdoc />
60+
public ICluster Cluster => _cluster;
61+
5162
/// <inheritdoc />
5263
public BsonDocument ClusterTime => _clusterClock.ClusterTime;
5364

65+
/// <inheritdoc />
66+
public CoreTransaction CurrentTransaction => _currentTransaction;
67+
5468
/// <inheritdoc />
5569
public BsonDocument Id => _serverSession.Id;
5670

@@ -60,6 +74,9 @@ public CoreSession(
6074
/// <inheritdoc />
6175
public bool IsImplicit => _options.IsImplicit;
6276

77+
/// <inheritdoc />
78+
public bool IsInTransaction => _currentTransaction != null;
79+
6380
/// <inheritdoc />
6481
public BsonTimestamp OperationTime => _operationClock.OperationTime;
6582

@@ -70,6 +87,100 @@ public CoreSession(
7087
public ICoreServerSession ServerSession => _serverSession;
7188

7289
// public methods
90+
/// <inheritdoc />
91+
public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
92+
{
93+
EnsureIsInTransaction(nameof(AbortTransaction));
94+
95+
try
96+
{
97+
if (_currentTransaction.StatementId == 0)
98+
{
99+
return;
100+
}
101+
102+
try
103+
{
104+
var firstAttempt = CreateAbortTransactionOperation();
105+
ExecuteEndTransactionOnPrimary(firstAttempt, cancellationToken);
106+
return;
107+
}
108+
catch (Exception exception) when (ShouldIgnoreAbortTransactionException(exception))
109+
{
110+
return; // ignore exception and return
111+
}
112+
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
113+
{
114+
// ignore exception and retry
115+
}
116+
catch
117+
{
118+
return; // ignore exception and return
119+
}
120+
121+
try
122+
{
123+
var secondAttempt = CreateAbortTransactionOperation();
124+
ExecuteEndTransactionOnPrimary(secondAttempt, cancellationToken);
125+
}
126+
catch
127+
{
128+
return; // ignore exception and return
129+
}
130+
}
131+
finally
132+
{
133+
_currentTransaction = null;
134+
}
135+
}
136+
137+
/// <inheritdoc />
138+
public async Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
139+
{
140+
EnsureIsInTransaction(nameof(AbortTransaction));
141+
142+
try
143+
{
144+
if (_currentTransaction.StatementId == 0)
145+
{
146+
return;
147+
}
148+
149+
try
150+
{
151+
var firstAttempt = CreateAbortTransactionOperation();
152+
await ExecuteEndTransactionOnPrimaryAsync(firstAttempt, cancellationToken).ConfigureAwait(false);
153+
return;
154+
}
155+
catch (Exception exception) when (ShouldIgnoreAbortTransactionException(exception))
156+
{
157+
return; // ignore exception and return
158+
}
159+
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
160+
{
161+
// ignore exception and retry
162+
}
163+
catch
164+
{
165+
return; // ignore exception and return
166+
}
167+
168+
try
169+
{
170+
var secondAttempt = CreateAbortTransactionOperation();
171+
await ExecuteEndTransactionOnPrimaryAsync(secondAttempt, cancellationToken).ConfigureAwait(false);
172+
}
173+
catch
174+
{
175+
return; // ignore exception and return
176+
}
177+
}
178+
finally
179+
{
180+
_currentTransaction = null;
181+
}
182+
}
183+
73184
/// <inheritdoc />
74185
public void AdvanceClusterTime(BsonDocument newClusterTime)
75186
{
@@ -88,20 +199,175 @@ public long AdvanceTransactionNumber()
88199
return _serverSession.AdvanceTransactionNumber();
89200
}
90201

202+
/// <inheritdoc />
203+
public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
204+
{
205+
EnsureIsInTransaction(nameof(CommitTransaction));
206+
207+
try
208+
{
209+
if (_currentTransaction.StatementId == 0)
210+
{
211+
return;
212+
}
213+
214+
try
215+
{
216+
var firstAttempt = CreateCommitTransactionOperation();
217+
ExecuteEndTransactionOnPrimary(firstAttempt, cancellationToken);
218+
return;
219+
}
220+
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
221+
{
222+
// ignore exception and retry
223+
}
224+
225+
var secondAttempt = CreateCommitTransactionOperation();
226+
ExecuteEndTransactionOnPrimary(secondAttempt, cancellationToken);
227+
}
228+
finally
229+
{
230+
_currentTransaction = null;
231+
}
232+
}
233+
234+
/// <inheritdoc />
235+
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
236+
{
237+
EnsureIsInTransaction(nameof(CommitTransaction));
238+
239+
try
240+
{
241+
if (_currentTransaction.StatementId == 0)
242+
{
243+
return;
244+
}
245+
246+
try
247+
{
248+
var firstAttempt = CreateCommitTransactionOperation();
249+
await ExecuteEndTransactionOnPrimaryAsync(firstAttempt, cancellationToken).ConfigureAwait(false);
250+
return;
251+
}
252+
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
253+
{
254+
// ignore exception and retry
255+
}
256+
257+
var secondAttempt = CreateCommitTransactionOperation();
258+
await ExecuteEndTransactionOnPrimaryAsync(secondAttempt, cancellationToken).ConfigureAwait(false);
259+
}
260+
finally
261+
{
262+
_currentTransaction = null;
263+
}
264+
}
265+
91266
/// <inheritdoc />
92267
public void Dispose()
93268
{
94269
if (!_disposed)
95270
{
271+
if (_currentTransaction != null)
272+
{
273+
try
274+
{
275+
AbortTransaction(CancellationToken.None);
276+
}
277+
catch
278+
{
279+
// ignore exceptions
280+
}
281+
}
282+
96283
_serverSession.Dispose();
97284
_disposed = true;
98285
}
99286
}
100287

288+
/// <inheritdoc />
289+
public void StartTransaction(TransactionOptions transactionOptions = null)
290+
{
291+
if (_currentTransaction != null)
292+
{
293+
throw new InvalidOperationException("Transaction already in progress.");
294+
}
295+
296+
var transactionNumber = AdvanceTransactionNumber();
297+
var readConcern = transactionOptions?.ReadConcern ?? _options.DefaultTransactionOptions?.ReadConcern ?? ReadConcern.Default;
298+
var writeConcern = transactionOptions?.WriteConcern ?? _options.DefaultTransactionOptions?.WriteConcern ?? new WriteConcern();
299+
var effectiveTransactionOptions = new TransactionOptions(readConcern, writeConcern);
300+
var transaction = new CoreTransaction(transactionNumber, effectiveTransactionOptions);
301+
302+
_currentTransaction = transaction;
303+
}
304+
101305
/// <inheritdoc />
102306
public void WasUsed()
103307
{
104308
_serverSession.WasUsed();
105309
}
310+
311+
// private methods
312+
private IReadOperation<BsonDocument> CreateAbortTransactionOperation()
313+
{
314+
return new AbortTransactionOperation(GetTransactionWriteConcern());
315+
}
316+
317+
private IReadOperation<BsonDocument> CreateCommitTransactionOperation()
318+
{
319+
return new CommitTransactionOperation(GetTransactionWriteConcern());
320+
}
321+
322+
private void EnsureIsInTransaction(string methodName)
323+
{
324+
this.AutoStartTransactionIfApplicable();
325+
if (_currentTransaction == null)
326+
{
327+
throw new InvalidOperationException("No transaction started.");
328+
}
329+
}
330+
331+
private TResult ExecuteEndTransactionOnPrimary<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
332+
{
333+
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
334+
using (var binding = new WritableServerBinding(_cluster, sessionHandle))
335+
{
336+
return operation.Execute(binding, cancellationToken);
337+
}
338+
}
339+
340+
private async Task<TResult> ExecuteEndTransactionOnPrimaryAsync<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
341+
{
342+
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
343+
using (var binding = new WritableServerBinding(_cluster, sessionHandle))
344+
{
345+
return await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
346+
}
347+
}
348+
349+
private WriteConcern GetTransactionWriteConcern()
350+
{
351+
return
352+
_currentTransaction.TransactionOptions?.WriteConcern ??
353+
_options.DefaultTransactionOptions?.WriteConcern ??
354+
WriteConcern.WMajority;
355+
}
356+
357+
private bool ShouldIgnoreAbortTransactionException(Exception exception)
358+
{
359+
var commandException = exception as MongoCommandException;
360+
if (commandException != null)
361+
{
362+
return true;
363+
}
364+
365+
return false;
366+
}
367+
368+
private bool ShouldRetryEndTransactionException(Exception exception)
369+
{
370+
return RetryabilityHelper.IsRetryableWriteException(exception);
371+
}
106372
}
107373
}

0 commit comments

Comments
 (0)