Skip to content

Commit 339a283

Browse files
committed
CSHARP-3984: Remove BinaryConnection.DropBox
1 parent c4cc63b commit 339a283

File tree

4 files changed

+56
-408
lines changed

4 files changed

+56
-408
lines changed

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 55 additions & 185 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
using System;
1717
using System.Buffers.Binary;
18-
using System.Collections.Concurrent;
1918
using System.Diagnostics;
2019
using System.IO;
2120
using System.Net;
@@ -46,15 +45,12 @@ internal sealed class BinaryConnection : IConnection
4645
private ConnectionInitializerContext _connectionInitializerContext;
4746
private EndPoint _endPoint;
4847
private ConnectionDescription _description;
49-
private readonly Dropbox _dropbox = new Dropbox();
5048
private bool _failedEventHasBeenRaised;
5149
private DateTime _lastUsedAtUtc;
5250
private DateTime _openedAtUtc;
5351
private readonly object _openLock = new object();
5452
private Task _openTask;
55-
private readonly SemaphoreSlim _receiveLock;
5653
private CompressorType? _sendCompressorType;
57-
private readonly SemaphoreSlim _sendLock;
5854
private readonly ConnectionSettings _settings;
5955
private readonly TimeSpan _socketReadTimeout;
6056
private readonly TimeSpan _socketWriteTimeout;
@@ -83,8 +79,6 @@ public BinaryConnection(
8379
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
8480

8581
_connectionId = new ConnectionId(serverId, settings.ConnectionIdLocalValueProvider());
86-
_receiveLock = new SemaphoreSlim(1);
87-
_sendLock = new SemaphoreSlim(1);
8882
_state = new InterlockedInt32(State.Initial);
8983

9084
_compressorSource = new CompressorSource(settings.Compressors);
@@ -179,9 +173,6 @@ private void Dispose(bool disposing)
179173
_eventLogger.LogAndPublish(new ConnectionClosingEvent(_connectionId, EventContext.OperationId));
180174

181175
var stopwatch = Stopwatch.StartNew();
182-
_receiveLock.Dispose();
183-
_sendLock.Dispose();
184-
185176
if (_stream != null)
186177
{
187178
try
@@ -374,50 +365,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
374365
}
375366
}
376367

377-
private IByteBuffer ReceiveBuffer(OperationContext operationContext, int responseTo)
378-
{
379-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
380-
{
381-
var messageTask = _dropbox.GetMessageAsync(responseTo);
382-
try
383-
{
384-
Task.WaitAny(messageTask, receiveLockRequest.Task);
385-
if (messageTask.IsCompleted)
386-
{
387-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
388-
}
389-
390-
receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
391-
while (true)
392-
{
393-
try
394-
{
395-
var buffer = ReceiveBuffer(operationContext);
396-
_dropbox.AddMessage(buffer);
397-
}
398-
catch (Exception ex)
399-
{
400-
_dropbox.AddException(ex);
401-
}
402-
403-
if (messageTask.IsCompleted)
404-
{
405-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
406-
}
407-
408-
operationContext.ThrowIfTimedOutOrCanceled();
409-
}
410-
}
411-
catch
412-
{
413-
var ignored = messageTask.ContinueWith(
414-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
415-
TaskContinuationOptions.OnlyOnRanToCompletion);
416-
throw;
417-
}
418-
}
419-
}
420-
421368
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext)
422369
{
423370
try
@@ -443,50 +390,6 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
443390
}
444391
}
445392

446-
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext, int responseTo)
447-
{
448-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
449-
{
450-
var messageTask = _dropbox.GetMessageAsync(responseTo);
451-
try
452-
{
453-
await Task.WhenAny(messageTask, receiveLockRequest.Task).ConfigureAwait(false);
454-
if (messageTask.IsCompleted)
455-
{
456-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
457-
}
458-
459-
await receiveLockRequest.Task.ConfigureAwait(false); // propagate exceptions
460-
while (true)
461-
{
462-
try
463-
{
464-
var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false);
465-
_dropbox.AddMessage(buffer);
466-
}
467-
catch (Exception ex)
468-
{
469-
_dropbox.AddException(ex);
470-
}
471-
472-
if (messageTask.IsCompleted)
473-
{
474-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
475-
}
476-
477-
operationContext.ThrowIfTimedOutOrCanceled();
478-
}
479-
}
480-
catch
481-
{
482-
var ignored = messageTask.ContinueWith(
483-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
484-
TaskContinuationOptions.OnlyOnRanToCompletion);
485-
throw;
486-
}
487-
}
488-
}
489-
490393
public ResponseMessage ReceiveMessage(
491394
OperationContext operationContext,
492395
int responseTo,
@@ -500,11 +403,19 @@ public ResponseMessage ReceiveMessage(
500403
try
501404
{
502405
helper.ReceivingMessage();
503-
using (var buffer = ReceiveBuffer(operationContext, responseTo))
406+
while (true)
504407
{
505-
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
506-
helper.ReceivedMessage(buffer, message);
507-
return message;
408+
using (var buffer = ReceiveBuffer(operationContext))
409+
{
410+
if (responseTo != GetResponseTo(buffer))
411+
{
412+
continue;
413+
}
414+
415+
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
416+
helper.ReceivedMessage(buffer, message);
417+
return message;
418+
}
508419
}
509420
}
510421
catch (Exception ex)
@@ -515,7 +426,9 @@ public ResponseMessage ReceiveMessage(
515426
}
516427
}
517428

518-
public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operationContext, int responseTo,
429+
public async Task<ResponseMessage> ReceiveMessageAsync(
430+
OperationContext operationContext,
431+
int responseTo,
519432
IMessageEncoderSelector encoderSelector,
520433
MessageEncoderSettings messageEncoderSettings)
521434
{
@@ -526,11 +439,19 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
526439
try
527440
{
528441
helper.ReceivingMessage();
529-
using (var buffer = await ReceiveBufferAsync(operationContext, responseTo).ConfigureAwait(false))
442+
while (true)
530443
{
531-
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
532-
helper.ReceivedMessage(buffer, message);
533-
return message;
444+
using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false))
445+
{
446+
if (responseTo != GetResponseTo(buffer))
447+
{
448+
continue;
449+
}
450+
451+
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
452+
helper.ReceivedMessage(buffer, message);
453+
return message;
454+
}
534455
}
535456
}
536457
catch (Exception ex)
@@ -541,59 +462,49 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
541462
}
542463
}
543464

