Skip to content

Commit ef3a6ea

Browse files
authored
CSHARP-4831: Emit heartbeat started event before connection establishment in ServerMonitor (#1253)
1 parent 370e8b0 commit ef3a6ea

File tree

4 files changed

+95
-13
lines changed

4 files changed

+95
-13
lines changed

src/MongoDB.Driver.Core/Core/Servers/ServerMonitor.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,18 +202,24 @@ private CommandWireProtocol<BsonDocument> InitializeHelloProtocol(IConnection co
202202
private IConnection InitializeConnection(CancellationToken cancellationToken) // called setUpConnection in spec
203203
{
204204
var connection = _connectionFactory.CreateConnection(_serverId, _endPoint);
205+
_eventLoggerSdam.LogAndPublish(new ServerHeartbeatStartedEvent(connection.ConnectionId, false));
205206

206207
var stopwatch = Stopwatch.StartNew();
207208
try
208209
{
209210
// if we are cancelling, it's because the server has
210211
// been shut down and we really don't need to wait.
211212
connection.Open(cancellationToken);
213+
214+
_eventLoggerSdam.LogAndPublish(new ServerHeartbeatSucceededEvent(connection.ConnectionId, stopwatch.Elapsed, false, connection.Description.HelloResult.Wrapped));
212215
}
213-
catch
216+
catch (Exception exception)
214217
{
215218
// dispose it here because the _connection is not initialized yet
216219
try { connection.Dispose(); } catch { }
220+
221+
_eventLoggerSdam.LogAndPublish(new ServerHeartbeatFailedEvent(connection.ConnectionId, stopwatch.Elapsed, exception, false));
222+
217223
throw;
218224
}
219225
stopwatch.Stop();

tests/MongoDB.Driver.Core.Tests/Core/Servers/ServerMonitorTests.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ public void DescriptionChanged_should_be_raised_during_initial_handshake()
146146
changes[0].OldServerDescription.State.Should().Be(ServerState.Disconnected);
147147
changes[0].NewServerDescription.State.Should().Be(ServerState.Connected);
148148

149-
capturedEvents.Any().Should().BeFalse();
149+
// ServerHeartbeatStartedEvent and ServerHeartbeatSucceededEvent events should be emitted during initial handshake
150+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatStartedEvent>();
151+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatSucceededEvent>();
150152
}
151153
#endif
152154

@@ -163,8 +165,9 @@ public void Description_should_be_connected_after_successful_heartbeat()
163165
subject.Description.State.Should().Be(ServerState.Connected);
164166
subject.Description.Type.Should().Be(ServerType.Standalone);
165167

166-
// no ServerHeartbeat events should be triggered during initial handshake
167-
capturedEvents.Any().Should().BeFalse();
168+
// ServerHeartbeatStartedEvent and ServerHeartbeatSucceededEvent events should be emitted during initial handshake
169+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatStartedEvent>();
170+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatSucceededEvent>();
168171
}
169172

170173
[Fact]
@@ -229,12 +232,13 @@ public void Heartbeat_should_make_immediate_next_attempt_for_streaming_protocol(
229232
: 2;
230233
capturedEvents.WaitForOrThrowIfTimeout(
231234
events =>
232-
events.Count(e => e is ServerDescriptionChangedEvent) >= expectedServerDescriptionChangedEventCount, // the connection has been initialized and the first heatbeat event has been fired
235+
events.Count(e => e is ServerDescriptionChangedEvent) >= expectedServerDescriptionChangedEventCount, // the connection has been initialized and the first heartbeat event has been fired
233236
TimeSpan.FromSeconds(10));
234237

238+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatSucceededEvent>(); // heartbeat succeeded before connection initialized
235239
capturedEvents.Next().Should().BeOfType<ServerDescriptionChangedEvent>(); // connection initialized
236240
AssertHeartbeatAttempt();
237-
capturedEvents.Any().Should().BeFalse(); // the next attempt will be in 10 seconds because the second stremable respone has 10 seconds delay
241+
capturedEvents.Any().Should().BeFalse(); // the next attempt will be in 10 seconds because the second streamable response has 10 seconds delay
238242

239243
void AssertHeartbeatAttempt()
240244
{
@@ -248,7 +252,10 @@ void AssertHeartbeatAttempt()
248252
var serverDescriptionChangedEvent = capturedEvents.Next().Should().BeOfType<ServerDescriptionChangedEvent>().Subject;
249253
serverDescriptionChangedEvent.NewDescription.HeartbeatException.Should().Be(exception);
250254

251-
serverDescriptionChangedEvent = capturedEvents.Next().Should().BeOfType<ServerDescriptionChangedEvent>().Subject; // when we catch exceptions, we close the current connection, so opening connection will trigger one more ServerDescriptionChangedEvent
255+
// when we catch exceptions, we close the current connection,
256+
// so opening connection will trigger one more ServerHeartbeatSucceededEvent and ServerDescriptionChangedEvent
257+
capturedEvents.Next().Should().BeOfType<ServerHeartbeatSucceededEvent>();
258+
serverDescriptionChangedEvent = capturedEvents.Next().Should().BeOfType<ServerDescriptionChangedEvent>().Subject;
252259
serverDescriptionChangedEvent.OldDescription.HeartbeatException.Should().Be(exception);
253260
serverDescriptionChangedEvent.NewDescription.HeartbeatException.Should().BeNull();
254261
}

tests/MongoDB.Driver.Tests/Specifications/connection-monitoring-and-pooling/ConnectionMonitoringAndPoolingTestRunner.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,13 @@ or ConnectionCreatedEvent
702702
return EndPointHelper.Equals(serverId.EndPoint, server.EndPoint);
703703
}
704704

705+
if (o is ServerHeartbeatStartedEvent ||
706+
o is ServerHeartbeatSucceededEvent ||
707+
o is ServerHeartbeatFailedEvent)
708+
{
709+
return false;
710+
}
711+
705712
return true;
706713
};
707714

