Skip to content

Commit 10bd782

Browse files
committed
CSHARP-2188: Decrease likelihood of session leaks by releasing the
session as soon as possible in AsyncCursor.
1 parent 6b1b509 commit 10bd782

30 files changed

+341
-304
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/* Copyright 2018-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Bson;
17+
using MongoDB.Driver.Core.Clusters;
18+
using MongoDB.Driver.Core.Misc;
19+
using MongoDB.Driver.Core.Operations;
20+
21+
namespace MongoDB.Driver.Core.Bindings
22+
{
23+
/// <summary>
24+
/// Represents a session.
25+
/// </summary>
26+
/// <seealso cref="MongoDB.Driver.Core.Bindings.ICoreSession" />
27+
internal sealed class CoreSession : ICoreSession
28+
{
29+
// private fields
30+
private readonly IClusterClock _clusterClock = new ClusterClock();
31+
private bool _disposed;
32+
private readonly bool _isCausallyConsistent;
33+
private readonly bool _isImplicit;
34+
private readonly IOperationClock _operationClock = new OperationClock();
35+
private readonly ICoreServerSession _serverSession;
36+
37+
// constructors
38+
/// <summary>
39+
/// Initializes a new instance of the <see cref="CoreSession" /> class.
40+
/// </summary>
41+
/// <param name="serverSession">The server session.</param>
42+
/// <param name="isCausallyConsistent">if set to <c>true</c> [is causally consistent].</param>
43+
/// <param name="isImplicit">if set to <c>true</c> [is implicit].</param>
44+
public CoreSession(
45+
ICoreServerSession serverSession,
46+
bool isCausallyConsistent = false,
47+
bool isImplicit = false)
48+
{
49+
_serverSession = Ensure.IsNotNull(serverSession, nameof(serverSession));
50+
_isCausallyConsistent = isCausallyConsistent;
51+
_isImplicit = isImplicit;
52+
}
53+
54+
// public properties
55+
/// <inheritdoc />
56+
public BsonDocument ClusterTime => _clusterClock.ClusterTime;
57+
58+
/// <inheritdoc />
59+
public BsonDocument Id => _serverSession.Id;
60+
61+
/// <inheritdoc />
62+
public bool IsCausallyConsistent => _isCausallyConsistent;
63+
64+
/// <inheritdoc />
65+
public bool IsImplicit => _isImplicit;
66+
67+
/// <inheritdoc />
68+
public BsonTimestamp OperationTime => _operationClock.OperationTime;
69+
70+
// public methods
71+
/// <inheritdoc />
72+
public void AdvanceClusterTime(BsonDocument newClusterTime)
73+
{
74+
_clusterClock.AdvanceClusterTime(newClusterTime);
75+
}
76+
77+
/// <inheritdoc />
78+
public void AdvanceOperationTime(BsonTimestamp newOperationTime)
79+
{
80+
_operationClock.AdvanceOperationTime(newOperationTime);
81+
}
82+
83+
/// <inheritdoc />
84+
public long AdvanceTransactionNumber()
85+
{
86+
return _serverSession.AdvanceTransactionNumber();
87+
}
88+
89+
/// <inheritdoc />
90+
public void Dispose()
91+
{
92+
if (!_disposed)
93+
{
94+
_serverSession.Dispose();
95+
_disposed = true;
96+
}
97+
}
98+
99+
/// <inheritdoc />
100+
public void WasUsed()
101+
{
102+
_serverSession.WasUsed();
103+
}
104+
}
105+
}

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class AsyncCursor<TDocument> : IAsyncCursor<TDocument>
4747
// fields
4848
private readonly int? _batchSize;
4949
private readonly CollectionNamespace _collectionNamespace;
50-
private readonly IChannelSource _channelSource;
50+
private IChannelSource _channelSource;
5151
private int _count;
5252
private IReadOnlyList<TDocument> _currentBatch;
5353
private long _cursorId;
@@ -104,12 +104,7 @@ public AsyncCursor(
104104
}
105105
_count = _firstBatch.Count;
106106

107-
// if we aren't going to need the channel source we can go ahead and Dispose it now
108-
if (_cursorId == 0 && _channelSource != null)
109-
{
110-
_channelSource.Dispose();
111-
_channelSource = null;
112-
}
107+
DisposeChannelSourceIfNoLongerNeeded();
113108
}
114109

115110
// properties
@@ -276,6 +271,15 @@ protected virtual void Dispose(bool disposing)
276271
_disposed = true;
277272
}
278273

274+
private void DisposeChannelSourceIfNoLongerNeeded()
275+
{
276+
if (_channelSource != null && _cursorId == 0)
277+
{
278+
_channelSource.Dispose();
279+
_channelSource = null;
280+
}
281+
}
282+
279283
private CursorBatch<TDocument> GetNextBatch(CancellationToken cancellationToken)
280284
{
281285
using (EventContext.BeginOperation(_operationId))
@@ -373,6 +377,8 @@ private void SaveBatch(CursorBatch<TDocument> batch)
373377

374378
_currentBatch = documents;
375379
_cursorId = batch.CursorId;
380+
381+
DisposeChannelSourceIfNoLongerNeeded();
376382
}
377383

378384
private void ThrowIfDisposed()

src/MongoDB.Driver.Core/MongoDB.Driver.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
<Compile Include="Core\Bindings\ChannelSourceReadWriteBinding.cs" />
9797
<Compile Include="Core\Bindings\CoreServerSession.cs" />
9898
<Compile Include="Core\Bindings\CoreServerSessionPool.cs" />
99+
<Compile Include="Core\Bindings\CoreSession.cs" />
99100
<Compile Include="Core\Bindings\CoreSessionHandle.cs" />
100101
<Compile Include="Core\Bindings\ICoreServerSessionPool.cs" />
101102
<Compile Include="Core\Bindings\ICoreServerSesssion.cs" />

tests/MongoDB.Driver.Core.TestHelpers/CoreTestConfiguration.cs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -241,24 +241,9 @@ public static DatabaseNamespace GetDatabaseNamespaceForTestClass(Type testClassT
241241
return new DatabaseNamespace(databaseName);
242242
}
243243

244-
public static IReadBinding GetReadBinding(ICoreSessionHandle session)
245-
{
246-
return GetReadBinding(ReadPreference.Primary, session);
247-
}
248-
249-
public static IReadBinding GetReadBinding(ReadPreference readPreference, ICoreSessionHandle session)
250-
{
251-
return new ReadPreferenceBinding(__cluster.Value, readPreference, session);
252-
}
253-
254-
public static IReadWriteBinding GetReadWriteBinding(ICoreSessionHandle session)
255-
{
256-
return new WritableServerBinding(__cluster.Value, session);
257-
}
258-
259244
public static IEnumerable<string> GetModules()
260245
{
261-
var session = CoreTestConfiguration.StartSession();
246+
using (var session = StartSession())
262247
using (var binding = GetReadBinding(session))
263248
{
264249
var command = new BsonDocument("buildinfo", 1);
@@ -278,7 +263,7 @@ public static IEnumerable<string> GetModules()
278263

279264
public static string GetStorageEngine()
280265
{
281-
var session = CoreTestConfiguration.StartSession();
266+
using (var session = StartSession())
282267
using (var binding = GetReadWriteBinding(session))
283268
{
284269
var command = new BsonDocument("serverStatus", 1);
@@ -305,7 +290,9 @@ public static ICoreSessionHandle StartSession(ICluster cluster)
305290
{
306291
if (AreSessionsSupported(cluster))
307292
{
308-
return TestCoreSession.NewHandle();
293+
var serverSession = cluster.AcquireServerSession();
294+
var session = new CoreSession(serverSession);
295+
return new CoreSessionHandle(session);
309296
}
310297
else
311298
{
@@ -371,7 +358,7 @@ private static string TruncateDatabaseNameIfTooLong(string databaseName)
371358
}
372359
#endregion
373360

374-
// methods
361+
// private methods
375362
private static bool AreSessionsSupported(ICluster cluster)
376363
{
377364
SpinWait.SpinUntil(() => cluster.Description.Servers.Any(s => s.State == ServerState.Connected), TimeSpan.FromSeconds(30));
@@ -389,13 +376,28 @@ private static void DropDatabase()
389376
{
390377
var operation = new DropDatabaseOperation(__databaseNamespace, __messageEncoderSettings);
391378

392-
var session = CoreTestConfiguration.StartSession();
379+
using (var session = StartSession())
393380
using (var binding = GetReadWriteBinding(session))
394381
{
395382
operation.Execute(binding, CancellationToken.None);
396383
}
397384
}
398385

386+
private static IReadBinding GetReadBinding(ICoreSessionHandle session)
387+
{
388+
return GetReadBinding(ReadPreference.Primary, session);
389+
}
390+
391+
private static IReadBinding GetReadBinding(ReadPreference readPreference, ICoreSessionHandle session)
392+
{
393+
return new ReadPreferenceBinding(__cluster.Value, readPreference, session.Fork());
394+
}
395+
396+
private static IReadWriteBinding GetReadWriteBinding(ICoreSessionHandle session)
397+
{
398+
return new WritableServerBinding(__cluster.Value, session.Fork());
399+
}
400+
399401
public static void TearDown()
400402
{
401403
if (__cluster.IsValueCreated)

tests/MongoDB.Driver.Core.TestHelpers/MongoDB.Driver.Core.TestHelpers.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
<Compile Include="FailPoint.cs" />
6464
<Compile Include="ICoreSessionHandleExtensions.cs" />
6565
<Compile Include="Properties\AssemblyInfo.cs" />
66-
<Compile Include="TestCoreSession.cs" />
6766
<Compile Include="XunitExtensions\RequireServer.cs" />
6867
</ItemGroup>
6968
<ItemGroup>

tests/MongoDB.Driver.Core.TestHelpers/TestCoreSession.cs

Lines changed: 0 additions & 83 deletions
This file was deleted.

tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateExplainOperationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void Execute_should_throw_when_maxTime_is_exceeded(
371371

372372
var subject = new AggregateExplainOperation(_collectionNamespace, __pipeline, _messageEncoderSettings) { MaxTime = TimeSpan.FromSeconds(9001) };
373373

374-
using (var failPoint = FailPoint.ConfigureAlwaysOn(CoreTestConfiguration.Cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
374+
using (var failPoint = FailPoint.ConfigureAlwaysOn(_cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
375375
{
376376
var exception = Record.Exception(() => ExecuteOperation(subject, failPoint.Binding, async));
377377

tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateOperationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public void Execute_should_throw_when_maxTime_is_exceeded(
569569

570570
var subject = new AggregateOperation<BsonDocument>(_collectionNamespace, __pipeline, __resultSerializer, _messageEncoderSettings) { MaxTime = TimeSpan.FromSeconds(9001) };
571571

572-
using (var failPoint = FailPoint.ConfigureAlwaysOn(CoreTestConfiguration.Cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
572+
using (var failPoint = FailPoint.ConfigureAlwaysOn(_cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
573573
{
574574
var exception = Record.Exception(() => ExecuteOperation(subject, failPoint.Binding, async));
575575

tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateToCollectionOperationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ public void Execute_should_throw_when_maxTime_is_exceeded(
513513
MaxTime = TimeSpan.FromSeconds(9001)
514514
};
515515

516-
using (var failPoint = FailPoint.ConfigureAlwaysOn(CoreTestConfiguration.Cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
516+
using (var failPoint = FailPoint.ConfigureAlwaysOn(_cluster, _session, FailPointName.MaxTimeAlwaysTimeout))
517517
{
518518
var exception = Record.Exception(() => ExecuteOperation(subject, failPoint.Binding, async));
519519

0 commit comments

Comments
 (0)