Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/MongoDB.Driver/Core/Clusters/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,14 @@ private void ExitServerSelectionWaitQueue()
{
if (--_serverSelectionWaitQueueSize == 0)
{
_rapidHeartbeatTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
try
{
_rapidHeartbeatTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}
catch (ObjectDisposedException)
{
// Ignore ObjectDisposedException here, as ExitServerSelectionWaitQueue could be done after the WaitQueue was disposed.
}
}
}
}
Expand Down
66 changes: 39 additions & 27 deletions src/MongoDB.Driver/Core/Connections/TcpStreamFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,42 +165,54 @@ private void ConfigureConnectedSocket(Socket socket)

private void Connect(Socket socket, EndPoint endPoint, CancellationToken cancellationToken)
{
IAsyncResult connectOperation;
#if NET472
if (endPoint is DnsEndPoint dnsEndPoint)
{
// mono doesn't support DnsEndPoint in its BeginConnect method.
connectOperation = socket.BeginConnect(dnsEndPoint.Host, dnsEndPoint.Port, null, null);
}
else
var isSocketDisposed = false;
using var timeoutCancellationTokenSource = new CancellationTokenSource(_settings.ConnectTimeout);
using var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token);
using var cancellationSubscription = combinedCancellationTokenSource.Token.Register(DisposeSocket);

try
{
connectOperation = socket.BeginConnect(endPoint, null, null);
}
#if NET472
if (endPoint is DnsEndPoint dnsEndPoint)
{
// mono doesn't support DnsEndPoint in its Connect method.
socket.Connect(dnsEndPoint.Host, dnsEndPoint.Port);
}
else
{
socket.Connect(endPoint);
}
#else
connectOperation = socket.BeginConnect(endPoint, null, null);
socket.Connect(endPoint);
#endif

WaitHandle.WaitAny([connectOperation.AsyncWaitHandle, cancellationToken.WaitHandle], _settings.ConnectTimeout);

if (!connectOperation.IsCompleted)
}
catch (Exception)
{
try
if (!isSocketDisposed)
{
socket.Dispose();
} catch { }
DisposeSocket();
}

cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");
}
if (timeoutCancellationTokenSource.IsCancellationRequested)
{
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");
}

try
{
socket.EndConnect(connectOperation);
throw;
}
catch

void DisposeSocket()
{
try { socket.Dispose(); } catch { }
throw;
isSocketDisposed = true;
try
{
socket.Dispose();
}
catch
{
// Ignore any exceptions.
}
}
}

Expand Down Expand Up @@ -228,8 +240,8 @@ private async Task ConnectAsync(Socket socket, EndPoint endPoint, CancellationTo
{
try
{
socket.Dispose();
connectTask.IgnoreExceptions();
socket.Dispose();
}
catch { }

Expand Down
33 changes: 33 additions & 0 deletions tests/MongoDB.Bson.Tests/UnobservedTaskExceptionTracking.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using FluentAssertions;
using MongoDB.TestHelpers.XunitExtensions.TimeoutEnforcing;

namespace MongoDB.Bson.Tests;

public class UnobservedTaskExceptionTracking
{
[UnobservedExceptionTrackingFact]
public void EnsureNoUnobservedTaskException()
{
GC.Collect();
GC.WaitForPendingFinalizers();

UnobservedExceptionTestDiscoverer.UnobservedExceptions.Should().BeEmpty();
}
}

3 changes: 0 additions & 3 deletions tests/MongoDB.Driver.Tests/ClusterRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;
using MongoDB.Driver.Core.TestHelpers.Logging;
using MongoDB.Driver.TestHelpers;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -73,7 +72,6 @@ public void DisposingClusterSource_should_use_cluster_registry_and_return_cluste
ClusterRegistry.Instance._registry().Keys.Should().NotContain(clusterKey);
}

#if WINDOWS
[Fact]
public void Instance_should_return_the_same_instance_every_time()
{
Expand Down Expand Up @@ -221,7 +219,6 @@ public void UnregisterAndDisposeCluster_should_unregister_and_dispose_the_cluste
subject._registry().Count.Should().Be(0);
cluster._state().Should().Be(2);
}
#endif
}

internal static class ClusterRegistryReflector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TcpStreamFactoryTests
{
[Theory]
[ParameterAttributeData]
public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)] bool async)
public async Task Connect_should_dispose_socket_if_socket_fails([Values(false, true)] bool async)
{
RequireServer.Check();

Expand All @@ -43,20 +43,9 @@ public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)]

