Skip to content

Commit 27a7c21

Browse files
CSHARP-4026: Preemptively cancel in progress operations when SDAM heartbeats timeout. (#765)
CSHARP-4026: Preemptively cancel in progress operations when SDAM heartbeats timeout.
1 parent d5f2d91 commit 27a7c21

24 files changed

+1269
-316
lines changed

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

Lines changed: 76 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -161,66 +161,6 @@ public void ThrowIfNotInitialized()
161161
public override string ToString() => State.ToString();
162162
}
163163

164-
internal sealed class MaintenanceHelper : IDisposable
165-
{
166-
private CancellationTokenSource _cancellationTokenSource = null;
167-
private readonly Action<CancellationToken> _maintenanceAction;
168-
private Thread _maintenanceThread;
169-
private readonly TimeSpan _interval;
170-
171-
public MaintenanceHelper(Action<CancellationToken> maintenanceAction, TimeSpan interval)
172-
{
173-
_interval = interval;
174-
_maintenanceAction = Ensure.IsNotNull(maintenanceAction, nameof(maintenanceAction));
175-
}
176-
177-
public bool IsRunning => _maintenanceThread != null;
178-
179-
public void Cancel()
180-
{
181-
if (_interval == Timeout.InfiniteTimeSpan)
182-
{
183-
return;
184-
}
185-
186-
CancelAndDispose();
187-
_cancellationTokenSource = null;
188-
_maintenanceThread = null;
189-
// the previous _maintenanceThread might not be stopped yet, but it will be soon
190-
}
191-
192-
public void Start()
193-
{
194-
if (_interval == Timeout.InfiniteTimeSpan)
195-
{
196-
return;
197-
}
198-
199-
CancelAndDispose();
200-
_cancellationTokenSource = new CancellationTokenSource();
201-
var cancellationToken = _cancellationTokenSource.Token;
202-
203-
_maintenanceThread = new Thread(new ParameterizedThreadStart(ThreadStart)) { IsBackground = true };
204-
_maintenanceThread.Start(cancellationToken);
205-
206-
void ThreadStart(object cancellationToken)
207-
{
208-
_maintenanceAction((CancellationToken)cancellationToken);
209-
}
210-
}
211-
212-
public void Dispose()
213-
{
214-
CancelAndDispose();
215-
}
216-
217-
private void CancelAndDispose()
218-
{
219-
_cancellationTokenSource?.Cancel();
220-
_cancellationTokenSource?.Dispose();
221-
}
222-
}
223-
224164
private sealed class AcquireConnectionHelper : IDisposable
225165
{
226166
// private fields
@@ -406,7 +346,7 @@ private void HandleException(Exception ex)
406346
}
407347
}
408348

409-
private sealed class PooledConnection : IConnection, ICheckOutReasonTracker
349+
internal sealed class PooledConnection : IConnection, ICheckOutReasonTracker
410350
{
411351
private CheckOutReason? _checkOutReason;
412352
private readonly IConnection _connection;
@@ -712,11 +652,13 @@ private void ThrowIfDisposed()
712652
}
713653
}
714654