465+
private int GetResponseTo(IByteBuffer message)
466+
{
467+
var backingBytes = message.AccessBackingBytes(8);
468+
return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset);
469+
}
470+
544471
private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)
545472
{
546-
_sendLock.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken);
547-
try
473+
if (_state.Value == State.Failed)
548474
{
549-
if (_state.Value == State.Failed)
550-
{
551-
throw new MongoConnectionClosedException(_connectionId);
552-
}
475+
throw new MongoConnectionClosedException(_connectionId);
476+
}
553477

554-
try
555-
{
556-
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout);
557-
_lastUsedAtUtc = DateTime.UtcNow;
558-
}
559-
catch (Exception ex)
560-
{
561-
var wrappedException = WrapExceptionIfRequired(operationContext, ex, "sending a message to the server");
562-
ConnectionFailed(wrappedException ?? ex);
563-
if (wrappedException == null) { throw; } else { throw wrappedException; }
564-
}
478+
try
479+
{
480+
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout);
481+
_lastUsedAtUtc = DateTime.UtcNow;
565482
}
566-
finally
483+
catch (Exception ex)
567484
{
568-
_sendLock.Release();
485+
var wrappedException = WrapExceptionIfRequired(operationContext, ex, "sending a message to the server");
486+
ConnectionFailed(wrappedException ?? ex);
487+
if (wrappedException == null) { throw; } else { throw wrappedException; }
569488
}
570489
}
571490

572491
private async Task SendBufferAsync(OperationContext operationContext, IByteBuffer buffer)
573492
{
574-
await _sendLock.WaitAsync(operationContext.RemainingTimeout, operationContext.CancellationToken).ConfigureAwait(false);
575-
try
493+
if (_state.Value == State.Failed)
576494
{
577-
if (_state.Value == State.Failed)
578-
{
579-
throw new MongoConnectionClosedException(_connectionId);
580-
}
495+
throw new MongoConnectionClosedException(_connectionId);
496+
}
581497

582-
try
583-
{
584-
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false);
585-
_lastUsedAtUtc = DateTime.UtcNow;
586-
}
587-
catch (Exception ex)
588-
{
589-
var wrappedException = WrapExceptionIfRequired(operationContext, ex, "sending a message to the server");
590-
ConnectionFailed(wrappedException ?? ex);
591-
if (wrappedException == null) { throw; } else { throw wrappedException; }
592-
}
498+
try
499+
{
500+
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false);
501+
_lastUsedAtUtc = DateTime.UtcNow;
593502
}
594-
finally
503+
catch (Exception ex)
595504
{
596-
_sendLock.Release();
505+
var wrappedException = WrapExceptionIfRequired(operationContext, ex, "sending a message to the server");
506+
ConnectionFailed(wrappedException ?? ex);
507+
if (wrappedException == null) { throw; } else { throw wrappedException; }
597508
}
598509
}
599510

@@ -790,47 +701,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception)
790701
}
791702

792703
// nested classes
793-
private class Dropbox
794-
{
795-
private readonly ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>> _messages = new ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>>();
796-
797-
// public methods
798-
public void AddException(Exception exception)
799-
{
800-
foreach (var taskCompletionSource in _messages.Values)
801-
{
802-
taskCompletionSource.TrySetException(exception); // has no effect on already completed tasks
803-
}
804-
}
805-
806-
public void AddMessage(IByteBuffer message)
807-
{
808-
var responseTo = GetResponseTo(message);
809-
var tcs = _messages.GetOrAdd(responseTo, x => new TaskCompletionSource<IByteBuffer>());
810-
tcs.TrySetResult(message);
811-
}
812-
813-
public Task<IByteBuffer> GetMessageAsync(int responseTo)
814-
{
815-
var tcs = _messages.GetOrAdd(responseTo, _ => new TaskCompletionSource<IByteBuffer>());
816-
return tcs.Task;
817-
}
818-
819-
public IByteBuffer RemoveMessage(int responseTo)
820-
{
821-
TaskCompletionSource<IByteBuffer> tcs;
822-
_messages.TryRemove(responseTo, out tcs);
823-
return tcs.Task.GetAwaiter().GetResult(); // RemoveMessage is only called when Task is complete
824-
}
825-
826-
// private methods
827-
private int GetResponseTo(IByteBuffer message)
828-
{
829-
var backingBytes = message.AccessBackingBytes(8);
830-
return BinaryPrimitives.ReadInt32LittleEndian(new ReadOnlySpan<byte>(backingBytes.Array, backingBytes.Offset, 4));
831-
}
832-
}
833-
834704
private class OpenConnectionHelper
835705
{
836706
private readonly BinaryConnection _connection;

src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace MongoDB.Driver.Core.Misc
2222
/// <summary>
2323
/// Represents a tentative request to acquire a SemaphoreSlim.
2424
/// </summary>
25+
[Obsolete("SemaphoreSlimRequest is deprecated and will be removed in future release")]
2526
public sealed class SemaphoreSlimRequest : IDisposable
2627
{
2728
// private fields

0 commit comments

Comments
 (0)