Skip to content

Commit 00de389

Browse files
CSHARP-3673: Update connection pinning logic for LB mode.
1 parent ced67a1 commit 00de389

File tree

86 files changed

+11316
-171
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+11316
-171
lines changed

evergreen/evergreen.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ functions:
314314
working_dir: mongo-csharp-driver
315315
script: |
316316
# DO NOT ECHO WITH XTRACE (which PREPARE_SHELL does)
317-
ATLAS_FREE="${ATLAS_FREE}" ATLAS_FREE_SRV="${ATLAS_FREE_SRV}" ATLAS_REPLICA="${ATLAS_REPLICA}" ATLAS_REPLICA_SRV="${ATLAS_REPLICA_SRV}" ATLAS_SHARDED="${ATLAS_SHARDED}" ATLAS_SHARDED_SRV="${ATLAS_SHARDED_SRV}" ATLAS_TLS11="${ATLAS_TLS11}" ATLAS_TLS11_SRV="${ATLAS_TLS11_SRV}" ATLAS_TLS12="${ATLAS_TLS12}" ATLAS_TLS12_SRV="${ATLAS_TLS12_SRV}" evergreen/run-atlas-connectivity-tests.sh
317+
ATLAS_FREE="${ATLAS_FREE}" ATLAS_FREE_SRV="${ATLAS_FREE_SRV}" ATLAS_REPLICA="${ATLAS_REPLICA}" ATLAS_REPLICA_SRV="${ATLAS_REPLICA_SRV}" ATLAS_SHARDED="${ATLAS_SHARDED}" ATLAS_SHARDED_SRV="${ATLAS_SHARDED_SRV}" ATLAS_TLS11="${ATLAS_TLS11}" ATLAS_TLS11_SRV="${ATLAS_TLS11_SRV}" ATLAS_TLS12="${ATLAS_TLS12}" ATLAS_TLS12_SRV="${ATLAS_TLS12_SRV}" ATLAS_SERVERLESS="${ATLAS_SERVERLESS}" ATLAS_SERVERLESS_SRV="${ATLAS_SERVERLESS_SRV}" evergreen/run-atlas-connectivity-tests.sh
318318
319319
run-gssapi-auth-tests:
320320
- command: shell.exec

src/MongoDB.Driver.Core/Core/Authentication/SaslAuthenticator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void Authenticate(IConnection connection, ConnectionDescription descripti
103103
var protocol = CreateCommandProtocol(command);
104104
result = protocol.Execute(connection, cancellationToken);
105105
}
106-
catch (MongoCommandException ex)
106+
catch (MongoException ex)
107107
{
108108
throw CreateException(connection, ex);
109109
}
@@ -142,7 +142,7 @@ public async Task AuthenticateAsync(IConnection connection, ConnectionDescriptio
142142
var protocol = CreateCommandProtocol(command);
143143
result = await protocol.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
144144
}
145-
catch (MongoCommandException ex)
145+
catch (MongoException ex)
146146
{
147147
throw CreateException(connection, ex);
148148
}

src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public bool IsInTransaction
147147
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
148148
{
149149
// unpin if retryable error
150-
_currentTransaction.PinnedServer = null;
150+
_currentTransaction.UnpinAll();
151151

152152
// ignore exception and retry
153153
}
@@ -171,7 +171,7 @@ public bool IsInTransaction
171171
_currentTransaction.SetState(CoreTransactionState.Aborted);
172172
// The transaction is aborted.The session MUST be unpinned regardless
173173
// of whether the abortTransaction command succeeds or fails
174-
_currentTransaction.PinnedServer = null;
174+
_currentTransaction.UnpinAll();
175175
}
176176
}
177177

@@ -196,7 +196,7 @@ public bool IsInTransaction
196196
catch (Exception exception) when (ShouldRetryEndTransactionException(exception))
197197
{
198198
// unpin if retryable error
199-
_currentTransaction.PinnedServer = null;
199+
_currentTransaction.UnpinAll();
200200

201201
// ignore exception and retry
202202
}
@@ -220,7 +220,7 @@ public bool IsInTransaction
220220
_currentTransaction.SetState(CoreTransactionState.Aborted);
221221
// The transaction is aborted.The session MUST be unpinned regardless
222222
// of whether the abortTransaction command succeeds or fails
223-
_currentTransaction.PinnedServer = null;
223+
_currentTransaction.UnpinAll();
224224
}
225225
}
226226

@@ -243,6 +243,8 @@ public void AboutToSendCommand()
243243
// don't set to null when retrying a commit
244244
if (!_isCommitTransactionInProgress)
245245
{
246+
// Unpin data non-transaction operation uses the commited session
247+
_currentTransaction.UnpinAll();
246248
_currentTransaction = null;
247249
}
248250
return;
@@ -364,6 +366,7 @@ public void Dispose()
364366
}
365367
}
366368

369+
_currentTransaction?.UnpinAll();
367370
_serverSession.Dispose();
368371
_disposed = true;
369372
}
@@ -387,6 +390,7 @@ public void StartTransaction(TransactionOptions transactionOptions = null)
387390
throw new InvalidOperationException("Transactions do not support unacknowledged write concerns.");
388391
}
389392