715-
private sealed class ListConnectionHolder
655+
internal sealed class ListConnectionHolder
716656
{
717657
private readonly SemaphoreSlimSignalable _semaphoreSlimSignalable;
718658
private readonly object _lock = new object();
659+
private readonly object _lockInUse = new object();
719660
private readonly List<PooledConnection> _connections;
661+
private readonly List<PooledConnection> _connectionsInUse;
720662

721663
private readonly Action<ConnectionPoolRemovingConnectionEvent> _removingConnectionEventHandler;
722664
private readonly Action<ConnectionPoolRemovedConnectionEvent> _removedConnectionEventHandler;
@@ -725,6 +667,7 @@ public ListConnectionHolder(IEventSubscriber eventSubscriber, SemaphoreSlimSigna
725667
{
726668
_semaphoreSlimSignalable = semaphoreSlimSignalable;
727669
_connections = new List<PooledConnection>();
670+
_connectionsInUse = new List<PooledConnection>();
728671

729672
eventSubscriber.TryGetEventHandler(out _removingConnectionEventHandler);
730673
eventSubscriber.TryGetEventHandler(out _removedConnectionEventHandler);
@@ -745,6 +688,7 @@ public void Clear()
745688
{
746689
lock (_lock)
747690
{
691+
// In use Connections MUST be closed when they are checked in to the closed pool.
748692
foreach (var connection in _connections)
749693
{
750694
RemoveConnection(connection);
@@ -755,30 +699,44 @@ public void Clear()
755699
}
756700
}
757701

758-
public void Prune(CancellationToken cancellationToken)
702+
public void Prune(int? maxExpiredGenerationInUse, CancellationToken cancellationToken)
759703
{
760-
PooledConnection[] expiredConnections;
761-
lock (_lock)
704+
RemoveExpiredConnections(_connections, generation: null, _lock, signal: true);
705+
706+
if (maxExpiredGenerationInUse.HasValue)
762707
{
763-
expiredConnections = _connections.Where(c => c.IsExpired).ToArray();
708+
RemoveExpiredConnections(_connectionsInUse, generation: maxExpiredGenerationInUse.Value, _lockInUse, signal: false);
764709
}
765710

766-
foreach (var connection in expiredConnections)
711+
void RemoveExpiredConnections(List<PooledConnection> connections, int? generation, object @lock, bool signal)
767712
{
768-
cancellationToken.ThrowIfCancellationRequested();
713+
PooledConnection[] expiredConnections;
714+
lock (@lock)
715+
{
716+
expiredConnections = connections.Where(c => c.IsExpired && (generation == null || c.Generation <= generation)).ToArray();
717+
}
769718

770-
lock (_lock)
719+
foreach (var connection in expiredConnections)
771720
{
772-
// At this point connection is always expired and might be disposed
773-
// If connection is already disposed the removal logic was already executed
774-
if (connection.IsDisposed)
721+
cancellationToken.ThrowIfCancellationRequested();
722+
723+
lock (@lock)
775724
{
776-
continue;
777-
}
725+
// At this point connection is always expired and might be disposed
726+
// If connection is already disposed the removal logic was already executed
727+
if (connection.IsDisposed)
728+
{
729+
continue;
730+
}
778731

779-
RemoveConnection(connection);
780-
_connections.Remove(connection);
781-
SignalOrReset();
732+
RemoveConnection(connection);
733+
connections.Remove(connection);
734+
735+
if (signal)
736+
{
737+
SignalOrReset();
738+
}
739+
}
782740
}
783741
}
784742
}
@@ -806,11 +764,26 @@ public PooledConnection Acquire()
806764

807765
SignalOrReset();
808766
}
767+
768+
if (result != null)
769+
{
770+
TrackInUseConnection(result);
771+
772+
// This connection can be expired and not disposed by Prune. Dispose if needed
773+
if (result.IsExpired)
774+
{
775+
RemoveConnection(result);
776+
result = null;
777+
}
778+
}
779+
809780
return result;
810781
}
811782

812783
public void Return(PooledConnection connection)
813784
{
785+
UntrackInUseConnection(connection);
786+
814787
lock (_lock)
815788
{
816789
_connections.Add(connection);
@@ -826,6 +799,7 @@ public void RemoveConnection(PooledConnection connection)
826799
}
827800

828801
var stopwatch = Stopwatch.StartNew();
802+
UntrackInUseConnection(connection); // no op if connection is not in use
829803
connection.Dispose();
830804
stopwatch.Stop();
831805

@@ -849,9 +823,25 @@ private void SignalOrReset()
849823
_semaphoreSlimSignalable.Signal();
850824
}
851825
}
826+
827+
public void TrackInUseConnection(PooledConnection connection)
828+
{
829+
lock (_lockInUse)
830+
{
831+
_connectionsInUse.Add(connection);
832+
}
833+
}
834+
835+
public void UntrackInUseConnection(PooledConnection connection)
836+
{
837+
lock (_lockInUse)
838+
{
839+
_connectionsInUse.Remove(connection);
840+
}
841+
}
852842
}
853843

854-
private sealed class ConnectionCreator : IDisposable
844+
internal sealed class ConnectionCreator : IDisposable
855845
{
856846
private readonly ExclusiveConnectionPool _pool;
857847
private readonly TimeSpan _connectingTimeout;
@@ -888,8 +878,7 @@ public PooledConnection CreateOpened(CancellationToken cancellationToken)
888878
_pool.CreateTimeoutException(stopwatch, $"Timed out waiting for in connecting queue after {stopwatch.ElapsedMilliseconds}ms.");
889879
}
890880

891-
var connection = CreateOpenedInternal(cancellationToken);
892-
return connection;
881+
return CreateOpenedInternal(cancellationToken);
893882
}
894883
catch (Exception ex)
895884
{
@@ -992,9 +981,10 @@ public void Dispose()
992981
_pool._maxConnectingQueue.Release();
993982
}
994983

995-
if (_disposeConnection)
984+
if (_disposeConnection && _connection != null)
996985
{
997-
_connection?.Dispose();
986+
_pool.ConnectionHolder.UntrackInUseConnection(_connection);
987+
_connection.Dispose();
998988
}
999989
}
1000990

@@ -1029,7 +1019,10 @@ private void StartCreating(CancellationToken cancellationToken)
10291019
cancellationToken.ThrowIfCancellationRequested();
10301020

10311021
_stopwatch = Stopwatch.StartNew();
1032-
_connection = _pool.CreateNewConnection();
1022+
1023+
var connection = _pool.CreateNewConnection();
1024+
_pool.ConnectionHolder.TrackInUseConnection(connection);
1025+
_connection = connection;
10331026
}
10341027

