Skip to content

Commit 95f4898

Browse files
committed
CSHARP-3984: Remove BinaryConnection.DropBox
1 parent 2cb30a7 commit 95f4898

File tree

4 files changed

+25
-305
lines changed

4 files changed

+25
-305
lines changed

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 24 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
using System;
1717
using System.Buffers.Binary;
18-
using System.Collections.Concurrent;
1918
using System.Diagnostics;
2019
using System.IO;
2120
using System.Net;
@@ -46,7 +45,6 @@ internal sealed class BinaryConnection : IConnection
4645
private ConnectionInitializerContext _connectionInitializerContext;
4746
private EndPoint _endPoint;
4847
private ConnectionDescription _description;
49-
private readonly Dropbox _dropbox = new Dropbox();
5048
private bool _failedEventHasBeenRaised;
5149
private DateTime _lastUsedAtUtc;
5250
private DateTime _openedAtUtc;
@@ -356,50 +354,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
356354
}
357355
}
358356

359-
private IByteBuffer ReceiveBuffer(OperationContext operationContext, int responseTo)
360-
{
361-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
362-
{
363-
var messageTask = _dropbox.GetMessageAsync(responseTo);
364-
try
365-
{
366-
Task.WaitAny(messageTask, receiveLockRequest.Task);
367-
if (messageTask.IsCompleted)
368-
{
369-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
370-
}
371-
372-
receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
373-
while (true)
374-
{
375-
try
376-
{
377-
var buffer = ReceiveBuffer(operationContext);
378-
_dropbox.AddMessage(buffer);
379-
}
380-
catch (Exception ex)
381-
{
382-
_dropbox.AddException(ex);
383-
}
384-
385-
if (messageTask.IsCompleted)
386-
{
387-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
388-
}
389-
390-
operationContext.ThrowIfTimedOutOrCanceled();
391-
}
392-
}
393-
catch
394-
{
395-
var ignored = messageTask.ContinueWith(
396-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
397-
TaskContinuationOptions.OnlyOnRanToCompletion);
398-
throw;
399-
}
400-
}
401-
}
402-
403357
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext)
404358
{
405359
try
@@ -425,50 +379,6 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
425379
}
426380
}
427381

428-
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext, int responseTo)
429-
{
430-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
431-
{
432-
var messageTask = _dropbox.GetMessageAsync(responseTo);
433-
try
434-
{
435-
await Task.WhenAny(messageTask, receiveLockRequest.Task).ConfigureAwait(false);
436-
if (messageTask.IsCompleted)
437-
{
438-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
439-
}
440-
441-
await receiveLockRequest.Task.ConfigureAwait(false); // propagate exceptions
442-
while (true)
443-
{
444-
try
445-
{
446-
var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false);
447-
_dropbox.AddMessage(buffer);
448-
}
449-
catch (Exception ex)
450-
{
451-
_dropbox.AddException(ex);
452-
}
453-
454-
if (messageTask.IsCompleted)
455-
{
456-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
457-
}
458-
459-
operationContext.ThrowIfTimedOutOrCanceled();
460-
}
461-
}
462-
catch
463-
{
464-
var ignored = messageTask.ContinueWith(
465-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
466-
TaskContinuationOptions.OnlyOnRanToCompletion);
467-
throw;
468-
}
469-
}
470-
}
471-
472382
public ResponseMessage ReceiveMessage(
473383
OperationContext operationContext,
474384
int responseTo,
@@ -482,8 +392,9 @@ public ResponseMessage ReceiveMessage(
482392
try
483393
{
484394
helper.ReceivingMessage();
485-
using (var buffer = ReceiveBuffer(operationContext, responseTo))
395+
using (var buffer = ReceiveBuffer(operationContext))
486396
{
397+
EnsureResponseToIsCorrect(buffer, responseTo);
487398
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
488399
helper.ReceivedMessage(buffer, message);
489400
return message;
@@ -497,7 +408,9 @@ public ResponseMessage ReceiveMessage(
497408
}
498409
}
499410

500-
public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operationContext, int responseTo,
411+
public async Task<ResponseMessage> ReceiveMessageAsync(
412+
OperationContext operationContext,
413+
int responseTo,
501414
IMessageEncoderSelector encoderSelector,
502415
MessageEncoderSettings messageEncoderSettings)
503416
{
@@ -508,8 +421,9 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
508421
try
509422
{
510423
helper.ReceivingMessage();
511-
using (var buffer = await ReceiveBufferAsync(operationContext, responseTo).ConfigureAwait(false))
424+
using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false))
512425
{
426+
EnsureResponseToIsCorrect(buffer, responseTo);
513427
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
514428
helper.ReceivedMessage(buffer, message);
515429
return message;
@@ -523,6 +437,23 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
523437
}
524438
}
525439

440+
private IByteBuffer EnsureResponseToIsCorrect(IByteBuffer message, int responseTo)
441+
{
442+
var receivedResponseTo = GetResponseTo(message);
443+
if (receivedResponseTo != responseTo)
444+
{
445+
throw new InvalidOperationException($"Expected responseTo to be {responseTo} but was {receivedResponseTo}."); // should not be reached
446+
}
447+
448+
return message;
449+
450+
int GetResponseTo(IByteBuffer message)
451+
{
452+
var backingBytes = message.AccessBackingBytes(8);
453+
return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset);
454+
}
455+
}
456+
526457
private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)
527458
{
528459
_sendLock.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken);
@@ -770,47 +701,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception)
770701
}
771702