393+
_currentTransaction?.UnpinAll(); // unpin data if any when a new transaction is started
390394
_currentTransaction = new CoreTransaction(transactionNumber, effectiveTransactionOptions);
391395
}
392396

@@ -501,25 +505,27 @@ private void EnsureTransactionsAreSupported()
501505
{
502506
var serverType = connectedDataBearingServer.Type;
503507

504-
if (serverType == ServerType.Standalone)
508+
switch (serverType)
505509
{
506-
throw new NotSupportedException("Standalone servers do not support transactions.");
507-
}
508-
else if (serverType == ServerType.ShardRouter)
509-
{
510-
Feature.ShardedTransactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
511-
}
512-
else
513-
{
514-
Feature.Transactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
510+
case ServerType.Standalone:
511+
throw new NotSupportedException("Standalone servers do not support transactions.");
512+
case ServerType.ShardRouter:
513+
Feature.ShardedTransactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
514+
break;
515+
case ServerType.LoadBalanced:
516+
// do nothing, load balancing always supports transactions
517+
break;
518+
default:
519+
Feature.Transactions.ThrowIfNotSupported(connectedDataBearingServer.Version);
520+
break;
515521
}
516522
}
517523
}
518524

519525
private TResult ExecuteEndTransactionOnPrimary<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
520526
{
521527
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
522-
using (var binding = new WritableServerBinding(_cluster, sessionHandle))
528+
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
523529
{
524530
return operation.Execute(binding, cancellationToken);
525531
}
@@ -528,7 +534,7 @@ private TResult ExecuteEndTransactionOnPrimary<TResult>(IReadOperation<TResult>
528534
private async Task<TResult> ExecuteEndTransactionOnPrimaryAsync<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
529535
{
530536
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
531-
using (var binding = new WritableServerBinding(_cluster, sessionHandle))
537+
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
532538
{
533539
return await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
534540
}

src/MongoDB.Driver.Core/Core/Bindings/CoreTransaction.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ public class CoreTransaction
2525
{
2626
// private fields
2727
private bool _isEmpty;
28+
private IChannelHandle _pinnedChannel = null;
2829
private IServer _pinnedServer;
2930
private BsonDocument _recoveryToken;
3031
private CoreTransactionState _state;
3132
private readonly long _transactionNumber;
3233
private readonly TransactionOptions _transactionOptions;
34+
private readonly object _lock = new object();
3335

3436
// public constructors
3537
/// <summary>
@@ -62,6 +64,18 @@ public CoreTransaction(long transactionNumber, TransactionOptions transactionOpt
6264
/// </value>
6365
public CoreTransactionState State => _state;
6466

67+
/// <summary>
68+
/// Gets pinned channel for the current transaction.
69+
/// Value has meaning if and only if a transaction is in progress.
70+
/// </summary>
71+
/// <value>
72+
/// The pinned channel for the current transaction.
73+
/// </value>
74+
public IChannelHandle PinnedChannel
75+
{
76+
get => _pinnedChannel;
77+
}
78+
6579
/// <summary>
6680
/// Gets or sets pinned server for the current transaction.
6781
/// Value has meaning if and only if a transaction is in progress.
@@ -104,6 +118,15 @@ public BsonDocument RecoveryToken
104118
}
105119

106120
// internal methods
121+
internal void PinChannel(IChannelHandle channel)
122+
{
123+
lock (_lock)
124+
{
125+
_pinnedChannel?.Dispose();
126+
_pinnedChannel = channel;
127+
}
128+
}
129+
107130
internal void SetState(CoreTransactionState state)
108131
{
109132
_state = state;
@@ -112,5 +135,15 @@ internal void SetState(CoreTransactionState state)
112135
_isEmpty = false;
113136
}
114137
}
138+
139+
internal void UnpinAll()
140+
{
141+
lock (_lock)
142+
{
143+
_pinnedChannel?.Dispose();
144+
_pinnedChannel = null;
145+
_pinnedServer = null;
146+
}
147+
}
115148
}
116149
}

src/MongoDB.Driver.Core/Core/Bindings/IChannel.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ namespace MongoDB.Driver.Core.Bindings
3333
/// </summary>
3434
public interface IChannel : IDisposable
3535
{
36+
/// <summary>
37+
/// Gets the connection.
38+
/// </summary>
39+
/// <value>
40+
/// The connection.
41+
/// </value>
42+
IConnectionHandle Connection { get; }
43+
3644
/// <summary>
3745
/// Gets the connection description.
3846
/// </summary>
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/* Copyright 2021-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Threading;
17+
using MongoDB.Driver.Core.Bindings;
18+
using MongoDB.Driver.Core.Clusters;
19+
using MongoDB.Driver.Core.Connections;
20+
using MongoDB.Driver.Core.Servers;
21+
22+
namespace MongoDB.Driver.Core
23+
{
24+
/// <summary>
25+
/// Connection pinning helper.
26+
/// </summary>
27+
public static class ChannelPinningHelper
28+
{
29+
/// <summary>
30+
/// Create a read binding handle.
31+
/// </summary>
32+
/// <param name="cluster">The cluster,</param>
33+
/// <param name="session">The session.</param>
34+
/// <param name="readPreference">The read preference.</param>
35+
/// <returns>An effective read binging.</returns>
36+
public static IReadBindingHandle CreateReadBinding(ICluster cluster, ICoreSessionHandle session, ReadPreference readPreference)
37+
{
38+
IReadBinding readBinding;
39+
if (session.IsInTransaction &&
40+
IsChannelPinned(session.CurrentTransaction) &&
41+
session.CurrentTransaction.State != CoreTransactionState.Starting)
42+
{
43+
readBinding = new ChannelReadWriteBinding(
44+
session.CurrentTransaction.PinnedServer,
45+
session.CurrentTransaction.PinnedChannel.Fork(),
46+
session);
47+
}
48+
else
49+
{
50+
if (IsInLoadBalancedMode(cluster.Description) && IsChannelPinned(session.CurrentTransaction))
51+
{
52+
// unpin if the next operation is not under transaction
53+
session.CurrentTransaction.UnpinAll();
54+
}
55+
readBinding = new ReadPreferenceBinding(cluster, readPreference, session);
56+
}
57+
58+
return new ReadBindingHandle(readBinding);
59+
}
60+
61+
/// <summary>
62+
/// Create a readwrite binding handle.
63+
/// </summary>
64+
/// <param name="cluster">The cluster.</param>
65+
/// <param name="session">The session.</param>
66+
/// <returns>An effective read write binging.</returns>
67+
public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, ICoreSessionHandle session)
68+
{
69+
IReadWriteBinding readWriteBinding;
70+
if (session.IsInTransaction &&
71+
IsChannelPinned(session.CurrentTransaction) &&
72+
session.CurrentTransaction.State != CoreTransactionState.Starting)
73+
{
74+
readWriteBinding = new ChannelReadWriteBinding(
75+
session.CurrentTransaction.PinnedServer,
76+
session.CurrentTransaction.PinnedChannel.Fork(),
77+
session);
78+
}
79+
else
80+
{
81+
if (IsInLoadBalancedMode(cluster.Description) && IsChannelPinned(session.CurrentTransaction))
82+
{
83+
// unpin if the next operation is not under transaction
84+
session.CurrentTransaction.UnpinAll();
85+
}
86+
readWriteBinding = new WritableServerBinding(cluster, session);
87+
}
88+
89+
return new ReadWriteBindingHandle(readWriteBinding);
90+
}
91+
92+
internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, long cursorId)
93+
{
94+
IChannelSource effectiveChannelSource;
95+
if (IsInLoadBalancedMode(channelSource.ServerDescription) && cursorId != 0)
96+
{
97+
var getMoreChannel = channelSource.GetChannel(CancellationToken.None); // no need for cancellation token since we already have channel in the source
98+
var getMoreSession = channelSource.Session.Fork();
99+
100+
effectiveChannelSource = new ChannelChannelSource(
101+
channelSource.Server,
102+
getMoreChannel,
103+
getMoreSession);
104+
}
105+
else
106+
{
107+
effectiveChannelSource = new ServerChannelSource(channelSource.Server, channelSource.Session.Fork());
108+
}
109+
110+
return new ChannelSourceHandle(effectiveChannelSource);
111+
}
112+
113+
internal static bool PinChannelSourceAndChannelIfRequired(
114+
IChannelSourceHandle channelSource,
115+
IChannelHandle channel,
116+
ICoreSessionHandle session,
117+
out IChannelSourceHandle pinnedChannelSource,
118+
out IChannelHandle pinnedChannel)
119+
{
120+
if (IsInLoadBalancedMode(channel.ConnectionDescription))
121+
{
122+
var server = channelSource.Server;
123+
124+
pinnedChannelSource = new ChannelSourceHandle(
125+
new ChannelChannelSource(
126+
server,
127+
channel.Fork(),
128+
session.Fork()));
129+
130+
if (session.IsInTransaction && !IsChannelPinned(session.CurrentTransaction))
131+
{
132+
session.CurrentTransaction.PinChannel(channel.Fork());
133+
session.CurrentTransaction.PinnedServer = server;
134+
}
135+
136+
pinnedChannel = channel.Fork();
137+
138+
return true;
139+
}
140+
141+
pinnedChannelSource = null;
142+
pinnedChannel = null;
143+
return false;
144+
}
145+
146+
// private methods
147+
private static bool IsInLoadBalancedMode(ConnectionDescription connectionDescription) => connectionDescription?.ServiceId.HasValue ?? false;
148+
private static bool IsInLoadBalancedMode(ServerDescription serverDescription) => serverDescription?.Type == ServerType.LoadBalanced;
149+
private static bool IsInLoadBalancedMode(ClusterDescription clusterDescription) => clusterDescription?.Type == ClusterType.LoadBalanced;
150+
private static bool IsChannelPinned(CoreTransaction coreTransaction) => coreTransaction?.PinnedChannel != null;
151+
}
152+
}

0 commit comments

Comments
 (0)