using (var testSocket = new TestSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
Exception exception;
if (async)
{
exception = Record.Exception(
() =>
subject
.ConnectAsync(testSocket, endpoint, CancellationToken.None)
.GetAwaiter()
.GetResult());
}
else
{
exception = Record.Exception(() => subject.Connect(testSocket, endpoint, CancellationToken.None));
}
var exception = async ?
await Record.ExceptionAsync(() => subject.ConnectAsync(testSocket, endpoint, CancellationToken.None)) :
Record.Exception(() => subject.Connect(testSocket, endpoint, CancellationToken.None));

exception.Should().NotBeNull();
testSocket.DisposeAttempts.Should().Be(1);
Expand All @@ -66,83 +55,64 @@ public void Connect_should_dispose_socket_if_socket_fails([Values(false, true)]
[Fact]
public void Constructor_should_throw_an_ArgumentNullException_when_tcpStreamSettings_is_null()
{
Action act = () => new TcpStreamFactory(null);
var exception = Record.Exception(() => new TcpStreamFactory(null));

act.ShouldThrow<ArgumentNullException>();
exception.Should().BeOfType<ArgumentNullException>().Subject
.ParamName.Should().Be("settings");
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_a_SocketException_when_the_endpoint_could_not_be_resolved(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_a_SocketException_when_the_endpoint_could_not_be_resolved([Values(false, true)]bool async)
{
var subject = new TcpStreamFactory();

Action act;
if (async)
{
act = () => subject.CreateStreamAsync(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None).GetAwaiter().GetResult();
}
else
{
act = () => subject.CreateStream(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None);
}
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None)) :
Record.Exception(() => subject.CreateStream(new DnsEndPoint("not-gonna-exist-i-hope", 27017), CancellationToken.None));

act.ShouldThrow<SocketException>();
exception.Should().BeAssignableTo<SocketException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_when_cancellation_is_requested(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_when_cancellation_is_requested([Values(false, true)]bool async)
{
var subject = new TcpStreamFactory();
var endPoint = new IPEndPoint(new IPAddress(0x01010101), 12345); // a non-existent host and port
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(20));

Action action;
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(endPoint, cancellationTokenSource.Token)) :
Record.Exception(() => subject.CreateStream(endPoint, cancellationTokenSource.Token));
if (async)
{
action = () => subject.CreateStreamAsync(endPoint, cancellationTokenSource.Token).GetAwaiter().GetResult();
exception.Should().BeOfType<TaskCanceledException>();
}
else
{
action = () => subject.CreateStream(endPoint, cancellationTokenSource.Token);
exception.Should().BeOfType<OperationCanceledException>();
}

action.ShouldThrow<OperationCanceledException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_throw_when_connect_timeout_has_expired(
[Values(false, true)]
bool async)
public async Task CreateStream_should_throw_when_connect_timeout_has_expired([Values(false, true)]bool async)
{
var settings = new TcpStreamSettings(connectTimeout: TimeSpan.FromMilliseconds(20));
var subject = new TcpStreamFactory(settings);
var endPoint = new IPEndPoint(new IPAddress(0x01010101), 12345); // a non-existent host and port

Action action;
if (async)
{
action = () => subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult(); ;
}
else
{
action = () => subject.CreateStream(endPoint, CancellationToken.None);
}
var exception = async ?
await Record.ExceptionAsync(() => subject.CreateStreamAsync(endPoint, CancellationToken.None)) :
Record.Exception(() => subject.CreateStream(endPoint, CancellationToken.None));

action.ShouldThrow<TimeoutException>();
exception.Should().BeOfType<TimeoutException>();
}

[Theory]
[ParameterAttributeData]
public void CreateStream_should_call_the_socketConfigurator(
[Values(false, true)]
bool async)
public async Task CreateStream_should_call_the_socketConfigurator([Values(false, true)]bool async)
{
RequireServer.Check();
var socketConfiguratorWasCalled = false;
Expand All @@ -153,7 +123,7 @@ public void CreateStream_should_call_the_socketConfigurator(

if (async)
{
subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand All @@ -165,9 +135,7 @@ public void CreateStream_should_call_the_socketConfigurator(

[Theory]
[ParameterAttributeData]
public void CreateStream_should_connect_to_a_running_server_and_return_a_non_null_stream(
[Values(false, true)]
bool async)
public async Task CreateStream_should_connect_to_a_running_server_and_return_a_non_null_stream([Values(false, true)]bool async)
{
RequireServer.Check();
var subject = new TcpStreamFactory();
Expand All @@ -176,7 +144,7 @@ public void CreateStream_should_connect_to_a_running_server_and_return_a_non_nul
Stream stream;
if (async)
{
stream = subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
stream = await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand All @@ -188,9 +156,7 @@ public void CreateStream_should_connect_to_a_running_server_and_return_a_non_nul

[Theory]
[ParameterAttributeData]
public void SocketConfigurator_can_be_used_to_set_keepAlive(
[Values(false, true)]
bool async)
public async Task SocketConfigurator_can_be_used_to_set_keepAlive([Values(false, true)]bool async)
{
RequireServer.Check();
Action<Socket> socketConfigurator = s => s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
Expand All @@ -201,7 +167,7 @@ public void SocketConfigurator_can_be_used_to_set_keepAlive(
Stream stream;
if (async)
{
stream = subject.CreateStreamAsync(endPoint, CancellationToken.None).GetAwaiter().GetResult();
stream = await subject.CreateStreamAsync(endPoint, CancellationToken.None);
}
else
{
Expand Down
7 changes: 6 additions & 1 deletion tests/MongoDB.Driver.Tests/Core/Jira/CSharp3302Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ public async Task RapidHeartbeatTimerCallback_should_ignore_reentrant_calls()
cluster.Initialize();

// Trigger Cluster._rapidHeartbeatTimer
_ = cluster.SelectServerAsync(OperationContext.NoTimeout, CreateWritableServerAndEndPointSelector(__endPoint1));
using var cancellationTokenSource = new CancellationTokenSource();
var operationContext = new OperationContext(Timeout.InfiniteTimeSpan, cancellationTokenSource.Token);
cluster.SelectServerAsync(operationContext, CreateWritableServerAndEndPointSelector(__endPoint1))
.IgnoreExceptions();

// Wait for all heartbeats to complete
await Task.WhenAny(allHeartbeatsReceived.Task, Task.Delay(1000));

cancellationTokenSource.Cancel();
}

allHeartbeatsReceived.Task.Status.Should().Be(TaskStatus.RanToCompletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ private void ExecuteCheckOut(
else
{
tasks[target] = CreateTask(() => CheckOut(operation, connectionPool, map));
tasks[target].IgnoreExceptions();
}
}
}
Expand Down
Loading