diff --git a/src/MongoDB.Driver/AbortTransactionOptions.cs b/src/MongoDB.Driver/AbortTransactionOptions.cs
new file mode 100644
index 00000000000..649e7be1413
--- /dev/null
+++ b/src/MongoDB.Driver/AbortTransactionOptions.cs
@@ -0,0 +1,31 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using MongoDB.Driver.Core.Misc;
+
+namespace MongoDB.Driver
+{
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal sealed class AbortTransactionOptions
+ {
+ public AbortTransactionOptions(TimeSpan? timeout)
+ {
+ Timeout = Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
+ }
+
+ public TimeSpan? Timeout { get; }
+ }
+}
diff --git a/src/MongoDB.Driver/ClientSessionHandle.cs b/src/MongoDB.Driver/ClientSessionHandle.cs
index 2d606fdeb1b..144e6a94991 100644
--- a/src/MongoDB.Driver/ClientSessionHandle.cs
+++ b/src/MongoDB.Driver/ClientSessionHandle.cs
@@ -1,4 +1,4 @@
-/* Copyright 2017-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ namespace MongoDB.Driver
/// A client session handle.
///
///
- internal sealed class ClientSessionHandle : IClientSessionHandle
+ internal sealed class ClientSessionHandle : IClientSessionHandle, IClientSessionInternal
{
// private fields
private readonly IMongoClient _client;
@@ -94,16 +94,20 @@ public IServerSession ServerSession
// public methods
///
- public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
- {
- _coreSession.AbortTransaction(cancellationToken);
- }
+ public void AbortTransaction(CancellationToken cancellationToken = default)
+ => _coreSession.AbortTransaction(cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void IClientSessionInternal.AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken)
+ => _coreSession.AbortTransaction(options, cancellationToken);
///
- public Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
- {
- return _coreSession.AbortTransactionAsync(cancellationToken);
- }
+ public Task AbortTransactionAsync(CancellationToken cancellationToken = default)
+ => _coreSession.AbortTransactionAsync(cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task IClientSessionInternal.AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken)
+ => _coreSession.AbortTransactionAsync(options, cancellationToken);
///
public void AdvanceClusterTime(BsonDocument newClusterTime)
@@ -118,16 +122,20 @@ public void AdvanceOperationTime(BsonTimestamp newOperationTime)
}
///
- public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
- {
- _coreSession.CommitTransaction(cancellationToken);
- }
+ public void CommitTransaction(CancellationToken cancellationToken = default)
+ => _coreSession.CommitTransaction(cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void IClientSessionInternal.CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken)
+ => _coreSession.CommitTransaction(options, cancellationToken);
///
- public Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
- {
- return _coreSession.CommitTransactionAsync(cancellationToken);
- }
+ public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
+ => _coreSession.CommitTransactionAsync(cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task IClientSessionInternal.CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken)
+ => _coreSession.CommitTransactionAsync(options, cancellationToken);
///
public void Dispose()
diff --git a/src/MongoDB.Driver/CommitTransactionOptions.cs b/src/MongoDB.Driver/CommitTransactionOptions.cs
new file mode 100644
index 00000000000..008e902815e
--- /dev/null
+++ b/src/MongoDB.Driver/CommitTransactionOptions.cs
@@ -0,0 +1,32 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using MongoDB.Driver.Core.Misc;
+
+namespace MongoDB.Driver
+{
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal sealed class CommitTransactionOptions
+ {
+ public CommitTransactionOptions(TimeSpan? timeout)
+ {
+ Timeout = Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
+ }
+
+ public TimeSpan? Timeout { get; }
+ }
+}
+
diff --git a/src/MongoDB.Driver/Core/Bindings/CoreSession.cs b/src/MongoDB.Driver/Core/Bindings/CoreSession.cs
index d91eee1610e..0cc6ff66483 100644
--- a/src/MongoDB.Driver/Core/Bindings/CoreSession.cs
+++ b/src/MongoDB.Driver/Core/Bindings/CoreSession.cs
@@ -1,4 +1,4 @@
-/* Copyright 2018-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ namespace MongoDB.Driver.Core.Bindings
/// Represents a session.
///
///
- public sealed class CoreSession : ICoreSession
+ public sealed class CoreSession : ICoreSession, ICoreSessionInternal
{
// private fields
#pragma warning disable CA2213 // Disposable fields should be disposed
@@ -141,12 +141,15 @@ public bool IsInTransaction
// public methods
///
- public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public void AbortTransaction(CancellationToken cancellationToken = default)
+ => ((ICoreSessionInternal)this).AbortTransaction(null, cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken)
{
EnsureAbortTransactionCanBeCalled(nameof(AbortTransaction));
- // TODO: CSOT implement proper way to obtain the operationContext
- var operationContext = new OperationContext(null, cancellationToken);
+ using var operationContext = new OperationContext(GetTimeout(options?.Timeout), cancellationToken);
try
{
if (_currentTransaction.IsEmpty)
@@ -192,12 +195,15 @@ public bool IsInTransaction
}
///
- public async Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public Task AbortTransactionAsync(CancellationToken cancellationToken = default)
+ => ((ICoreSessionInternal)this).AbortTransactionAsync(null, cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ async Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken)
{
EnsureAbortTransactionCanBeCalled(nameof(AbortTransaction));
- // TODO: CSOT implement proper way to obtain the operationContext
- var operationContext = new OperationContext(null, cancellationToken);
+ using var operationContext = new OperationContext(GetTimeout(options?.Timeout), cancellationToken);
try
{
if (_currentTransaction.IsEmpty)
@@ -292,12 +298,15 @@ public long AdvanceTransactionNumber()
}
///
- public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public void CommitTransaction(CancellationToken cancellationToken = default)
+ => ((ICoreSessionInternal)this).CommitTransaction(null, cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken)
{
EnsureCommitTransactionCanBeCalled(nameof(CommitTransaction));
- // TODO: CSOT implement proper way to obtain the operationContext
- var operationContext = new OperationContext(null, cancellationToken);
+ using var operationContext = new OperationContext(GetTimeout(options?.Timeout), cancellationToken);
try
{
_isCommitTransactionInProgress = true;
@@ -329,12 +338,15 @@ public long AdvanceTransactionNumber()
}
///
- public async Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
+ => ((ICoreSessionInternal)this).CommitTransactionAsync(null, cancellationToken);
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ async Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken)
{
EnsureCommitTransactionCanBeCalled(nameof(CommitTransaction));
- // TODO: CSOT implement proper way to obtain the operationContext
- var operationContext = new OperationContext(null, cancellationToken);
+ using var operationContext = new OperationContext(GetTimeout(options?.Timeout), cancellationToken);
try
{
_isCommitTransactionInProgress = true;
@@ -563,6 +575,9 @@ private async Task ExecuteEndTransactionOnPrimaryAsync(Operati
}
}
+ private TimeSpan? GetTimeout(TimeSpan? timeout)
+ => timeout ?? _options.DefaultTransactionOptions?.Timeout;
+
private TransactionOptions GetEffectiveTransactionOptions(TransactionOptions transactionOptions)
{
var readConcern = transactionOptions?.ReadConcern ?? _options.DefaultTransactionOptions?.ReadConcern ?? ReadConcern.Default;
diff --git a/src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs b/src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs
index 53747c8530c..6ed2a4f849e 100644
--- a/src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs
+++ b/src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs
@@ -1,4 +1,4 @@
-/* Copyright 2018-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -56,6 +56,8 @@ public CoreTransaction(long transactionNumber, TransactionOptions transactionOpt
///
public bool IsEmpty => _isEmpty;
+ internal OperationContext OperationContext { get; set; }
+
///
/// Gets the transaction state.
///
diff --git a/src/MongoDB.Driver/Core/Bindings/ICoreSessionExtensions.cs b/src/MongoDB.Driver/Core/Bindings/ICoreSessionExtensions.cs
new file mode 100644
index 00000000000..5d176c6181b
--- /dev/null
+++ b/src/MongoDB.Driver/Core/Bindings/ICoreSessionExtensions.cs
@@ -0,0 +1,67 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MongoDB.Driver.Core.Bindings
+{
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal static class ICoreSessionExtensions
+ {
+ // TODO: Merge these extension methods in ICoreSession interface on major release
+ public static void AbortTransaction(this ICoreSession session, AbortTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ session.AbortTransaction(cancellationToken);
+ return;
+ }
+
+ ((ICoreSessionInternal)session).AbortTransaction(options, cancellationToken);
+ }
+
+ public static Task AbortTransactionAsync(this ICoreSession session, AbortTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ return session.AbortTransactionAsync(cancellationToken);
+ }
+
+ return ((ICoreSessionInternal)session).AbortTransactionAsync(options, cancellationToken);
+ }
+
+ public static void CommitTransaction(this ICoreSession session, CommitTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ session.CommitTransaction(cancellationToken);
+ return;
+ }
+
+ ((ICoreSessionInternal)session).CommitTransaction(options, cancellationToken);
+ }
+
+ public static Task CommitTransactionAsync(this ICoreSession session, CommitTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ return session.CommitTransactionAsync(cancellationToken);
+ }
+
+ return ((ICoreSessionInternal)session).CommitTransactionAsync(options, cancellationToken);
+ }
+ }
+}
diff --git a/src/MongoDB.Driver/Core/Bindings/ICoreSessionInternal.cs b/src/MongoDB.Driver/Core/Bindings/ICoreSessionInternal.cs
new file mode 100644
index 00000000000..1844ae6fa8c
--- /dev/null
+++ b/src/MongoDB.Driver/Core/Bindings/ICoreSessionInternal.cs
@@ -0,0 +1,28 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MongoDB.Driver.Core.Bindings;
+
+// TODO: Merge this interface into ICoreSession on major release
+internal interface ICoreSessionInternal
+{
+ void AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken = default);
+ Task AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken = default);
+ void CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken = default);
+ Task CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken = default);
+}
diff --git a/src/MongoDB.Driver/Core/Bindings/NoCoreSession.cs b/src/MongoDB.Driver/Core/Bindings/NoCoreSession.cs
index 7be93d81b43..ac7e68abf06 100644
--- a/src/MongoDB.Driver/Core/Bindings/NoCoreSession.cs
+++ b/src/MongoDB.Driver/Core/Bindings/NoCoreSession.cs
@@ -24,7 +24,7 @@ namespace MongoDB.Driver.Core.Bindings
/// An object that represents no core session.
///
///
- public sealed class NoCoreSession : ICoreSession
+ public sealed class NoCoreSession : ICoreSession, ICoreSessionInternal
{
#region static
// private static fields
@@ -89,13 +89,25 @@ public static ICoreSessionHandle NewHandle()
// public methods
///
- public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public void AbortTransaction(CancellationToken cancellationToken = default)
+ {
+ throw new NotSupportedException("NoCoreSession does not support AbortTransaction.");
+ }
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken )
{
throw new NotSupportedException("NoCoreSession does not support AbortTransaction.");
}
///
- public Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public Task AbortTransactionAsync(CancellationToken cancellationToken = default)
+ {
+ throw new NotSupportedException("NoCoreSession does not support AbortTransactionAsync.");
+ }
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken )
{
throw new NotSupportedException("NoCoreSession does not support AbortTransactionAsync.");
}
@@ -122,13 +134,25 @@ public long AdvanceTransactionNumber()
}
///
- public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public void CommitTransaction(CancellationToken cancellationToken = default)
+ {
+ throw new NotSupportedException("NoCoreSession does not support CommitTransaction.");
+ }
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken)
{
throw new NotSupportedException("NoCoreSession does not support CommitTransaction.");
}
///
- public Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
+ {
+ throw new NotSupportedException("NoCoreSession does not support CommitTransactionAsync.");
+ }
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken)
{
throw new NotSupportedException("NoCoreSession does not support CommitTransactionAsync.");
}
diff --git a/src/MongoDB.Driver/Core/Bindings/WrappingCoreSession.cs b/src/MongoDB.Driver/Core/Bindings/WrappingCoreSession.cs
index 991e46ab115..1d61b552d9d 100644
--- a/src/MongoDB.Driver/Core/Bindings/WrappingCoreSession.cs
+++ b/src/MongoDB.Driver/Core/Bindings/WrappingCoreSession.cs
@@ -25,7 +25,7 @@ namespace MongoDB.Driver.Core.Bindings
/// An abstract base class for a core session that wraps another core session.
///
///
- public abstract class WrappingCoreSession : ICoreSession
+ public abstract class WrappingCoreSession : ICoreSession, ICoreSessionInternal
{
// private fields
private bool _disposed;
@@ -182,19 +182,33 @@ public ICoreSession Wrapped
// public methods
///
- public virtual void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public virtual void AbortTransaction(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
_wrapped.AbortTransaction(cancellationToken);
}
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+ _wrapped.AbortTransaction(options, cancellationToken);
+ }
+
///
- public virtual Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public virtual Task AbortTransactionAsync(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
return _wrapped.AbortTransactionAsync(cancellationToken);
}
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+ return _wrapped.AbortTransactionAsync(options, cancellationToken);
+ }
+
///
public virtual void AboutToSendCommand()
{
@@ -223,19 +237,34 @@ public long AdvanceTransactionNumber()
}
///
- public virtual void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
+ public virtual void CommitTransaction(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
_wrapped.CommitTransaction(cancellationToken);
}
+
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+ _wrapped.CommitTransaction(options, cancellationToken);
+ }
+
///
- public virtual Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public virtual Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
return _wrapped.CommitTransactionAsync(cancellationToken);
}
+ // TODO: CSOT: Make it public when CSOT will be ready for GA and add default value to cancellationToken parameter.
+ Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+ return _wrapped.CommitTransactionAsync(options, cancellationToken);
+ }
+
///
public void Dispose()
{
diff --git a/src/MongoDB.Driver/Core/Misc/IClock.cs b/src/MongoDB.Driver/Core/Misc/IClock.cs
index d409bb604ee..804f6912d68 100644
--- a/src/MongoDB.Driver/Core/Misc/IClock.cs
+++ b/src/MongoDB.Driver/Core/Misc/IClock.cs
@@ -1,4 +1,4 @@
-/* Copyright 2013-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,5 +20,7 @@ namespace MongoDB.Driver.Core.Misc
internal interface IClock
{
DateTime UtcNow { get; }
+
+ IStopwatch StartStopwatch();
}
}
diff --git a/src/MongoDB.Driver/Core/Misc/IStopwatch.cs b/src/MongoDB.Driver/Core/Misc/IStopwatch.cs
new file mode 100644
index 00000000000..87a8fbfd0fc
--- /dev/null
+++ b/src/MongoDB.Driver/Core/Misc/IStopwatch.cs
@@ -0,0 +1,24 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace MongoDB.Driver.Core.Misc
+{
+ internal interface IStopwatch
+ {
+ TimeSpan Elapsed { get; }
+ }
+}
diff --git a/src/MongoDB.Driver/Core/Misc/SystemClock.cs b/src/MongoDB.Driver/Core/Misc/SystemClock.cs
index f972f9aed62..f418b635e07 100644
--- a/src/MongoDB.Driver/Core/Misc/SystemClock.cs
+++ b/src/MongoDB.Driver/Core/Misc/SystemClock.cs
@@ -1,4 +1,4 @@
-/* Copyright 2013-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,5 +32,7 @@ public DateTime UtcNow
{
get { return DateTime.UtcNow; }
}
+
+ public IStopwatch StartStopwatch() => new SystemStopwatch();
}
}
diff --git a/src/MongoDB.Driver/Core/Misc/SystemStopwatch.cs b/src/MongoDB.Driver/Core/Misc/SystemStopwatch.cs
new file mode 100644
index 00000000000..4d9a19b5b47
--- /dev/null
+++ b/src/MongoDB.Driver/Core/Misc/SystemStopwatch.cs
@@ -0,0 +1,32 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+
+namespace MongoDB.Driver.Core.Misc
+{
+ internal sealed class SystemStopwatch : IStopwatch
+ {
+ private readonly Stopwatch _wrappedStopwatch;
+
+ public SystemStopwatch()
+ {
+ _wrappedStopwatch = Stopwatch.StartNew();
+ }
+
+ public TimeSpan Elapsed => _wrappedStopwatch.Elapsed;
+ }
+}
diff --git a/src/MongoDB.Driver/Core/Operations/EndTransactionOperation.cs b/src/MongoDB.Driver/Core/Operations/EndTransactionOperation.cs
index 0544b2498c3..007721c3ceb 100644
--- a/src/MongoDB.Driver/Core/Operations/EndTransactionOperation.cs
+++ b/src/MongoDB.Driver/Core/Operations/EndTransactionOperation.cs
@@ -58,7 +58,7 @@ public virtual BsonDocument Execute(OperationContext operationContext, IReadBind
using (var channel = channelSource.GetChannel(operationContext))
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, binding.Session.Fork()))
{
- var operation = CreateOperation();
+ var operation = CreateOperation(operationContext);
return operation.Execute(operationContext, channelBinding);
}
}
@@ -71,12 +71,12 @@ public virtual async Task ExecuteAsync(OperationContext operationC
using (var channel = await channelSource.GetChannelAsync(operationContext).ConfigureAwait(false))
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, binding.Session.Fork()))
{
- var operation = CreateOperation();
+ var operation = CreateOperation(operationContext);
return await operation.ExecuteAsync(operationContext, channelBinding).ConfigureAwait(false);
}
}
- protected virtual BsonDocument CreateCommand()
+ protected virtual BsonDocument CreateCommand(OperationContext operationContext)
{
return new BsonDocument
{
@@ -86,9 +86,9 @@ protected virtual BsonDocument CreateCommand()
};
}
- private IReadOperation CreateOperation()
+ private IReadOperation CreateOperation(OperationContext operationContext)
{
- var command = CreateCommand();
+ var command = CreateCommand(operationContext);
return new ReadCommandOperation(DatabaseNamespace.Admin, command, BsonDocumentSerializer.Instance, _messageEncoderSettings)
{
RetryRequested = false
@@ -159,10 +159,10 @@ public override async Task ExecuteAsync(OperationContext operation
}
}
- protected override BsonDocument CreateCommand()
+ protected override BsonDocument CreateCommand(OperationContext operationContext)
{
- var command = base.CreateCommand();
- if (_maxCommitTime.HasValue)
+ var command = base.CreateCommand(operationContext);
+ if (_maxCommitTime.HasValue && !operationContext.IsRootContextTimeoutConfigured())
{
command.Add("maxTimeMS", (long)_maxCommitTime.Value.TotalMilliseconds);
}
diff --git a/src/MongoDB.Driver/Core/TransactionOptions.cs b/src/MongoDB.Driver/Core/TransactionOptions.cs
index 9003b0829d4..c4305a95ad3 100644
--- a/src/MongoDB.Driver/Core/TransactionOptions.cs
+++ b/src/MongoDB.Driver/Core/TransactionOptions.cs
@@ -1,4 +1,4 @@
-/* Copyright 2018-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
*/
using System;
+using MongoDB.Driver.Core.Misc;
namespace MongoDB.Driver
{
@@ -26,6 +27,7 @@ public class TransactionOptions
private readonly TimeSpan? _maxCommitTime;
private readonly ReadConcern _readConcern;
private readonly ReadPreference _readPreference;
+ private readonly TimeSpan? _timeout;
private readonly WriteConcern _writeConcern;
// public constructors
@@ -41,7 +43,27 @@ public TransactionOptions(
Optional readPreference = default(Optional),
Optional writeConcern = default(Optional),
Optional maxCommitTime = default(Optional))
+ : this(null, readConcern, readPreference, writeConcern, maxCommitTime)
{
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The per operation timeout
+ /// The read concern.
+ /// The read preference.
+ /// The write concern.
+ /// The max commit time.
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal TransactionOptions(
+ TimeSpan? timeout,
+ Optional readConcern = default(Optional),
+ Optional readPreference = default(Optional),
+ Optional writeConcern = default(Optional),
+ Optional maxCommitTime = default(Optional))
+ {
+ _timeout = Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
_readConcern = readConcern.WithDefault(null);
_readPreference = readPreference.WithDefault(null);
_writeConcern = writeConcern.WithDefault(null);
@@ -73,6 +95,12 @@ public TransactionOptions(
///
public ReadPreference ReadPreference => _readPreference;
+ ///
+ /// Gets the per operation timeout.
+ ///
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal TimeSpan? Timeout => _timeout;
+
///
/// Gets the write concern.
///
diff --git a/src/MongoDB.Driver/IClientSessionExtensions.cs b/src/MongoDB.Driver/IClientSessionExtensions.cs
index b1ce144636c..1c16957a8db 100644
--- a/src/MongoDB.Driver/IClientSessionExtensions.cs
+++ b/src/MongoDB.Driver/IClientSessionExtensions.cs
@@ -13,22 +13,69 @@
* limitations under the License.
*/
-namespace MongoDB.Driver;
+using System.Threading;
+using System.Threading.Tasks;
-internal static class IClientSessionExtensions
+namespace MongoDB.Driver
{
- public static ReadPreference GetEffectiveReadPreference(this IClientSession session, ReadPreference defaultReadPreference)
+ // TODO: CSOT: Make it public when CSOT will be ready for GA
+ internal static class IClientSessionExtensions
{
- if (session.IsInTransaction)
+ // TODO: Merge these extension methods in IClientSession interface on major release
+ public static void AbortTransaction(this IClientSession session, AbortTransactionOptions options, CancellationToken cancellationToken = default)
{
- var transactionReadPreference = session.WrappedCoreSession.CurrentTransaction.TransactionOptions?.ReadPreference;
- if (transactionReadPreference != null)
+ if (options?.Timeout == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
{
- return transactionReadPreference;
+ session.AbortTransaction(cancellationToken);
+ return;
}
+
+ ((IClientSessionInternal)session).AbortTransaction(options, cancellationToken);
+ }
+
+ public static Task AbortTransactionAsync(this IClientSession session, AbortTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options?.Timeout == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ return session.AbortTransactionAsync(cancellationToken);
+ }
+
+ return ((IClientSessionInternal)session).AbortTransactionAsync(options, cancellationToken);
+ }
+
+ public static void CommitTransaction(this IClientSession session, CommitTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options?.Timeout == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ session.CommitTransaction(cancellationToken);
+ return;
+ }
+
+ ((IClientSessionInternal)session).CommitTransaction(options, cancellationToken);
}
- return defaultReadPreference ?? ReadPreference.Primary;
+ public static Task CommitTransactionAsync(this IClientSession session, CommitTransactionOptions options, CancellationToken cancellationToken = default)
+ {
+ if (options?.Timeout == null || session.Options.DefaultTransactionOptions?.Timeout == options.Timeout)
+ {
+ return session.CommitTransactionAsync(cancellationToken);
+ }
+
+ return ((IClientSessionInternal)session).CommitTransactionAsync(options, cancellationToken);
+ }
+
+ internal static ReadPreference GetEffectiveReadPreference(this IClientSession session, ReadPreference defaultReadPreference)
+ {
+ if (session.IsInTransaction)
+ {
+ var transactionReadPreference = session.WrappedCoreSession.CurrentTransaction.TransactionOptions?.ReadPreference;
+ if (transactionReadPreference != null)
+ {
+ return transactionReadPreference;
+ }
+ }
+
+ return defaultReadPreference ?? ReadPreference.Primary;
+ }
}
}
-
diff --git a/src/MongoDB.Driver/IClientSessionInternal.cs b/src/MongoDB.Driver/IClientSessionInternal.cs
new file mode 100644
index 00000000000..4107b7b811c
--- /dev/null
+++ b/src/MongoDB.Driver/IClientSessionInternal.cs
@@ -0,0 +1,28 @@
+/* Copyright 2010-present MongoDB Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MongoDB.Driver;
+
+// TODO: Merge this interface into ICoreSession on major release
+internal interface IClientSessionInternal
+{
+ void AbortTransaction(AbortTransactionOptions options, CancellationToken cancellationToken = default);
+ Task AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken = default);
+ void CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken = default);
+ Task CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken = default);
+}
diff --git a/src/MongoDB.Driver/IOperationExecutor.cs b/src/MongoDB.Driver/IOperationExecutor.cs
index b64c2380776..6dfd8e741c5 100644
--- a/src/MongoDB.Driver/IOperationExecutor.cs
+++ b/src/MongoDB.Driver/IOperationExecutor.cs
@@ -14,7 +14,6 @@
*/
using System;
-using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Operations;
@@ -23,34 +22,30 @@ namespace MongoDB.Driver
internal interface IOperationExecutor : IDisposable
{
TResult ExecuteReadOperation(
+ OperationContext operationContext,
IClientSessionHandle session,
IReadOperation operation,
ReadPreference readPreference,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken);
+ bool allowChannelPinning);
Task ExecuteReadOperationAsync(
+ OperationContext operationContext,
IClientSessionHandle session,
IReadOperation operation,
ReadPreference readPreference,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken);
+ bool allowChannelPinning);
TResult ExecuteWriteOperation(
+ OperationContext operationContext,
IClientSessionHandle session,
IWriteOperation operation,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken);
+ bool allowChannelPinning);
Task ExecuteWriteOperationAsync(
+ OperationContext operationContext,
IClientSessionHandle session,
IWriteOperation operation,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken);
+ bool allowChannelPinning);
IClientSessionHandle StartImplicitSession();
}
diff --git a/src/MongoDB.Driver/MongoClient.cs b/src/MongoDB.Driver/MongoClient.cs
index 4fde2d760bd..2b39518cb17 100644
--- a/src/MongoDB.Driver/MongoClient.cs
+++ b/src/MongoDB.Driver/MongoClient.cs
@@ -563,24 +563,42 @@ private ChangeStreamOperation CreateChangeStreamOperation(
_settings.RetryReads,
_settings.TranslationOptions);
+ private OperationContext CreateOperationContext(IClientSessionHandle session, TimeSpan? timeout, CancellationToken cancellationToken)
+ {
+ var operationContext = session.WrappedCoreSession.CurrentTransaction?.OperationContext;
+ if (operationContext != null && timeout != null)
+ {
+ throw new InvalidOperationException("Cannot specify per operation timeout inside transaction.");
+ }
+
+ return operationContext ?? new OperationContext(timeout ?? _settings.Timeout, cancellationToken);
+ }
+
private TResult ExecuteReadOperation(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperation(session, operation, readPreference, false, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteReadOperation(operationContext, session, operation, readPreference, false);
}
- private Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
+ private async Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperationAsync(session, operation, readPreference, false, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteReadOperationAsync(operationContext, session, operation, readPreference, false).ConfigureAwait(false);
}
private TResult ExecuteWriteOperation(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperation(session, operation, false, timeout ?? _settings.Timeout, cancellationToken);
-
- private Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperationAsync(session, operation, false, timeout ?? _settings.Timeout, cancellationToken);
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteWriteOperation(operationContext, session, operation, false);
+ }
+ private async Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteWriteOperationAsync(operationContext, session, operation, false).ConfigureAwait(false);
+ }
private MessageEncoderSettings GetMessageEncoderSettings()
{
@@ -609,7 +627,17 @@ private IClientSessionHandle StartSession(ClientSessionOptions options)
throw new NotSupportedException("Combining both causal consistency and snapshot options is not supported.");
}
- options = options ?? new ClientSessionOptions();
+ options ??= new ClientSessionOptions();
+ if (_settings.Timeout.HasValue && options.DefaultTransactionOptions?.Timeout == null)
+ {
+ options.DefaultTransactionOptions = new TransactionOptions(
+ _settings.Timeout,
+ options.DefaultTransactionOptions?.ReadConcern,
+ options.DefaultTransactionOptions?.ReadPreference,
+ options.DefaultTransactionOptions?.WriteConcern,
+ options.DefaultTransactionOptions?.MaxCommitTime);
+ }
+
var coreSession = _cluster.StartSession(options.ToCore());
return new ClientSessionHandle(this, options, coreSession);
diff --git a/src/MongoDB.Driver/MongoCollectionImpl.cs b/src/MongoDB.Driver/MongoCollectionImpl.cs
index 730f78ea165..c2800d1044f 100644
--- a/src/MongoDB.Driver/MongoCollectionImpl.cs
+++ b/src/MongoDB.Driver/MongoCollectionImpl.cs
@@ -1199,29 +1199,48 @@ private IAsyncCursor CreateMapReduceOutputToCollectionResultCursor(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
=> ExecuteReadOperation(session, operation, null, timeout, cancellationToken);
private TResult ExecuteReadOperation(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = explicitReadPreference ?? session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperation(session, operation, readPreference, true, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteReadOperation(operationContext, session, operation, readPreference, true);
}
private Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
=> ExecuteReadOperationAsync(session, operation, null, timeout, cancellationToken);
- private Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
+ private async Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = explicitReadPreference ?? session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperationAsync(session, operation, readPreference, true, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteReadOperationAsync(operationContext, session, operation, readPreference, true).ConfigureAwait(false);
}
private TResult ExecuteWriteOperation(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperation(session, operation, true, timeout ?? _settings.Timeout, cancellationToken);
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteWriteOperation(operationContext, session, operation, true);
+ }
- private Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperationAsync(session, operation, true, timeout ?? _settings.Timeout, cancellationToken);
+ private async Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteWriteOperationAsync(operationContext, session, operation, true).ConfigureAwait(false);
+ }
private MessageEncoderSettings GetMessageEncoderSettings()
{
diff --git a/src/MongoDB.Driver/MongoDatabase.cs b/src/MongoDB.Driver/MongoDatabase.cs
index 779af6deaed..87016378309 100644
--- a/src/MongoDB.Driver/MongoDatabase.cs
+++ b/src/MongoDB.Driver/MongoDatabase.cs
@@ -755,29 +755,48 @@ private ChangeStreamOperation CreateChangeStreamOperation(
translationOptions);
}
+ private OperationContext CreateOperationContext(IClientSessionHandle session, TimeSpan? timeout, CancellationToken cancellationToken)
+ {
+ var operationContext = session.WrappedCoreSession.CurrentTransaction?.OperationContext;
+ if (operationContext != null && timeout != null)
+ {
+ throw new InvalidOperationException("Cannot specify per operation timeout inside transaction.");
+ }
+
+ return operationContext ?? new OperationContext(timeout ?? _settings.Timeout, cancellationToken);
+ }
+
private TResult ExecuteReadOperation(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
=> ExecuteReadOperation(session, operation, null, timeout, cancellationToken);
private TResult ExecuteReadOperation(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = explicitReadPreference ?? session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperation(session, operation, readPreference, true, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteReadOperation(operationContext, session, operation, readPreference, true);
}
private Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
=> ExecuteReadOperationAsync(session, operation, null, timeout, cancellationToken);
- private Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
+ private async Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, ReadPreference explicitReadPreference, TimeSpan? timeout, CancellationToken cancellationToken)
{
var readPreference = explicitReadPreference ?? session.GetEffectiveReadPreference(_settings.ReadPreference);
- return _operationExecutor.ExecuteReadOperationAsync(session, operation, readPreference, true, timeout ?? _settings.Timeout, cancellationToken);
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteReadOperationAsync(operationContext, session, operation, readPreference, true).ConfigureAwait(false);
}
private TResult ExecuteWriteOperation(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperation(session, operation, true, timeout ?? _settings.Timeout, cancellationToken);
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return _operationExecutor.ExecuteWriteOperation(operationContext, session, operation, true);
+ }
- private Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
- => _operationExecutor.ExecuteWriteOperationAsync(session, operation, true, timeout ?? _settings.Timeout, cancellationToken);
+ private async Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, TimeSpan? timeout, CancellationToken cancellationToken)
+ {
+ using var operationContext = CreateOperationContext(session, timeout, cancellationToken);
+ return await _operationExecutor.ExecuteWriteOperationAsync(operationContext, session, operation, true).ConfigureAwait(false);
+ }
private IEnumerable ExtractCollectionNames(IEnumerable collections)
{
diff --git a/src/MongoDB.Driver/OperationContext.cs b/src/MongoDB.Driver/OperationContext.cs
index d2a18e6e729..5a00621a442 100644
--- a/src/MongoDB.Driver/OperationContext.cs
+++ b/src/MongoDB.Driver/OperationContext.cs
@@ -14,7 +14,6 @@
*/
using System;
-using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Misc;
@@ -30,13 +29,14 @@ internal sealed class OperationContext : IDisposable
private CancellationTokenSource _combinedCancellationTokenSource;
public OperationContext(TimeSpan? timeout, CancellationToken cancellationToken)
- : this(Stopwatch.StartNew(), timeout, cancellationToken)
+ : this(SystemClock.Instance, timeout, cancellationToken)
{
}
- internal OperationContext(Stopwatch stopwatch, TimeSpan? timeout, CancellationToken cancellationToken)
+ internal OperationContext(IClock clock, TimeSpan? timeout, CancellationToken cancellationToken)
{
- Stopwatch = stopwatch;
+ Clock = Ensure.IsNotNull(clock, nameof(clock));
+ Stopwatch = clock.StartStopwatch();
Timeout = Ensure.IsNullOrInfiniteOrGreaterThanOrEqualToZero(timeout, nameof(timeout));
CancellationToken = cancellationToken;
RootContext = this;
@@ -85,7 +85,11 @@ public CancellationToken CombinedCancellationToken
return _combinedCancellationTokenSource.Token;
}
}
- private Stopwatch Stopwatch { get; }
+ private IStopwatch Stopwatch { get; }
+
+ private IClock Clock { get; }
+
+ public TimeSpan Elapsed => Stopwatch.Elapsed;
public TimeSpan? Timeout { get; }
@@ -182,11 +186,10 @@ public OperationContext WithTimeout(TimeSpan timeout)
timeout = remainingTimeout;
}
- return new OperationContext(timeout, CancellationToken)
+ return new OperationContext(Clock, timeout, CancellationToken)
{
RootContext = RootContext
};
}
}
}
-
diff --git a/src/MongoDB.Driver/OperationExecutor.cs b/src/MongoDB.Driver/OperationExecutor.cs
index 06c8534b70c..929c28b6063 100644
--- a/src/MongoDB.Driver/OperationExecutor.cs
+++ b/src/MongoDB.Driver/OperationExecutor.cs
@@ -14,7 +14,6 @@
*/
using System;
-using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core;
using MongoDB.Driver.Core.Bindings;
@@ -39,73 +38,65 @@ public void Dispose()
}
public TResult ExecuteReadOperation(
+ OperationContext operationContext,
IClientSessionHandle session,
IReadOperation operation,
ReadPreference readPreference,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken)
+ bool allowChannelPinning)
{
+ Ensure.IsNotNull(operationContext, nameof(operationContext));
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(operation, nameof(operation));
Ensure.IsNotNull(readPreference, nameof(readPreference));
- Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
ThrowIfDisposed();
- using var operationContext = new OperationContext(timeout, cancellationToken);
using var binding = CreateReadBinding(session, readPreference, allowChannelPinning);
return operation.Execute(operationContext, binding);
}
public async Task ExecuteReadOperationAsync(
+ OperationContext operationContext,
IClientSessionHandle session,
IReadOperation operation,
ReadPreference readPreference,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken)
+ bool allowChannelPinning)
{
+ Ensure.IsNotNull(operationContext, nameof(operationContext));
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(operation, nameof(operation));
Ensure.IsNotNull(readPreference, nameof(readPreference));
- Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
ThrowIfDisposed();
- using var operationContext = new OperationContext(timeout, cancellationToken);
using var binding = CreateReadBinding(session, readPreference, allowChannelPinning);
return await operation.ExecuteAsync(operationContext, binding).ConfigureAwait(false);
}
public TResult ExecuteWriteOperation(
+ OperationContext operationContext,
IClientSessionHandle session,
IWriteOperation operation,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken)
+ bool allowChannelPinning)
{
+ Ensure.IsNotNull(operationContext, nameof(operationContext));
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(operation, nameof(operation));
- Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
ThrowIfDisposed();
- using var operationContext = new OperationContext(timeout, cancellationToken);
using var binding = CreateReadWriteBinding(session, allowChannelPinning);
return operation.Execute(operationContext, binding);
}
public async Task ExecuteWriteOperationAsync(
+ OperationContext operationContext,
IClientSessionHandle session,
IWriteOperation operation,
- bool allowChannelPinning,
- TimeSpan? timeout,
- CancellationToken cancellationToken)
+ bool allowChannelPinning)
{
+ Ensure.IsNotNull(operationContext, nameof(operationContext));
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(operation, nameof(operation));
- Ensure.IsNullOrValidTimeout(timeout, nameof(timeout));
ThrowIfDisposed();
- using var operationContext = new OperationContext(timeout, cancellationToken);
using var binding = CreateReadWriteBinding(session, allowChannelPinning);
return await operation.ExecuteAsync(operationContext, binding).ConfigureAwait(false);
}
diff --git a/src/MongoDB.Driver/TransactionExecutor.cs b/src/MongoDB.Driver/TransactionExecutor.cs
index cb4e5daf9a8..32f94861a2e 100644
--- a/src/MongoDB.Driver/TransactionExecutor.cs
+++ b/src/MongoDB.Driver/TransactionExecutor.cs
@@ -1,4 +1,4 @@
-/* Copyright 2019-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
using System.Threading.Tasks;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Misc;
-using MongoDB.Driver.Support;
namespace MongoDB.Driver
{
@@ -27,7 +26,6 @@ internal static class TransactionExecutor
// constants
private const string TransientTransactionErrorLabel = "TransientTransactionError";
private const string UnknownTransactionCommitResultLabel = "UnknownTransactionCommitResult";
- private const int MaxTimeMSExpiredErrorCode = 50;
private static readonly TimeSpan __transactionTimeout = TimeSpan.FromSeconds(120);
public static TResult ExecuteWithRetries(
@@ -37,13 +35,15 @@ public static TResult ExecuteWithRetries(
IClock clock,
CancellationToken cancellationToken)
{
- var startTime = clock.UtcNow;
+ var transactionTimeout = transactionOptions?.Timeout ?? clientSession.Options.DefaultTransactionOptions?.Timeout;
+ using var operationContext = new OperationContext(clock, transactionTimeout, cancellationToken);
while (true)
{
clientSession.StartTransaction(transactionOptions);
+ clientSession.WrappedCoreSession.CurrentTransaction.OperationContext = operationContext;
- var callbackOutcome = ExecuteCallback(clientSession, callback, startTime, clock, cancellationToken);
+ var callbackOutcome = ExecuteCallback(operationContext, clientSession, callback, cancellationToken);
if (callbackOutcome.ShouldRetryTransaction)
{
continue;
@@ -53,7 +53,7 @@ public static TResult ExecuteWithRetries(
return callbackOutcome.Result; // assume callback intentionally ended the transaction
}
- var transactionHasBeenCommitted = CommitWithRetries(clientSession, startTime, clock, cancellationToken);
+ var transactionHasBeenCommitted = CommitWithRetries(operationContext, clientSession, cancellationToken);
if (transactionHasBeenCommitted)
{
return callbackOutcome.Result;
@@ -68,12 +68,15 @@ public static async Task ExecuteWithRetriesAsync(
IClock clock,
CancellationToken cancellationToken)
{
- var startTime = clock.UtcNow;
+ TimeSpan? transactionTimeout = transactionOptions?.Timeout ?? clientSession.Options.DefaultTransactionOptions?.Timeout;
+ using var operationContext = new OperationContext(clock, transactionTimeout, cancellationToken);
+
while (true)
{
clientSession.StartTransaction(transactionOptions);
+ clientSession.WrappedCoreSession.CurrentTransaction.OperationContext = operationContext;
- var callbackOutcome = await ExecuteCallbackAsync(clientSession, callbackAsync, startTime, clock, cancellationToken).ConfigureAwait(false);
+ var callbackOutcome = await ExecuteCallbackAsync(operationContext, clientSession, callbackAsync, cancellationToken).ConfigureAwait(false);
if (callbackOutcome.ShouldRetryTransaction)
{
continue;
@@ -83,7 +86,7 @@ public static async Task ExecuteWithRetriesAsync(
return callbackOutcome.Result; // assume callback intentionally ended the transaction
}
- var transactionHasBeenCommitted = await CommitWithRetriesAsync(clientSession, startTime, clock, cancellationToken).ConfigureAwait(false);
+ var transactionHasBeenCommitted = await CommitWithRetriesAsync(operationContext, clientSession, cancellationToken).ConfigureAwait(false);
if (transactionHasBeenCommitted)
{
return callbackOutcome.Result;
@@ -91,12 +94,13 @@ public static async Task ExecuteWithRetriesAsync(
}
}
- private static bool HasTimedOut(DateTime startTime, DateTime currentTime)
+ private static bool HasTimedOut(OperationContext operationContext)
{
- return (currentTime - startTime) >= __transactionTimeout;
+ return operationContext.IsTimedOut() ||
+ (operationContext.RootContext.Timeout == null && operationContext.RootContext.Elapsed > __transactionTimeout);
}
- private static CallbackOutcome ExecuteCallback(IClientSessionHandle clientSession, Func callback, DateTime startTime, IClock clock, CancellationToken cancellationToken)
+ private static CallbackOutcome ExecuteCallback(OperationContext operationContext, IClientSessionHandle clientSession, Func callback, CancellationToken cancellationToken)
{
try
{
@@ -107,10 +111,16 @@ private static CallbackOutcome ExecuteCallback(IClientSessionH
{
if (IsTransactionInStartingOrInProgressState(clientSession))
{
- clientSession.AbortTransaction(cancellationToken);
+ AbortTransactionOptions abortOptions = null;
+ if (operationContext.IsRootContextTimeoutConfigured())
+ {
+ abortOptions = new AbortTransactionOptions(operationContext.RootContext.Timeout);
+ }
+
+ clientSession.AbortTransaction(abortOptions, cancellationToken);
}
- if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(startTime, clock.UtcNow))
+ if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return new CallbackOutcome.WithShouldRetryTransaction();
}
@@ -119,7 +129,7 @@ private static CallbackOutcome ExecuteCallback(IClientSessionH
}
}
- private static async Task> ExecuteCallbackAsync(IClientSessionHandle clientSession, Func> callbackAsync, DateTime startTime, IClock clock, CancellationToken cancellationToken)
+ private static async Task> ExecuteCallbackAsync(OperationContext operationContext, IClientSessionHandle clientSession, Func> callbackAsync, CancellationToken cancellationToken)
{
try
{
@@ -130,10 +140,16 @@ private static async Task> ExecuteCallbackAsync.WithShouldRetryTransaction();
}
@@ -142,24 +158,29 @@ private static async Task> ExecuteCallbackAsync CommitWithRetriesAsync(IClientSessionHandle clientSession, DateTime startTime, IClock clock, CancellationToken cancellationToken)
+ private static async Task CommitWithRetriesAsync(OperationContext operationContext, IClientSessionHandle clientSession, CancellationToken cancellationToken)
{
while (true)
{
try
{
- await clientSession.CommitTransactionAsync(cancellationToken).ConfigureAwait(false);
+ CommitTransactionOptions commitOptions = null;
+ if (operationContext.IsRootContextTimeoutConfigured())
+ {
+ commitOptions = new CommitTransactionOptions(operationContext.RemainingTimeout);
+ }
+
+ await clientSession.CommitTransactionAsync(commitOptions, cancellationToken).ConfigureAwait(false);
return true;
}
catch (Exception ex)
{
- var now = clock.UtcNow; // call UtcNow once since we need to facilitate predictable mocking
- if (ShouldRetryCommit(ex, startTime, now))
+ if (ShouldRetryCommit(operationContext, ex))
{
continue;
}
- if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(startTime, now))
+ if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return false; // the transaction will be retried
}
@@ -211,7 +237,7 @@ private static bool HasErrorLabel(Exception ex, string errorLabel)
private static bool IsMaxTimeMSExpiredException(Exception ex)
{
if (ex is MongoExecutionTimeoutException timeoutException &&
- timeoutException.Code == MaxTimeMSExpiredErrorCode)
+ timeoutException.Code == (int)ServerErrorCode.MaxTimeMSExpired)
{
return true;
}
@@ -222,7 +248,7 @@ private static bool IsMaxTimeMSExpiredException(Exception ex)
if (writeConcernError != null)
{
var code = writeConcernError.GetValue("code", -1).ToInt32();
- if (code == MaxTimeMSExpiredErrorCode)
+ if (code == (int)ServerErrorCode.MaxTimeMSExpired)
{
return true;
}
@@ -246,11 +272,11 @@ private static bool IsTransactionInStartingOrInProgressState(IClientSessionHandl
}
}
- private static bool ShouldRetryCommit(Exception ex, DateTime startTime, DateTime now)
+ private static bool ShouldRetryCommit(OperationContext operationContext, Exception ex)
{
return
HasErrorLabel(ex, UnknownTransactionCommitResultLabel) &&
- !HasTimedOut(startTime, now) &&
+ !HasTimedOut(operationContext) &&
!IsMaxTimeMSExpiredException(ex);
}
diff --git a/tests/MongoDB.Driver.Tests/ClientSessionHandleTests.cs b/tests/MongoDB.Driver.Tests/ClientSessionHandleTests.cs
index d05c34abb28..36dab703623 100644
--- a/tests/MongoDB.Driver.Tests/ClientSessionHandleTests.cs
+++ b/tests/MongoDB.Driver.Tests/ClientSessionHandleTests.cs
@@ -1,4 +1,4 @@
-/* Copyright 2017-present MongoDB Inc.
+/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,7 +25,6 @@
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.TestHelpers;
-using MongoDB.Driver.Support;
using Moq;
using Xunit;
@@ -318,21 +317,7 @@ public void WithTransaction_callback_should_be_processed_with_expected_result(
bool async)
{
var mockClock = CreateClockMock(DateTime.UtcNow, isRetryAttemptsWithTimeout, true);
-
var mockCoreSession = CreateCoreSessionMock();
- mockCoreSession.Setup(c => c.StartTransaction(It.IsAny()));
-
- // CommitTransaction
- if (async)
- {
- mockCoreSession
- .Setup(c => c.CommitTransactionAsync(It.IsAny()))
- .Returns(Task.FromResult(0));
- }
- else
- {
- mockCoreSession.Setup(c => c.CommitTransaction(It.IsAny()));
- }
// Initialize callbacks
var mockCallbackProcessing = new Mock();
@@ -438,9 +423,6 @@ public void WithTransaction_callback_should_propagate_result(object value)
public void WithTransaction_callback_with_a_custom_error_should_not_be_retried()
{
var mockCoreSession = CreateCoreSessionMock();
- mockCoreSession.Setup(c => c.StartTransaction(It.IsAny()));
- mockCoreSession.Setup(c => c.AbortTransaction(It.IsAny())); // abort ignores exceptions
- mockCoreSession.Setup(c => c.CommitTransaction(It.IsAny()));
var subject = CreateSubject(coreSession: mockCoreSession.Object);
@@ -453,13 +435,7 @@ public void WithTransaction_callback_with_a_custom_error_should_not_be_retried()
[Fact]
public void WithTransaction_callback_with_a_TransientTransactionError_and_exceeded_retry_timeout_should_not_be_retried()
{
- var now = DateTime.UtcNow;
- var mockClock = new Mock();
- mockClock
- .SetupSequence(c => c.UtcNow)
- .Returns(now)
- .Returns(now.AddSeconds(CalculateTime(true))); // the retry timeout has been exceeded
-
+ var mockClock = CreateClockMock(DateTime.UtcNow, TimeSpan.FromSeconds(CalculateTime(true)));
var subject = CreateSubject(clock: mockClock.Object);
var exResult = Assert.Throws(() => subject.WithTransaction((handle, cancellationToken) =>
@@ -473,13 +449,7 @@ public void WithTransaction_callback_with_a_TransientTransactionError_and_exceed
[ParameterAttributeData]
public void WithTransaction_callback_with_a_UnknownTransactionCommitResult_should_not_be_retried([Values(true, false)] bool hasTimedOut)
{
- var now = DateTime.UtcNow;
- var mockClock = new Mock();
- mockClock
- .SetupSequence(c => c.UtcNow)
- .Returns(now)
- .Returns(now.AddSeconds(CalculateTime(hasTimedOut)));
-
+ var mockClock = CreateClockMock(DateTime.UtcNow, TimeSpan.FromSeconds(CalculateTime(hasTimedOut)));
var subject = CreateSubject(clock: mockClock.Object);
var exResult = Assert.Throws(() => subject.WithTransaction((handle, cancellationToken) =>
@@ -491,43 +461,41 @@ public void WithTransaction_callback_with_a_UnknownTransactionCommitResult_shoul
[Theory]
// sync
- [InlineData(null, new[] { WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, true, false)]
- [InlineData(null, new[] { WithTransactionErrorState.TransientTransactionError }, false /*Should exception be thrown*/, 1, false, false)]
+ [InlineData(null, new[] { WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, false)]
+ [InlineData(null, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 2, false)]
- [InlineData(null, new[] { WithTransactionErrorState.ErrorWithoutLabel }, true /*Should exception be thrown*/, 1, false, false)]
+ [InlineData(null, new[] { WithTransactionErrorState.ErrorWithoutLabel }, true /*Should exception be thrown*/, 1, false)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError }, false /*Should exception be thrown*/, 1, false, false)]
- [InlineData(new[] { true, true }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError }, true /*Should exception be thrown*/, 1, null, false)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, false)]
+ [InlineData(new[] { true }, new[] { WithTransactionErrorState.TransientTransactionError }, true /*Should exception be thrown*/, 1, false)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, true, false)]
- [InlineData(new[] { false, true }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, true /*Should exception be thrown*/, 2, null, false)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, false)]
+ [InlineData(new[] { false, true }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult }, true /*Should exception be thrown*/, 1, false)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, true, false)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, false)]
// async
- [InlineData(null, new[] { WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, true, true)]
- [InlineData(null, new[] { WithTransactionErrorState.TransientTransactionError }, false /*Should exception be thrown*/, 1, false, true)]
+ [InlineData(null, new[] { WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, true)]
+ [InlineData(null, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 2, true)]
- [InlineData(null, new[] { WithTransactionErrorState.ErrorWithoutLabel }, true /*Should exception be thrown*/, 1, false, true)]
+ [InlineData(null, new[] { WithTransactionErrorState.ErrorWithoutLabel }, true /*Should exception be thrown*/, 1, true)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError }, false /*Should exception be thrown*/, 1, false, true)]
- [InlineData(new[] { true, true }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError }, true /*Should exception be thrown*/, 1, null, true)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.TransientTransactionError, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, true)]
+ [InlineData(new[] { true }, new[] { WithTransactionErrorState.TransientTransactionError }, true /*Should exception be thrown*/, 1, true)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, true, true)]
- [InlineData(new[] { false, true }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, true /*Should exception be thrown*/, 2, null, true)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, true)]
+ [InlineData(new[] { false, true }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult }, true /*Should exception be thrown*/, 1, true)]
- [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 3, true, true)]
+ [InlineData(new[] { false, false }, new[] { WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.UnknownTransactionCommitResult, WithTransactionErrorState.NoError }, false /*Should exception be thrown*/, 1, true)]
public void WithTransaction_commit_after_callback_processing_should_be_processed_with_expected_result(
bool[] isRetryAttemptsWithTimeout, // the array length should be the same with a number of failed attempts from `commitTransactionErrorStates`
WithTransactionErrorState[] commitTransactionErrorStates,
bool shouldExceptionBeThrown,
- int expectedCommitTransactionAttempts,
- bool? expectedFullTransactionBeRetriedState,
+ int transactionCallbackAttempts,
bool async)
{
var now = DateTime.UtcNow;
var mockClock = CreateClockMock(now, isRetryAttemptsWithTimeout, false);
-
var mockCoreSession = CreateCoreSessionMock();
// Initialize commit result
@@ -566,34 +534,39 @@ public void WithTransaction_commit_after_callback_processing_should_be_processed
var subject = CreateSubject(coreSession: mockCoreSession.Object, clock: mockClock.Object);
- // Commit processing
if (async)
{
+ var callbackMock = new Mock>>();
+ var exception = Record.ExceptionAsync(() => subject.WithTransactionAsync(callbackMock.Object)).GetAwaiter().GetResult();
+
if (shouldExceptionBeThrown)
{
- Assert.ThrowsAnyAsync(() => TransactionExecutorReflector.CommitWithRetriesAsync(subject, now, mockClock.Object, CancellationToken.None)).GetAwaiter().GetResult();
+ exception.Should().BeOfType();
}
else
{
- var result = TransactionExecutorReflector.CommitWithRetriesAsync(subject, now, mockClock.Object, CancellationToken.None).Result;
- expectedFullTransactionBeRetriedState.Should().Be(result);
+ exception.Should().BeNull();
}
- mockCoreSession.Verify(handle => handle.CommitTransactionAsync(It.IsAny()), Times.Exactly(expectedCommitTransactionAttempts));
+ callbackMock.Verify(c => c(It.IsAny(), It.IsAny()), Times.Exactly(transactionCallbackAttempts));
+ mockCoreSession.Verify(handle => handle.CommitTransactionAsync(It.IsAny()), Times.Exactly(commitTransactionErrorStates.Length));
}
else
{
+ var callbackMock = new Mock>();
+ var exception = Record.Exception(() => subject.WithTransaction(callbackMock.Object));
+
if (shouldExceptionBeThrown)
{
- Assert.ThrowsAny(() => TransactionExecutorReflector.CommitWithRetries(subject, now, mockClock.Object, CancellationToken.None));
+ exception.Should().BeOfType();
}
else
{
- var result = TransactionExecutorReflector.CommitWithRetries(subject, now, mockClock.Object, CancellationToken.None);
- expectedFullTransactionBeRetriedState.Should().Be(result);
+ exception.Should().BeNull();
}
- mockCoreSession.Verify(handle => handle.CommitTransaction(It.IsAny()), Times.Exactly(expectedCommitTransactionAttempts));
+ callbackMock.Verify(c => c(It.IsAny(), It.IsAny()), Times.Exactly(transactionCallbackAttempts));
+ mockCoreSession.Verify(handle => handle.CommitTransaction(It.IsAny()), Times.Exactly(commitTransactionErrorStates.Length));
}
}
@@ -601,8 +574,6 @@ public void WithTransaction_commit_after_callback_processing_should_be_processed
public void WithTransaction_should_set_valid_session_to_callback()
{
var mockCoreSession = CreateCoreSessionMock();
- mockCoreSession.Setup(c => c.StartTransaction(It.IsAny()));
- mockCoreSession.Setup(c => c.CommitTransaction(It.IsAny()));
var subject = CreateSubject(coreSession: mockCoreSession.Object);
var result = subject.WithTransaction