Skip to content

Commit 05d5932

Browse files
committed
Changed ReceiveMessageAsync to return new abstract ResponseMessage to support the possibility that the server might return multiple message types in the future.
1 parent 3fb6ef1 commit 05d5932

21 files changed

+222
-103
lines changed

src/MongoDB.Driver.Core.Tests/Core/Connections/BinaryConnectionTests.cs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -147,31 +147,31 @@ public void OpenAsync_should_not_complete_the_second_call_until_the_first_is_com
147147
}
148148

149149
[Test]
150-
public void ReceiveMessageAsync_should_throw_an_ArgumentNullException_when_the_serializer_is_null()
150+
public void ReceiveMessageAsync_should_throw_an_ArgumentNullException_when_the_encoderSelector_is_null()
151151
{
152-
IBsonSerializer<int> serializer = null;
153-
Action act = () => _subject.ReceiveMessageAsync(10, serializer, _messageEncoderSettings, CancellationToken.None).Wait();
152+
IMessageEncoderSelector encoderSelector = null;
153+
Action act = () => _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None).Wait();
154154

155155
act.ShouldThrow<ArgumentNullException>();
156156
}
157157

158158
[Test]
159159
public void ReceiveMessageAsync_should_throw_an_ObjectDisposedException_if_the_connection_is_disposed()
160160
{
161-
var serializer = Substitute.For<IBsonSerializer<BsonDocument>>();
161+
var encoderSelector = Substitute.For<IMessageEncoderSelector>();
162162
_subject.Dispose();
163163

164-
Action act = () => _subject.ReceiveMessageAsync(10, serializer, _messageEncoderSettings, CancellationToken.None).Wait();
164+
Action act = () => _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None).Wait();
165165

166166
act.ShouldThrow<ObjectDisposedException>();
167167
}
168168

169169
[Test]
170170
public void ReceiveMessageAsync_should_throw_an_InvalidOperationException_if_the_connection_is_not_open()
171171
{
172-
var serializer = Substitute.For<IBsonSerializer<BsonDocument>>();
172+
var encoderSelector = Substitute.For<IMessageEncoderSelector>();
173173

174-
Action act = () => _subject.ReceiveMessageAsync(10, serializer, _messageEncoderSettings, CancellationToken.None).Wait();
174+
Action act = () => _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None).Wait();
175175

176176
act.ShouldThrow<InvalidOperationException>();
177177
}
@@ -187,17 +187,18 @@ public void ReceiveMessageAsync_should_complete_when_reply_is_already_on_the_str
187187
_subject.OpenAsync(CancellationToken.None).Wait();
188188

189189
var messageToReceive = MessageHelper.BuildSuccessReply<BsonDocument>(new BsonDocument(), BsonDocumentSerializer.Instance, responseTo: 10);
190-
MessageHelper.WriteRepliesToStream(stream, new[] { messageToReceive });
190+
MessageHelper.WriteResponsesToStream(stream, new[] { messageToReceive });
191191

192-
var received = _subject.ReceiveMessageAsync(10, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None).Result;
192+
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
193+
var received = _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None).Result;
193194

194195
var expected = MessageHelper.TranslateMessagesToBsonDocuments(new[] { messageToReceive });
195196
var actual = MessageHelper.TranslateMessagesToBsonDocuments(new[] { received });
196197

197198
actual.Should().BeEquivalentTo(expected);
198199

199200
_listener.ReceivedWithAnyArgs().ConnectionBeforeReceivingMessage(default(ConnectionBeforeReceivingMessageEvent));
200-
_listener.ReceivedWithAnyArgs().ConnectionAfterReceivingMessage<BsonDocument>(default(ConnectionAfterReceivingMessageEvent<BsonDocument>));
201+
_listener.ReceivedWithAnyArgs().ConnectionAfterReceivingMessage(default(ConnectionAfterReceivingMessageEvent));
201202
}
202203
}
203204

@@ -211,12 +212,13 @@ public void ReceiveMessageAsync_should_complete_when_reply_is_not_already_on_the
211212

212213
_subject.OpenAsync(CancellationToken.None).Wait();
213214

214-
var receivedTask = _subject.ReceiveMessageAsync(10, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
215+
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
216+
var receivedTask = _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None);
215217

216218
receivedTask.IsCompleted.Should().BeFalse();
217219

218220
var messageToReceive = MessageHelper.BuildSuccessReply<BsonDocument>(new BsonDocument(), BsonDocumentSerializer.Instance, responseTo: 10);
219-
MessageHelper.WriteRepliesToStream(stream, new[] { messageToReceive });
221+
MessageHelper.WriteResponsesToStream(stream, new[] { messageToReceive });
220222

