Skip to content

Commit cf3d3e3

Browse files
committed
CSHARP-2561: StartTransaction should throw an exception when transactions are not supported.
1 parent 1411fc0 commit cf3d3e3

File tree

2 files changed

+260
-10
lines changed

2 files changed

+260
-10
lines changed

src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Linq;
1718
using System.Threading;
1819
using System.Threading.Tasks;
1920
using MongoDB.Bson;
@@ -425,17 +426,41 @@ private void EnsureStartTransactionCanBeCalled()
425426
{
426427
if (_currentTransaction == null)
427428
{
428-
return;
429+
EnsureTransactionsAreSupported();
429430
}
431+
else
432+
{
433+
switch (_currentTransaction.State)
434+
{
435+
case CoreTransactionState.Aborted:
436+
case CoreTransactionState.Committed:
437+
break;
430438

431-
switch (_currentTransaction.State)
439+
default:
440+
throw new InvalidOperationException("Transaction already in progress.");
441+
}
442+
}
443+
}
444+
445+
private void EnsureTransactionsAreSupported()
446+
{
447+
var connectedDataBearingServers = _cluster.Description.Servers.Where(s => s.State == ServerState.Connected && s.IsDataBearing).ToList();
448+
449+
if (connectedDataBearingServers.Count == 0)
432450
{
433-
case CoreTransactionState.Aborted:
434-
case CoreTransactionState.Committed:
435-
return;
451+
throw new NotSupportedException("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
452+
}
436453

437-
default:
438-
throw new InvalidOperationException("Transaction already in progress.");
454+
foreach (var connectedDataBearingServer in connectedDataBearingServers)
455+
{
456+
if (connectedDataBearingServer.Type == ServerType.ShardRouter)
457+
{
458+
Feature.ShardedTransactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
459+
}
460+
else
461+
{
462+
Feature.Transactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
463+
}
439464
}
440465
}
441466

tests/MongoDB.Driver.Core.Tests/Core/Bindings/CoreSessionTests.cs

Lines changed: 228 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414
*/
1515

1616
using System;
17-
using System.Reflection;
17+
using System.Collections;
18+
using System.Collections.Generic;
19+
using System.Linq;
20+
using System.Net;
1821
using FluentAssertions;
1922
using MongoDB.Bson;
2023
using MongoDB.Bson.TestHelpers;
2124
using MongoDB.Bson.TestHelpers.XunitExtensions;
2225
using MongoDB.Driver.Core.Clusters;
26+
using MongoDB.Driver.Core.Misc;
27+
using MongoDB.Driver.Core.Servers;
28+
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
2329
using Moq;
2430
using Xunit;
2531

@@ -218,9 +224,10 @@ public void Dispose_should_have_expected_result(
218224
Mock.Get(subject.ServerSession).Verify(m => m.Dispose(), Times.Once);
219225
}
220226

221-
[Fact]
227+
[SkippableFact]
222228
public void StartTransaction_should_throw_when_write_concern_is_unacknowledged()
223229
{
230+
RequireServer.Check().ClusterType(ClusterType.ReplicaSet).Supports(Feature.Transactions);
224231
var cluster = CoreTestConfiguration.Cluster;
225232
var session = cluster.StartSession();
226233
var transactionOptions = new TransactionOptions(writeConcern: WriteConcern.Unacknowledged);
@@ -240,18 +247,234 @@ public void WasUsed_should_call_serverSession()
240247

241248
Mock.Get(subject.ServerSession).Verify(m => m.WasUsed(), Times.Once);
242249
}
250+
251+
[Theory]
252+
[ParameterAttributeData]
253+
public void EnsureTransactionsAreSupported_should_throw_when_there_are_no_connected_servers(
254+
[Values(0, 1, 2, 3)] int numberOfDisconnectedServers)
255+
{
256+
var clusterDescription = CreateClusterDescriptionWithDisconnectedServers(numberOfDisconnectedServers);
257+
var subject = CreateSubject(clusterDescription);
258+
259+
var exception = Record.Exception(() => subject.EnsureTransactionsAreSupported());
260+
261+
var e = exception.Should().BeOfType<NotSupportedException>().Subject;
262+
e.Message.Should().Be("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
263+
}
264+
265+
// EnsureTransactionsAreSupported scenario codes
266+
// C = Connected, D = Disconnected
267+
// P = Primary, S = Secondary, A = Arbiter, R = ShardRouter, U = Unknown
268+
// T = transactions are supported, N = transactions are not supported
269+
270+
[Theory]
271+
[InlineData("DU,CP")]
272+
[InlineData("CP,DU")]
273+
[InlineData("DU,CR")]
274+
[InlineData("CR,DU")]
275+
public void EnsureTransactionsAreSupported_should_ignore_disconnected_servers(string scenarios)
276+
{
277+
var clusterId = new ClusterId(1);
278+
var servers =
279+
SplitScenarios(scenarios)
280+
.Select((scenario, i) =>
281+
{
282+
var endPoint = new DnsEndPoint("localhost", 27017 + i);
283+
var serverId = new ServerId(clusterId, endPoint);
284+
var state = MapServerStateCode(scenario[0]);
285+
var type = MapServerTypeCode(scenario[1]);
286+
var version = type == ServerType.ShardRouter ? Feature.ShardedTransactions.FirstSupportedVersion : Feature.Transactions.FirstSupportedVersion;
287+
return CreateServerDescription(serverId, endPoint, state, type, version);
288+
})
289+
.ToList();
290+
var cluster = CreateClusterDescription(clusterId, servers: servers);
291+
var subject = CreateSubject(cluster);
292+
293+
subject.EnsureTransactionsAreSupported();
294+
}
295+
296+
[Theory]
297+
[InlineData("")]
298+
[InlineData("DU")]
299+
[InlineData("CA")]
300+
[InlineData("DU,DU")]
301+
[InlineData("DU,CA")]
302+
[InlineData("CA,DU")]
303+
[InlineData("CA,CA")]
304+
public void EnsureTransactionsAreSupported_should_throw_when_there_are_no_connected_data_bearing_servers(string scenarios)
305+
{
306+
var clusterId = new ClusterId(1);
307+
var servers =
308+
SplitScenarios(scenarios)
309+
.Select((scenario, i) =>
310+
{
311+
var endPoint = new DnsEndPoint("localhost", 27017 + i);
312+
var serverId = new ServerId(clusterId, endPoint);
313+
var state = MapServerStateCode(scenario[0]);
314+
var type = MapServerTypeCode(scenario[1]);
315+
return CreateServerDescription(serverId, endPoint, state, type);
316+
})
317+
.ToList();
318+
var cluster = CreateClusterDescription(clusterId, servers: servers);
319+
var subject = CreateSubject(cluster);
320+
321+
var exception = Record.Exception(() => subject.EnsureTransactionsAreSupported());
322+
323+
var e = exception.Should().BeOfType<NotSupportedException>().Subject;
324+
e.Message.Should().Be("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
325+
}
326+
327+
[Theory]
328+
[InlineData("PN")]
329+
[InlineData("PN,ST")]
330+
[InlineData("PT,SN")]
331+
[InlineData("RN")]
332+
[InlineData("RN,RT")]
333+
[InlineData("RT,RN")]
334+
public void EnsureTransactionsAreSupported_should_throw_when_any_connected_data_bearing_server_does_not_support_transactions(string scenarios)
335+
{
336+
var clusterId = new ClusterId(1);
337+
string unsupportedFeatureName = null;
338+
var servers =
339+
SplitScenarios(scenarios)
340+
.Select((scenario, i) =>
341+
{
342+
var endPoint = new DnsEndPoint("localhost", 27017 + i);
343+
var serverId = new ServerId(clusterId, endPoint);
344+
var type = MapServerTypeCode(scenario[0]);
345+
var supportsTransactions = MapSupportsTransactionsCode(scenario[1]);
346+
var feature = type == ServerType.ShardRouter ? Feature.ShardedTransactions : Feature.Transactions;
347+
if (!supportsTransactions)
348+
{
349+
unsupportedFeatureName = feature.Name;
350+
}
351+
var version = supportsTransactions ? feature.FirstSupportedVersion : feature.LastNotSupportedVersion;
352+
return CreateServerDescription(serverId, endPoint, ServerState.Connected, type, version);
353+
})
354+
.ToList();
355+
var cluster = CreateClusterDescription(clusterId, servers: servers);
356+
var subject = CreateSubject(cluster);
357+
358+
var exception = Record.Exception(() => subject.EnsureTransactionsAreSupported());
359+
360+
var e = exception.Should().BeOfType<NotSupportedException>().Subject;
361+
e.Message.Should().Contain($"does not support the {unsupportedFeatureName} feature.");
362+
}
243363

244364
// private methods
365+
private ClusterDescription CreateClusterDescription(
366+
ClusterId clusterId = null,
367+
ClusterConnectionMode connectionMode = ClusterConnectionMode.Automatic,
368+
ClusterType type = ClusterType.Unknown,
369+
IEnumerable<ServerDescription> servers = null)
370+
{
371+
clusterId = clusterId ?? new ClusterId(1);
372+
servers = servers ?? new ServerDescription[0];
373+
return new ClusterDescription(clusterId, connectionMode, type, servers);
374+
}
375+
376+
private ClusterDescription CreateClusterDescriptionWithDisconnectedServers(int numberOfDisconnectedServers)
377+
{
378+
var clusterId = new ClusterId(1);
379+
var servers = Enumerable.Range(27017, numberOfDisconnectedServers).Select(port => CreateDisconnectedServerDescription(clusterId, port)).ToList();
380+
return CreateClusterDescription(servers: servers);
381+
}
382+
383+
private ServerDescription CreateDisconnectedServerDescription(ClusterId clusterId, int port)
384+
{
385+
var endPoint = new DnsEndPoint("localhost", port);
386+
var serverId = new ServerId(clusterId, endPoint);
387+
return new ServerDescription(serverId, endPoint, state: ServerState.Disconnected, type: ServerType.Unknown);
388+
}
389+
390+
private ICluster CreateMockReplicaSetCluster()
391+
{
392+
var clusterId = new ClusterId(1);
393+
var endPoint = new DnsEndPoint("localhost", 27017);
394+
var serverId = new ServerId(clusterId, endPoint);
395+
var version = Feature.Transactions.FirstSupportedVersion;
396+
var servers = new[] { new ServerDescription(serverId, endPoint, state: ServerState.Connected, type: ServerType.ReplicaSetPrimary, version: version) };
397+
var clusterDescription = new ClusterDescription(clusterId, ClusterConnectionMode.Automatic, ClusterType.ReplicaSet, servers);
398+
var mockCluster = new Mock<ICluster>();
399+
mockCluster.SetupGet(m => m.Description).Returns(clusterDescription);
400+
return mockCluster.Object;
401+
}
402+
403+
private ServerDescription CreateServerDescription(
404+
ServerId serverId = null,
405+
EndPoint endPoint = null,
406+
ServerState state = ServerState.Disconnected,
407+
ServerType type = ServerType.Unknown,
408+
SemanticVersion version = null)
409+
{
410+
endPoint = endPoint ?? new DnsEndPoint("localhost", 27017);
411+
serverId = serverId ?? new ServerId(new ClusterId(1), endPoint);
412+
version = version ?? SemanticVersion.Parse("4.0.0");
413+
return new ServerDescription(serverId, endPoint, state: state, type: type, version: version);
414+
}
415+
245416
private CoreSession CreateSubject(
246417
ICluster cluster = null,
247418
ICoreServerSession serverSession = null,
248419
CoreSessionOptions options = null)
249420
{
250-
cluster = cluster ?? Mock.Of<ICluster>();
421+
cluster = cluster ?? CreateMockReplicaSetCluster();
251422
serverSession = serverSession ?? Mock.Of<ICoreServerSession>();
252423
options = options ?? new CoreSessionOptions();
253424
return new CoreSession(cluster, serverSession, options);
254425
}
426+
427+
private CoreSession CreateSubject(ClusterDescription clusterDescription)
428+
{
429+
var mockCluster = new Mock<ICluster>();
430+
mockCluster.SetupGet(m => m.Description).Returns(clusterDescription);
431+
return CreateSubject(cluster: mockCluster.Object);
432+
}
433+
434+
private ServerState MapServerStateCode(char code)
435+
{
436+
switch (code)
437+
{
438+
case 'C': return ServerState.Connected;
439+
case 'D': return ServerState.Disconnected;
440+
default: throw new ArgumentException($"Invalid ServerState code: \"{code}\".", nameof(code));
441+
}
442+
}
443+
444+
private ServerType MapServerTypeCode(char code)
445+
{
446+
switch (code)
447+
{
448+
case 'A': return ServerType.ReplicaSetArbiter;
449+
case 'P': return ServerType.ReplicaSetPrimary;
450+
case 'R': return ServerType.ShardRouter;
451+
case 'S': return ServerType.ReplicaSetSecondary;
452+
case 'U': return ServerType.Unknown;
453+
default: throw new ArgumentException($"Invalid ServerType code: \"{code}\".", nameof(code));
454+
}
455+
}
456+
457+
private bool MapSupportsTransactionsCode(char code)
458+
{
459+
switch (code)
460+
{
461+
case 'N': return false;
462+
case 'T': return true;
463+
default: throw new ArgumentException($"Invalid SupportsTransactions code: \"{code}\".", nameof(code));
464+
}
465+
}
466+
467+
private IEnumerable<string> SplitScenarios(string scenarios)
468+
{
469+
if (scenarios == "")
470+
{
471+
return Enumerable.Empty<string>();
472+
}
473+
else
474+
{
475+
return scenarios.Split(',');
476+
}
477+
}
255478
}
256479

257480
public static class CoreSessionReflector
@@ -262,5 +485,7 @@ public static void _isCommitTransactionInProgress(this CoreSession obj, bool val
262485
{
263486
Reflector.SetFieldValue(obj, nameof(_isCommitTransactionInProgress), value);
264487
}
488+
489+
public static void EnsureTransactionsAreSupported(this CoreSession obj) => Reflector.Invoke(obj, nameof(EnsureTransactionsAreSupported));
265490
}
266491
}

0 commit comments

Comments
 (0)