tests/MongoDB.Driver.Tests/Specifications/server-discovery-and-monitoring/ServerDiscoveryAndMonitoringProseTests.cs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections.Concurrent;
1818
using System.Diagnostics;
19+
using System.IO;
1920
using System.Linq;
2021
using System.Net;
2122
using System.Threading;
@@ -25,12 +26,14 @@
2526
using MongoDB.Driver.Core;
2627
using MongoDB.Driver.Core.Bindings;
2728
using MongoDB.Driver.Core.Clusters.ServerSelectors;
29+
using MongoDB.Driver.Core.Connections;
2830
using MongoDB.Driver.Core.Events;
2931
using MongoDB.Driver.Core.Misc;
3032
using MongoDB.Driver.Core.Servers;
3133
using MongoDB.Driver.Core.TestHelpers;
3234
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
3335
using MongoDB.Driver.TestHelpers;
36+
using Moq;
3437
using Xunit;
3538

3639
namespace MongoDB.Driver.Tests.Specifications.server_discovery_and_monitoring
@@ -41,12 +44,12 @@ public class ServerDiscoveryAndMonitoringProseTests
4144
[Fact]
4245
public void Heartbeat_should_work_as_expected()
4346
{
44-
var heartbeatSuceededTimestamps = new ConcurrentQueue<DateTime>();
47+
var heartbeatSucceededTimestamps = new ConcurrentQueue<DateTime>();
4548
var eventCapturer = new EventCapturer()
4649
.Capture<ServerHeartbeatSucceededEvent>(
4750
(@event) =>
4851
{
49-
heartbeatSuceededTimestamps.Enqueue(DateTime.UtcNow);
52+
heartbeatSucceededTimestamps.Enqueue(DateTime.UtcNow);
5053
return true;
5154
}
5255
);
@@ -64,21 +67,79 @@ public void Heartbeat_should_work_as_expected()
6467
});
6568
}
6669

67-
var heartbeatSuceededTimestampsList = heartbeatSuceededTimestamps.ToList();
70+
var heartbeatSucceededTimestampsList = heartbeatSucceededTimestamps.ToList();
6871
// we have at least 3 items here
6972
// Skip the first event because we have nothing to compare it to
70-
for (int i = 1; i < heartbeatSuceededTimestampsList.Count; i++)
73+
for (int i = 1; i < heartbeatSucceededTimestampsList.Count; i++)
7174
{
72-
var attemptDuration = heartbeatSuceededTimestampsList[i] - heartbeatSuceededTimestampsList[i - 1];
75+
var attemptDuration = heartbeatSucceededTimestampsList[i] - heartbeatSucceededTimestampsList[i - 1];
7376
attemptDuration
7477
.Should()
7578
.BeLessThan(TimeSpan.FromSeconds(2));
7679
// Assert the client processes heartbeat replies more frequently than 10 secs (approximately every 500ms)
7780
}
7881
}
7982