221223
var received = receivedTask.Result;
222224

@@ -237,12 +239,13 @@ public void ReceiveMessageAsync_should_handle_out_of_order_replies()
237239

238240
_subject.OpenAsync(CancellationToken.None).Wait();
239241

240-
var receivedTask11 = _subject.ReceiveMessageAsync(11, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
241-
var receivedTask10 = _subject.ReceiveMessageAsync(10, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
242+
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
243+
var receivedTask11 = _subject.ReceiveMessageAsync(11, encoderSelector, _messageEncoderSettings, CancellationToken.None);
244+
var receivedTask10 = _subject.ReceiveMessageAsync(10, encoderSelector, _messageEncoderSettings, CancellationToken.None);
242245

243246
var messageToReceive10 = MessageHelper.BuildSuccessReply<BsonDocument>(new BsonDocument(), BsonDocumentSerializer.Instance, responseTo: 10);
244247
var messageToReceive11 = MessageHelper.BuildSuccessReply<BsonDocument>(new BsonDocument(), BsonDocumentSerializer.Instance, responseTo: 11);
245-
MessageHelper.WriteRepliesToStream(stream, new[] { messageToReceive10, messageToReceive11 });
248+
MessageHelper.WriteResponsesToStream(stream, new[] { messageToReceive10, messageToReceive11 });
246249

247250
var received11 = receivedTask11.Result;
248251
var received10 = receivedTask10.Result;
@@ -272,8 +275,9 @@ public async Task ReceiveMessageAsync_should_throw_network_exception_to_all_awai
272275

273276
await _subject.OpenAsync(CancellationToken.None);
274277

275-
var task1 = _subject.ReceiveMessageAsync<BsonDocument>(1, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
276-
var task2 = _subject.ReceiveMessageAsync<BsonDocument>(2, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
278+
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
279+
var task1 = _subject.ReceiveMessageAsync(1, encoderSelector, _messageEncoderSettings, CancellationToken.None);
280+
var task2 = _subject.ReceiveMessageAsync(2, encoderSelector, _messageEncoderSettings, CancellationToken.None);
277281

278282
readTcs.SetException(new SocketException());
279283

@@ -310,7 +314,8 @@ public async Task ReceiveMessageAsync_should_throw_MongoConnectionClosedExceptio
310314

311315
await _subject.OpenAsync(CancellationToken.None);
312316

313-
var task1 = _subject.ReceiveMessageAsync<BsonDocument>(1, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
317+
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
318+
var task1 = _subject.ReceiveMessageAsync(1, encoderSelector, _messageEncoderSettings, CancellationToken.None);
314319

315320
readTcs.SetException(new SocketException());
316321

@@ -319,7 +324,7 @@ public async Task ReceiveMessageAsync_should_throw_MongoConnectionClosedExceptio
319324
.WithInnerException<SocketException>()
320325
.And.ConnectionId.Should().Be(_subject.ConnectionId);
321326

322-
var task2 = _subject.ReceiveMessageAsync<BsonDocument>(2, BsonDocumentSerializer.Instance, _messageEncoderSettings, CancellationToken.None);
327+
var task2 = _subject.ReceiveMessageAsync(2, encoderSelector, _messageEncoderSettings, CancellationToken.None);
323328

324329
Func<Task> act2 = () => task2;
325330
act2.ShouldThrow<MongoConnectionClosedException>()

src/MongoDB.Driver.Core.Tests/Core/Helpers/MessageHelper.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ public static List<BsonDocument> TranslateMessagesToBsonDocuments(byte[] bytes)
118118
return TranslateMessagesToBsonDocuments(TranslateBytesToRequests(bytes));
119119
}
120120

121-
public static void WriteRepliesToStream(Stream stream, IEnumerable<ReplyMessage> replies)
121+
public static void WriteResponsesToStream(Stream stream, IEnumerable<ResponseMessage> responses)
122122
{
123123
var startPosition = stream.Position;
124-
foreach (var reply in replies)
124+
foreach (var response in responses)
125125
{
126126
var encoderFactory = new BinaryMessageEncoderFactory(stream, null);
127-
var encoder = reply.GetEncoder(encoderFactory);
128-
encoder.WriteMessage(reply);
127+
var encoder = response.GetEncoder(encoderFactory);
128+
encoder.WriteMessage(response);
129129
}
130130
stream.Position = startPosition;
131131
}

src/MongoDB.Driver.Core.Tests/Core/Helpers/MockConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ public Task OpenAsync(CancellationToken cancellationToken)
9191
return Task.FromResult<object>(null);
9292
}
9393

94-
public Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(int responseTo, Bson.Serialization.IBsonSerializer<TDocument> serializer, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
94+
public Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageEncoderSelector encoderSelector, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
9595
{
96-
return Task.FromResult<ReplyMessage<TDocument>>((ReplyMessage<TDocument>)_replyMessages.Dequeue());
96+
return Task.FromResult((ResponseMessage)_replyMessages.Dequeue());
9797
}
9898

9999
public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -475,9 +475,9 @@ public Task OpenAsync(CancellationToken cancellationToken)
475475
return _connection.OpenAsync(cancellationToken);
476476
}
477477

478-
public Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(int responseTo, IBsonSerializer<TDocument> serializer, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
478+
public Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageEncoderSelector encoderSelector, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
479479
{
480-
return _connection.ReceiveMessageAsync<TDocument>(responseTo, serializer, messageEncoderSettings, cancellationToken);
480+
return _connection.ReceiveMessageAsync(responseTo, encoderSelector, messageEncoderSettings, cancellationToken);
481481
}
482482

483483
public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
@@ -548,10 +548,10 @@ public Task OpenAsync(CancellationToken cancellationToken)
548548
return _reference.Instance.OpenAsync(cancellationToken);
549549
}
550550

551-
public Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(int responseTo, IBsonSerializer<TDocument> serializer, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
551+
public Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageEncoderSelector encoderSelector, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
552552
{
553553
ThrowIfDisposed();
554-
return _reference.Instance.ReceiveMessageAsync<TDocument>(responseTo, serializer, messageEncoderSettings, cancellationToken);
554+
return _reference.Instance.ReceiveMessageAsync(responseTo, encoderSelector, messageEncoderSettings, cancellationToken);
555555
}
556556

557557
public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)

src/MongoDB.Driver.Core/Core/Connections/BinaryConnection.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,13 @@ private async Task<IByteBuffer> ReceiveBufferAsync(int responseTo, CancellationT
289289
}
290290
}
291291

292-
public async Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(
292+
public async Task<ResponseMessage> ReceiveMessageAsync(
293293
int responseTo,
294-
IBsonSerializer<TDocument> serializer,
294+
IMessageEncoderSelector encoderSelector,
295295
MessageEncoderSettings messageEncoderSettings,
296296
CancellationToken cancellationToken)
297297
{
298-
Ensure.IsNotNull(serializer, "serializer");
298+
Ensure.IsNotNull(encoderSelector, "encoderSelector");
299299
ThrowIfDisposedOrNotOpen();
300300

301301
try
@@ -306,7 +306,7 @@ public async Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(
306306
}
307307

308308
var stopwatch = Stopwatch.StartNew();
309-
ReplyMessage<TDocument> reply;
309+
ResponseMessage reply;
310310
int length;
311311
using (var buffer = await ReceiveBufferAsync(responseTo, cancellationToken).ConfigureAwait(false))
312312
{
@@ -315,15 +315,15 @@ public async Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(
315315
using (var stream = new ByteBufferStream(buffer))
316316
{
317317
var encoderFactory = new BinaryMessageEncoderFactory(stream, messageEncoderSettings);
318-
var encoder = encoderFactory.GetReplyMessageEncoder<TDocument>(serializer);
319-
reply = (ReplyMessage<TDocument>)encoder.ReadMessage();
318+
var encoder = encoderSelector.GetEncoder(encoderFactory);
319+
reply = (ResponseMessage)encoder.ReadMessage();
320320
}
321321
}
322322
stopwatch.Stop();
323323

324324
if (_listener != null)
325325
{
326-
_listener.ConnectionAfterReceivingMessage<TDocument>(new ConnectionAfterReceivingMessageEvent<TDocument>(_connectionId, reply, length, stopwatch.Elapsed));
326+
_listener.ConnectionAfterReceivingMessage(new ConnectionAfterReceivingMessageEvent(_connectionId, reply, length, stopwatch.Elapsed));
327327
}
328328

329329
return reply;

src/MongoDB.Driver.Core/Core/Connections/IConnection.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,14 @@ public interface IConnection : IDisposable
8484
/// <summary>
8585
/// Receives a message.
8686
/// </summary>
87-
/// <typeparam name="TDocument">The type of the document.</typeparam>
8887
/// <param name="responseTo">The id of the sent message for which a response is to be received.</param>
89-
/// <param name="serializer">The serializer.</param>
88+
/// <param name="encoderSelector">The encoder selector.</param>
9089
/// <param name="messageEncoderSettings">The message encoder settings.</param>
9190
/// <param name="cancellationToken">The cancellation token.</param>
92-
/// <returns>A Task whose result is the reply message.</returns>
93-
Task<ReplyMessage<TDocument>> ReceiveMessageAsync<TDocument>(int responseTo, IBsonSerializer<TDocument> serializer, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken);
91+
/// <returns>
92+
/// A Task whose result is the response message.
93+
/// </returns>
94+
Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageEncoderSelector encoderSelector, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken);
9495

9596
/// <summary>
9697
/// Sends the messages.

src/MongoDB.Driver.Core/Core/Events/ConnectionAfterReceivingMessageEvent.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,24 @@ namespace MongoDB.Driver.Core.Events
2424
/// <summary>
2525
/// Represents information about a ConnectionAfterReceivingMessage event.
2626
/// </summary>
27-
/// <typeparam name="TDocument">The type of the documents.</typeparam>
28-
public struct ConnectionAfterReceivingMessageEvent<TDocument>
27+
public struct ConnectionAfterReceivingMessageEvent
2928
{
3029
private readonly ConnectionId _connectionId;
3130
private readonly TimeSpan _elapsed;
3231
private readonly int _length;
33-
private readonly ReplyMessage<TDocument> _replyMessage;
32+
private readonly ResponseMessage _receivedMessage;
3433

3534
/// <summary>
36-
/// Initializes a new instance of the <see cref="ConnectionAfterReceivingMessageEvent{TDocument}"/> struct.
35+
/// Initializes a new instance of the <see cref="ConnectionAfterReceivingMessageEvent"/> struct.
3736
/// </summary>
3837
/// <param name="connectionId">The connection identifier.</param>
39-
/// <param name="replyMessage">The reply message.</param>
38+
/// <param name="receivedMessage">The received message.</param>
4039
/// <param name="length">The length.</param>
4140
/// <param name="elapsed">The elapsed time.</param>
42-
public ConnectionAfterReceivingMessageEvent(ConnectionId connectionId, ReplyMessage<TDocument> replyMessage, int length, TimeSpan elapsed)
41+
public ConnectionAfterReceivingMessageEvent(ConnectionId connectionId, ResponseMessage receivedMessage, int length, TimeSpan elapsed)
4342
{
4443
_connectionId = connectionId;
45-
_replyMessage = replyMessage;
44+
_receivedMessage = receivedMessage;
4645
_length = length;
4746
_elapsed = elapsed;
4847
}
@@ -81,14 +80,14 @@ public int Length
8180
}
8281

8382
/// <summary>
84-
/// Gets the reply message.
83+
/// Gets the received message.
8584
/// </summary>
8685
/// <value>
87-
/// The reply message.
86+
/// The received message.
8887
/// </value>
89-
public ReplyMessage<TDocument> ReplyMessage
88+
public ResponseMessage ReceivedMessage
9089
{
91-
get { return _replyMessage; }
90+
get { return _receivedMessage; }
9291
}
9392
}
9493
}

src/MongoDB.Driver.Core/Core/Events/ConnectionListenerPair.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ public void ConnectionBeforeReceivingMessage(ConnectionBeforeReceivingMessageEve
117117
}
118118

119119
/// <inheritdoc/>
120-
public void ConnectionAfterReceivingMessage<TDocument>(ConnectionAfterReceivingMessageEvent<TDocument> @event)
120+
public void ConnectionAfterReceivingMessage(ConnectionAfterReceivingMessageEvent @event)
121121
{
122-
_first.ConnectionAfterReceivingMessage<TDocument>(@event);
123-
_second.ConnectionAfterReceivingMessage<TDocument>(@event);
122+
_first.ConnectionAfterReceivingMessage(@event);
123+
_second.ConnectionAfterReceivingMessage(@event);
124124
}
125125

126126
/// <inheritdoc/>

src/MongoDB.Driver.Core/Core/Events/Diagnostics/LogListener.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,9 @@ public void ConnectionBeforeReceivingMessage(ConnectionBeforeReceivingMessageEve
295295
}
296296

297297
/// <inheritdoc/>
298-
public void ConnectionAfterReceivingMessage<TDocument>(ConnectionAfterReceivingMessageEvent<TDocument> @event)
298+
public void ConnectionAfterReceivingMessage(ConnectionAfterReceivingMessageEvent @event)
299299
{
300-
Log(LogLevel.Info, "{0}: received message in response to {1} of length {2} bytes in {3}ms.", Label(@event.ConnectionId), @event.ReplyMessage.ResponseTo.ToString(), @event.Length.ToString(), @event.Elapsed.TotalMilliseconds.ToString());
300+
Log(LogLevel.Info, "{0}: received message in response to {1} of length {2} bytes in {3}ms.", Label(@event.ConnectionId), @event.ReceivedMessage.ResponseTo.ToString(), @event.Length.ToString(), @event.Elapsed.TotalMilliseconds.ToString());
301301
}
302302

303303
/// <inheritdoc/>

0 commit comments

Comments
 (0)