Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/MongoDB.Driver/Core/Clusters/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ private sealed class ServerSelectionWaitQueue : IDisposable
private readonly InterlockedInt32 _rapidHeartbeatTimerCallbackState;

private int _serverSelectionWaitQueueSize;
private bool _disposed;

public ServerSelectionWaitQueue(Cluster cluster)
{
Expand All @@ -455,7 +456,11 @@ public ServerSelectionWaitQueue(Cluster cluster)

public void Dispose()
{
_rapidHeartbeatTimer.Dispose();
lock (_serverSelectionWaitQueueLock)
{
_disposed = true;
_rapidHeartbeatTimer.Dispose();
}
}

public IDisposable Enter(OperationContext operationContext, IServerSelector selector, ClusterDescription clusterDescription, long? operationId)
Expand Down Expand Up @@ -489,6 +494,11 @@ private void ExitServerSelectionWaitQueue()
{
if (--_serverSelectionWaitQueueSize == 0)
{
if (_disposed)
{
return;
}

_rapidHeartbeatTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}
}
Expand Down
68 changes: 42 additions & 26 deletions src/MongoDB.Driver/Core/Connections/TcpStreamFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,41 +165,57 @@ private void ConfigureConnectedSocket(Socket socket)

private void Connect(Socket socket, EndPoint endPoint, CancellationToken cancellationToken)
{
IAsyncResult connectOperation;
#if NET472
if (endPoint is DnsEndPoint dnsEndPoint)
{
// mono doesn't support DnsEndPoint in its BeginConnect method.
connectOperation = socket.BeginConnect(dnsEndPoint.Host, dnsEndPoint.Port, null, null);
}
else
{
connectOperation = socket.BeginConnect(endPoint, null, null);
}
#else
connectOperation = socket.BeginConnect(endPoint, null, null);
#endif

WaitHandle.WaitAny([connectOperation.AsyncWaitHandle, cancellationToken.WaitHandle], _settings.ConnectTimeout);

if (!connectOperation.IsCompleted)
var wasCallbackExecuted = false;
using var timeoutCancellationTokenSource = new CancellationTokenSource(_settings.ConnectTimeout);
using var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token);
using var cancellationSubscription = combinedCancellationTokenSource.Token.Register(() =>
{
try
{
wasCallbackExecuted = true;
socket.Dispose();
} catch { }

cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");
}
}
catch
{
// Ignore any exception here, as we should avoid throwing in callback.
}
});

try
{
socket.EndConnect(connectOperation);
#if NET472
if (endPoint is DnsEndPoint dnsEndPoint)
{
// mono doesn't support DnsEndPoint in its Connect method.
socket.Connect(dnsEndPoint.Host, dnsEndPoint.Port);
}
else
{
socket.Connect(endPoint);
}
#else
socket.Connect(endPoint);
#endif
}
catch
catch (Exception)
{
try { socket.Dispose(); } catch { }
if (!wasCallbackExecuted)
{
try
{
socket.Dispose();
}
catch (Exception)
{
}
}

cancellationToken.ThrowIfCancellationRequested();
if (timeoutCancellationTokenSource.IsCancellationRequested)
{
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");
}

throw;
}
}
Expand Down
3 changes: 0 additions & 3 deletions tests/MongoDB.Driver.Tests/ClusterRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;
using MongoDB.Driver.Core.TestHelpers.Logging;
using MongoDB.Driver.TestHelpers;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -73,7 +72,6 @@ public void DisposingClusterSource_should_use_cluster_registry_and_return_cluste
ClusterRegistry.Instance._registry().Keys.Should().NotContain(clusterKey);
}

#if WINDOWS
[Fact]
public void Instance_should_return_the_same_instance_every_time()
{
Expand Down Expand Up @@ -221,7 +219,6 @@ public void UnregisterAndDisposeCluster_should_unregister_and_dispose_the_cluste
subject._registry().Count.Should().Be(0);
cluster._state().Should().Be(2);
}
#endif
}

internal static class ClusterRegistryReflector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TcpStreamFactoryTests
{
[Theory]
[ParameterAttributeData]
public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)] bool async)
public async Task Connect_should_dispose_socket_if_socket_fails([Values(false, true)] bool async)
{
RequireServer.Check();

Expand All @@ -43,20 +43,9 @@ public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)]