83+
// Heartbeat prose tests
84+
// https://github.com/mongodb/specifications/blob/c5771dce88eed54386690039f76142e1d741d83f/source/server-discovery-and-monitoring/tests/README.rst?plain=1#L268-L289
8085
[Fact]
81-
public void Monitor_sleep_at_least_minHeartbeatFreqencyMS_between_checks()
86+
public void Heartbeat_should_be_emitted_before_connection_open()
87+
{
88+
const string HelloReceivedEvent = "client hello received";
89+
const string ClientConnectedEvent = "client connected";
90+
91+
var events = new ConcurrentQueue<string>();
92+
var eventCapturer = new EventCapturer()
93+
.Capture<ServerHeartbeatStartedEvent>(e => EnqueueEvent(nameof(ServerHeartbeatStartedEvent)))
94+
.Capture<ServerHeartbeatSucceededEvent>(e => EnqueueEvent(nameof(ServerHeartbeatSucceededEvent)))
95+
.Capture<ServerHeartbeatFailedEvent>(e => EnqueueEvent(nameof(ServerHeartbeatFailedEvent)));
96+
97+
var mockStream = new Mock<Stream>();
98+
mockStream
99+
.Setup(s => s.Write(It.IsAny<byte[]>(), It.IsAny<int>(), It.IsAny<int>()))
100+
.Callback(() => EnqueueEvent(HelloReceivedEvent))
101+
.Throws(new Exception("Stream is closed."));
102+
103+
var mockStreamFactory = new Mock<IStreamFactory>();
104+
mockStreamFactory
105+
.Setup(s => s.CreateStream(It.IsAny<EndPoint>(), It.IsAny<CancellationToken>()))
106+
.Callback(() => EnqueueEvent(ClientConnectedEvent))
107+
.Returns(mockStream.Object);
108+
109+
var settings = new MongoClientSettings();
110+
settings.ServerSelectionTimeout = TimeSpan.FromMilliseconds(500);
111+
settings.HeartbeatInterval = TimeSpan.FromMilliseconds(5000);
112+
settings.ClusterConfigurator = c => c
113+
.Subscribe(eventCapturer)
114+
.RegisterStreamFactory(f => mockStreamFactory.Object);
115+
116+
using var client = DriverTestConfiguration.CreateDisposableClient(settings);
117+
118+
eventCapturer.WaitForEventOrThrowIfTimeout<ServerHeartbeatFailedEvent>(TimeSpan.FromSeconds(5));
119+
120+
events.ShouldAllBeEquivalentTo(new[]
121+
{
122+
nameof(ServerHeartbeatStartedEvent),
123+
HelloReceivedEvent,
124+
ClientConnectedEvent,
125+
nameof(ServerHeartbeatFailedEvent),
126+
});
127+
128+
bool EnqueueEvent(string @event)
129+
{
130+
// Ignore RTT events
131+
if (Environment.StackTrace.Contains(nameof(RoundTripTimeMonitor)))
132+
{
133+
return false;
134+
}
135+
136+
events.Enqueue(@event);
137+
return true;
138+
}
139+
}
140+
141+
[Fact]
142+
public void Monitor_sleep_at_least_minHeartbeatFrequencyMS_between_checks()
82143
{
83144
var minVersion = new SemanticVersion(4, 9, 0, "");
84145
RequireServer.Check().VersionGreaterThanOrEqualTo(minVersion);
@@ -232,6 +293,7 @@ public void ConnectionPool_cleared_on_failed_hello()
232293
{
233294
typeof(ServerHeartbeatFailedEvent),
234295
typeof(ConnectionPoolClearedEvent),
296+
typeof(ServerHeartbeatSucceededEvent),
235297
typeof(ConnectionPoolReadyEvent),
236298
typeof(ServerHeartbeatSucceededEvent),
237299
},

0 commit comments

Comments
 (0)