10351028
private void FinishCreating(ConnectionDescription description)

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public ExclusiveConnectionPool(
7676
_connectionExceptionHandler = Ensure.IsNotNull(connectionExceptionHandler, nameof(connectionExceptionHandler));
7777
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
7878

79-
_maintenanceHelper = new MaintenanceHelper(MaintainSize, _settings.MaintenanceInterval);
79+
_maintenanceHelper = new MaintenanceHelper(this, _settings.MaintenanceInterval);
8080
_poolState = new PoolState(EndPointHelper.ToString(_endPoint));
8181
_checkOutReasonCounter = new CheckOutReasonCounter();
8282

@@ -155,6 +155,8 @@ public ServerId ServerId
155155
get { return _serverId; }
156156
}
157157

158+
public ConnectionPoolSettings Settings => _settings;
159+
158160
public int UsedCount
159161
{
160162
get
@@ -164,6 +166,9 @@ public int UsedCount
164166
}
165167
}
166168

169+
// internal properties
170+
internal ListConnectionHolder ConnectionHolder => _connectionHolder;
171+
167172
// public methods
168173
public IConnectionHandle AcquireConnection(CancellationToken cancellationToken)
169174
{
@@ -177,7 +182,7 @@ public async Task<IConnectionHandle> AcquireConnectionAsync(CancellationToken ca
177182
return await helper.AcquireConnectionAsync(cancellationToken).ConfigureAwait(false);
178183
}
179184

180-
public void Clear()
185+
public void Clear(bool closeInUseConnections = false)
181186
{
182187
lock (_poolState)
183188
{
@@ -187,8 +192,9 @@ public void Clear()
187192
{
188193
_clearingEventHandler?.Invoke(new ConnectionPoolClearingEvent(_serverId, _settings));
189194

190-
_maintenanceHelper.Cancel();
195+
int? maxGenerationToReap = closeInUseConnections ? _generation : null;
191196
_generation++;
197+
_maintenanceHelper.Stop(maxGenerationToReap);
192198

193199
_maxConnectionsQueue.Signal();
194200
_maxConnectingQueue.Signal();
@@ -267,8 +273,8 @@ public void Dispose()
267273
_closingEventHandler(new ConnectionPoolClosingEvent(_serverId));
268274
}
269275

270-
_connectionHolder.Clear();
271276
_maintenanceHelper.Dispose();
277+
_connectionHolder.Clear();
272278
_maxConnectionsQueue.Dispose();
273279
_maxConnectingQueue.Dispose();
274280
if (_closedEventHandler != null)
@@ -290,56 +296,13 @@ public int GetGeneration(ObjectId? serviceId)
290296
return Generation;
291297
}
292298

293-
// private methods
294-
private void MaintainSize(CancellationToken cancellationToken)
299+
// internal methods
300+
internal SemaphoreSlimSignalable.SemaphoreSlimSignalableAwaiter CreateMaxConnectionsAwaiter()
295301
{
296-
try
297-
{
298-
while (!cancellationToken.IsCancellationRequested)
299-
{
300-
try
301-
{
302-
_connectionHolder.Prune(cancellationToken);
303-
EnsureMinSize(cancellationToken);
304-
}
305-
catch
306-
{
307-
// ignore exceptions
308-
}
309-
ThreadHelper.Sleep(_settings.MaintenanceInterval, cancellationToken);
310-
}
311-
}
312-
catch
313-
{
314-
// ignore exceptions
315-
}
316-
}
317-
318-
private void EnsureMinSize(CancellationToken cancellationToken)
319-
{
320-
var minTimeout = TimeSpan.FromMilliseconds(20);
321-
322-
while (CreatedCount < _settings.MinConnections && !cancellationToken.IsCancellationRequested)
323-
{
324-
using (var poolAwaiter = _maxConnectionsQueue.CreateAwaiter())
325-
{
326-
var entered = poolAwaiter.WaitSignaled(minTimeout, cancellationToken);
327-
if (!entered)
328-
{
329-
return;
330-
}
331-
332-
using (var connectionCreator = new ConnectionCreator(this, minTimeout))
333-
{
334-
var connection = connectionCreator.CreateOpened(cancellationToken);
335-
_connectionHolder.Return(connection);
336-
}
337-
}
338-
339-
cancellationToken.ThrowIfCancellationRequested();
340-
}
302+
return _maxConnectionsQueue.CreateAwaiter();
341303
}
342304

305+
// private methods
343306
private void ReleaseConnection(PooledConnection connection)
344307
{
345308
if (_checkingInConnectionEventHandler != null)

src/MongoDB.Driver.Core/Core/ConnectionPools/IConnectionPool.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public interface IConnectionPool : IDisposable
6363
/// <summary>
6464
/// Clears the connection pool and marks it as paused.
6565
/// </summary>
66-
void Clear();
66+
/// <param name="closeInUseConnections">Whether in use connections should be closed.</param>
67+
void Clear(bool closeInUseConnections = false);
6768

6869
/// <summary>
6970
/// Clears the connection pool for the specified service.

0 commit comments

Comments
 (0)