using (var testSocket = new TestSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
Exception exception;
if (async)
{
exception = Record.Exception(
() =>
subject
.ConnectAsync(testSocket, endpoint, CancellationToken.None)
.GetAwaiter()
.GetResult());
}
else
{
exception = Record.Exception(() => subject.Connect(testSocket, endpoint, CancellationToken.None));
}
var exception = async ?
await Record.ExceptionAsync(() => subject.ConnectAsync(testSocket, endpoint, CancellationToken.None)) :
Record.Exception(() => subject.Connect(testSocket, endpoint, CancellationToken.None));

exception.Should().NotBeNull();
testSocket.DisposeAttempts.Should().Be(1);
Expand All @@ -66,83 +55,64 @@ public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)]
[Fact]
public void Constructor_should_throw_an_ArgumentNullException_when_tcpStreamSettings_is_null()
{
Action act = () => new TcpStreamFactory(null);
var exception = Record.Exception(() => new TcpStreamFactory(null));

act.ShouldThrow<ArgumentNullException>();
exception.Should().BeOfType<ArgumentNullException>().Subject
.ParamName.Should().Be("settings");
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_a_SocketException_when_the_endpoint_could_not_be_resolved(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_a_SocketException_when_the_endpoint_could_not_be_resolved([Values(false, true)]bool async)
{
var subject = new TcpStreamFactory();

Action act;
if (async)
{
act = () => subject.CreateStreamAsync(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None).GetAwaiter().GetResult();
}
else
{
act = () => subject.CreateStream(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None);
}
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None)) :
Record.Exception(() => subject.CreateStream(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None));

act.ShouldThrow<SocketException>();
exception.Should().BeAssignableTo<SocketException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_when_cancellation_is_requested(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_when_cancellation_is_requested([Values(false, true)]bool async)
{
var subject = new TcpStreamFactory();
var endPoint = new IPEndPoint(new IPAddress(0x01010101), 12345); // a non-existent host and port
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(20));

Action action;
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(endPoint, cancellationTokenSource.Token)) :
Record.Exception(() => subject.CreateStream(endPoint, cancellationTokenSource.Token));
if (async)
{
action = () => subject.CreateStreamAsync(endPoint, cancellationTokenSource.Token).GetAwaiter().GetResult();
exception.Should().BeOfType<TaskCanceledException>();
}
else
{
action = () => subject.CreateStream(endPoint, cancellationTokenSource.Token);
exception.Should().BeOfType<OperationCanceledException>();
}

action.ShouldThrow<OperationCanceledException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_when_connect_timeout_has_expired(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_when_connect_timeout_has_expired([Values(false, true)]bool async)
{
var settings = new TcpStreamSettings(connectTimeout: TimeSpan.FromMilliseconds(20));
var subject = new TcpStreamFactory(settings);
var endPoint = new IPEndPoint(new IPAddress(0x01010101), 12345); // a non-existent host and port

Action action;
if (async)
{
action = () => subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult(); ;
}
else
{
action = () => subject.CreateStream(endPoint, CancellationToken.None);
}
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(endPoint, CancellationToken.None)) :
Record.Exception(() => subject.CreateStream(endPoint, CancellationToken.None));

action.ShouldThrow<TimeoutException>();
exception.Should().BeOfType<TimeoutException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_call_the_socketConfigurator(
[Values(false, true)]
bool async)
public async Task CreateStream_should_call_the_socketConfigurator([Values(false, true)]bool async)
{
RequireServer.Check();
var socketConfiguratorWasCalled = false;
Expand All @@ -153,7 +123,7 @@ public void CreateStream_should_call_the_socketConfigurator(

if (async)
{
subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand All @@ -165,9 +135,7 @@ public void CreateStream_should_call_the_socketConfigurator(

[Theory]
[ParameterAttributeData]
public void CreateStream_should_connect_to_a_running_server_and_return_a_non_null_stream(
[Values(false, true)]
bool async)
public async Task CreateStream_should_connect_to_a_running_server_and_return_a_non_null_stream([Values(false, true)]bool async)
{
RequireServer.Check();
var subject = new TcpStreamFactory();
Expand All @@ -176,7 +144,7 @@ public void CreateStream_should_connect_to_a_running_server_and_return_a_non_nul
Stream stream;
if (async)
{
stream = subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
stream = await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand All @@ -188,9 +156,7 @@ public void CreateStream_should_connect_to_a_running_server_and_return_a_non_nul

[Theory]
[ParameterAttributeData]
public void SocketConfigurator_can_be_used_to_set_keepAlive(
[Values(false, true)]
bool async)
public async Task SocketConfigurator_can_be_used_to_set_keepAlive([Values(false, true)]bool async)
{
RequireServer.Check();
Action<Socket> socketConfigurator = s => s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
Expand All @@ -201,7 +167,7 @@ public void SocketConfigurator_can_be_used_to_set_keepAlive(
Stream stream;
if (async)
{
stream = subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
stream = await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand Down
7 changes: 6 additions & 1 deletion tests/MongoDB.Driver.Tests/Core/Jira/CSharp3302Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ public async Task RapidHeartbeatTimerCallback_should_ignore_reentrant_calls()
cluster.Initialize();

// Trigger Cluster._rapidHeartbeatTimer
_ = cluster.SelectServerAsync(OperationContext.NoTimeout, CreateWritableServerAndEndPointSelector(__endPoint1));
using var cancellationTokenSource = new CancellationTokenSource();
var operationContext = new OperationContext(Timeout.InfiniteTimeSpan, cancellationTokenSource.Token);
cluster.SelectServerAsync(operationContext, CreateWritableServerAndEndPointSelector(__endPoint1))
.IgnoreExceptions();

// Wait for all heartbeats to complete
await Task.WhenAny(allHeartbeatsReceived.Task, Task.Delay(1000));

cancellationTokenSource.Cancel();
}

allHeartbeatsReceived.Task.Status.Should().Be(TaskStatus.RanToCompletion);
Expand Down
33 changes: 33 additions & 0 deletions tests/MongoDB.Driver.Tests/UnobservedTaskExceptionTracking.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using FluentAssertions;
using MongoDB.TestHelpers.XunitExtensions.TimeoutEnforcing;

namespace MongoDB.Driver.Tests;

public class UnobservedTaskExceptionTracking
{
[UnobservedExceptionTrackingFact]
public void EnsureNoUnobservedTaskException()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why don't we add this to BSON a well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues with BSON?

{
GC.Collect();
GC.WaitForPendingFinalizers();

UnobservedExceptionTestDiscoverer.UnobservedExceptions.Should().BeEmpty();
}
}

4 changes: 4 additions & 0 deletions tests/MongoDB.TestHelpers/MongoDB.TestHelpers.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@
<Description>Helper classes applicable to all test projects.</Description>
</PropertyGroup>

<PropertyGroup>
<DefineConstants>$(DefineConstants);UNOBSERVED_TASK_EXCEPTION_DEBUGGING</DefineConstants>
</PropertyGroup>

</Project>
Loading