772703
// nested classes
773-
private class Dropbox
774-
{
775-
private readonly ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>> _messages = new ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>>();
776-
777-
// public methods
778-
public void AddException(Exception exception)
779-
{
780-
foreach (var taskCompletionSource in _messages.Values)
781-
{
782-
taskCompletionSource.TrySetException(exception); // has no effect on already completed tasks
783-
}
784-
}
785-
786-
public void AddMessage(IByteBuffer message)
787-
{
788-
var responseTo = GetResponseTo(message);
789-
var tcs = _messages.GetOrAdd(responseTo, x => new TaskCompletionSource<IByteBuffer>());
790-
tcs.TrySetResult(message);
791-
}
792-
793-
public Task<IByteBuffer> GetMessageAsync(int responseTo)
794-
{
795-
var tcs = _messages.GetOrAdd(responseTo, _ => new TaskCompletionSource<IByteBuffer>());
796-
return tcs.Task;
797-
}
798-
799-
public IByteBuffer RemoveMessage(int responseTo)
800-
{
801-
TaskCompletionSource<IByteBuffer> tcs;
802-
_messages.TryRemove(responseTo, out tcs);
803-
return tcs.Task.GetAwaiter().GetResult(); // RemoveMessage is only called when Task is complete
804-
}
805-
806-
// private methods
807-
private int GetResponseTo(IByteBuffer message)
808-
{
809-
var backingBytes = message.AccessBackingBytes(8);
810-
return BinaryPrimitives.ReadInt32LittleEndian(new ReadOnlySpan<byte>(backingBytes.Array, backingBytes.Offset, 4));
811-
}
812-
}
813-
814704
private class OpenConnectionHelper
815705
{
816706
private readonly BinaryConnection _connection;

src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace MongoDB.Driver.Core.Misc
2222
/// <summary>
2323
/// Represents a tentative request to acquire a SemaphoreSlim.
2424
/// </summary>
25+
[Obsolete("SemaphoreSlimRequest is deprecated and will be removed in future release")]
2526
public sealed class SemaphoreSlimRequest : IDisposable
2627
{
2728
// private fields

tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -489,52 +489,6 @@ public async Task ReceiveMessage_should_complete_when_reply_is_not_already_on_th
489489
}
490490
}
491491

492-
[Theory]
493-
[ParameterAttributeData]
494-
public async Task ReceiveMessage_should_handle_out_of_order_replies(
495-
[Values(false, true)]
496-
bool async1,
497-
[Values(false, true)]
498-
bool async2)
499-
{
500-
using (var stream = new BlockingMemoryStream())
501-
{
502-
_mockStreamFactory.Setup(f => f.CreateStreamAsync(_endPoint, It.IsAny<CancellationToken>())).ReturnsAsync(stream);
503-
await _subject.OpenAsync(OperationContext.NoTimeout);
504-
_capturedEvents.Clear();
505-
506-
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
507-
508-
var receivedTask10 = async1 ?
509-
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings) :
510-
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings));
511-
512-
var receivedTask11 = async2 ?
513-
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings) :
514-
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings));
515-
516-
SpinWait.SpinUntil(() => _capturedEvents.Count >= 2, TimeSpan.FromSeconds(5)).Should().BeTrue();
517-
518-
var messageToReceive10 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 10), BsonDocumentSerializer.Instance, responseTo: 10);
519-
var messageToReceive11 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 11), BsonDocumentSerializer.Instance, responseTo: 11);
520-
MessageHelper.WriteResponsesToStream(stream, messageToReceive11, messageToReceive10); // out of order
521-
522-
var received10 = await receivedTask10;
523-
var received11 = await receivedTask11;
524-
525-
var expected = MessageHelper.TranslateMessagesToBsonDocuments(new[] { messageToReceive10, messageToReceive11 });
526-
var actual = MessageHelper.TranslateMessagesToBsonDocuments(new[] { received10, received11 });
527-
528-
actual.Should().BeEquivalentTo(expected);
529-
530-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
531-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
532-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
533-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
534-
_capturedEvents.Any().Should().BeFalse();
535-
}
536-
}
537-
538492
[Fact]
539493
public async Task ReceiveMessage_should_not_produce_unobserved_task_exceptions_on_fail()
540494
{

0 commit comments

Comments
 (0)