Skip to content

Commit 79fcf06

Browse files
authored
CSHARP-5619: Replace IConnection.SendMessages with the method to send a single message (#1713)
1 parent 73c8d10 commit 79fcf06

26 files changed

+266
-476
lines changed

src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2021-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -461,11 +461,11 @@ public async Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageE
461461
}
462462
}
463463

464-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
464+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
465465
{
466466
try
467467
{
468-
_connection.SendMessages(messages, messageEncoderSettings, cancellationToken);
468+
_connection.SendMessage(message, messageEncoderSettings, cancellationToken);
469469
}
470470
catch (MongoConnectionException ex)
471471
{
@@ -474,11 +474,11 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
474474
}
475475
}
476476

477-
public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
477+
public async Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
478478
{
479479
try
480480
{
481-
await _connection.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken).ConfigureAwait(false);
481+
await _connection.SendMessageAsync(message, messageEncoderSettings, cancellationToken).ConfigureAwait(false);
482482
}
483483
catch (MongoConnectionException ex)
484484
{
@@ -623,16 +623,16 @@ public ResponseMessage ReceiveMessage(int responseTo, IMessageEncoderSelector en
623623
return _reference.Instance.ReceiveMessage(responseTo, encoderSelector, messageEncoderSettings, cancellationToken);
624624
}
625625

626-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
626+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
627627
{
628628
ThrowIfDisposed();
629-
_reference.Instance.SendMessages(messages, messageEncoderSettings, cancellationToken);
629+
_reference.Instance.SendMessage(message, messageEncoderSettings, cancellationToken);
630630
}
631631

632-
public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
632+
public Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
633633
{
634634
ThrowIfDisposed();
635-
return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
635+
return _reference.Instance.SendMessageAsync(message, messageEncoderSettings, cancellationToken);
636636
}
637637

638638
public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 63 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2013-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
1616
using System;
1717
using System.Buffers.Binary;
1818
using System.Collections.Concurrent;
19-
using System.Collections.Generic;
2019
using System.Diagnostics;
2120
using System.IO;
22-
using System.Linq;
2321
using System.Net;
2422
using System.Threading;
2523
using System.Threading.Tasks;
@@ -582,22 +580,22 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell
582580
}
583581
}
584582

585-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
583+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
586584
{
587-
Ensure.IsNotNull(messages, nameof(messages));
585+
Ensure.IsNotNull(message, nameof(message));
588586
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);
589587

590-
var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
588+
var helper = new SendMessageHelper(this, message, messageEncoderSettings);
591589
try
592590
{
593-
helper.EncodingMessages();
594-
using (var uncompressedBuffer = helper.EncodeMessages(cancellationToken, out var sentMessages))
591+
helper.EncodingMessage();
592+
using (var uncompressedBuffer = helper.EncodeMessage(cancellationToken, out var sentMessage))
595593
{
596-
helper.SendingMessages(uncompressedBuffer);
594+
helper.SendingMessage(uncompressedBuffer);
597595
int sentLength;
598-
if (AnyMessageNeedsToBeCompressed(sentMessages))
596+
if (ShouldBeCompressed(sentMessage))
599597
{
600-
using (var compressedBuffer = CompressMessages(sentMessages, uncompressedBuffer, messageEncoderSettings))
598+
using (var compressedBuffer = CompressMessage(sentMessage, uncompressedBuffer, messageEncoderSettings))
601599
{
602600
SendBuffer(compressedBuffer, cancellationToken);
603601
sentLength = compressedBuffer.Length;
@@ -608,33 +606,33 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
608606
SendBuffer(uncompressedBuffer, cancellationToken);
609607
sentLength = uncompressedBuffer.Length;
610608
}
611-
helper.SentMessages(sentLength);
609+
helper.SentMessage(sentLength);
612610
}
613611
}
614612
catch (Exception ex)
615613
{
616-
helper.FailedSendingMessages(ex);
614+
helper.FailedSendingMessage(ex);
617615
ThrowOperationCanceledExceptionIfRequired(ex);
618616
throw;
619617
}
620618
}
621619

622-
public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
620+
public async Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
623621
{
624-
Ensure.IsNotNull(messages, nameof(messages));
622+
Ensure.IsNotNull(message, nameof(message));
625623
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);
626624

627-
var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
625+
var helper = new SendMessageHelper(this, message, messageEncoderSettings);
628626
try
629627
{
630-
helper.EncodingMessages();
631-
using (var uncompressedBuffer = helper.EncodeMessages(cancellationToken, out var sentMessages))
628+
helper.EncodingMessage();
629+
using (var uncompressedBuffer = helper.EncodeMessage(cancellationToken, out var sentMessage))
632630
{
633-
helper.SendingMessages(uncompressedBuffer);
631+
helper.SendingMessage(uncompressedBuffer);
634632
int sentLength;
635-
if (AnyMessageNeedsToBeCompressed(sentMessages))
633+
if (ShouldBeCompressed(sentMessage))
636634
{
637-
using (var compressedBuffer = CompressMessages(sentMessages, uncompressedBuffer, messageEncoderSettings))
635+
using (var compressedBuffer = CompressMessage(sentMessage, uncompressedBuffer, messageEncoderSettings))
638636
{
639637
await SendBufferAsync(compressedBuffer, cancellationToken).ConfigureAwait(false);
640638
sentLength = compressedBuffer.Length;
@@ -645,12 +643,12 @@ public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, Messag
645643
await SendBufferAsync(uncompressedBuffer, cancellationToken).ConfigureAwait(false);
646644
sentLength = uncompressedBuffer.Length;
647645
}
648-
helper.SentMessages(sentLength);
646+
helper.SentMessage(sentLength);
649647
}
650648
}
651649
catch (Exception ex)
652650
{
653-
helper.FailedSendingMessages(ex);
651+
helper.FailedSendingMessage(ex);
654652
ThrowOperationCanceledExceptionIfRequired(ex);
655653
throw;
656654
}
@@ -663,9 +661,9 @@ public void SetReadTimeout(TimeSpan timeout)
663661
}
664662

665663
// private methods
666-
private bool AnyMessageNeedsToBeCompressed(IEnumerable<RequestMessage> messages)
664+
private bool ShouldBeCompressed(RequestMessage message)
667665
{
668-
return _sendCompressorType.HasValue && messages.Any(m => m.MayBeCompressed);
666+
return _sendCompressorType.HasValue && message.MayBeCompressed;
669667
}
670668

671669
private CompressorType? ChooseSendCompressorTypeIfAny(ConnectionDescription connectionDescription)
@@ -674,8 +672,8 @@ private bool AnyMessageNeedsToBeCompressed(IEnumerable<RequestMessage> messages)
674672
return availableCompressors.Count > 0 ? (CompressorType?)availableCompressors[0] : null;
675673
}
676674

677-
private IByteBuffer CompressMessages(
678-
IEnumerable<RequestMessage> messages,
675+
private IByteBuffer CompressMessage(
676+
RequestMessage message,
679677
IByteBuffer uncompressedBuffer,
680678
MessageEncoderSettings messageEncoderSettings)
681679
{
@@ -685,24 +683,22 @@ private IByteBuffer CompressMessages(
685683
using (var uncompressedStream = new ByteBufferStream(uncompressedBuffer, ownsBuffer: false))
686684
using (var compressedStream = new ByteBufferStream(compressedBuffer, ownsBuffer: false))
687685
{
688-
foreach (var message in messages)
689-
{
690-
var uncompressedMessageLength = uncompressedStream.ReadInt32();
691-
uncompressedStream.Position -= 4;
686+
var uncompressedMessageLength = uncompressedStream.ReadInt32();
687+
uncompressedStream.Position -= 4;
692688

693-
using (var uncompressedMessageSlice = uncompressedBuffer.GetSlice((int)uncompressedStream.Position, uncompressedMessageLength))
694-
using (var uncompressedMessageStream = new ByteBufferStream(uncompressedMessageSlice, ownsBuffer: false))
689+
using (var uncompressedMessageSlice = uncompressedBuffer.GetSlice((int)uncompressedStream.Position, uncompressedMessageLength))
690+
using (var uncompressedMessageStream = new ByteBufferStream(uncompressedMessageSlice, ownsBuffer: false))
691+
{
692+
if (message.MayBeCompressed)
695693
{
696-
if (message.MayBeCompressed)
697-
{
698-
CompressMessage(message, uncompressedMessageStream, compressedStream, messageEncoderSettings);
699-
}
700-
else
701-
{
702-
uncompressedMessageStream.EfficientCopyTo(compressedStream);
703-
}
694+
CompressMessage(message, uncompressedMessageStream, compressedStream, messageEncoderSettings);
695+
}
696+
else
697+
{
698+
uncompressedMessageStream.EfficientCopyTo(compressedStream);
704699
}
705700
}
701+
706702
compressedBuffer.Length = (int)compressedStream.Length;
707703
}
708704

@@ -978,29 +974,27 @@ private Opcode PeekOpcode(BsonStream stream)
978974
}
979975
}
980976

981-
private class SendMessagesHelper
977+
private class SendMessageHelper
982978
{
983979
private readonly Stopwatch _commandStopwatch;
984980
private readonly BinaryConnection _connection;
985981
private readonly MessageEncoderSettings _messageEncoderSettings;
986-
private readonly List<RequestMessage> _messages;
987-
private Lazy<List<int>> _requestIds;
982+
private readonly RequestMessage _message;
988983
private TimeSpan _serializationDuration;
989984
private Stopwatch _networkStopwatch;
990985

991-
public SendMessagesHelper(BinaryConnection connection, IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings)
986+
public SendMessageHelper(BinaryConnection connection, RequestMessage message, MessageEncoderSettings messageEncoderSettings)
992987
{
993988
_connection = connection;
994-
_messages = messages.ToList();
989+
_message = message;
995990
_messageEncoderSettings = messageEncoderSettings;
996991

997992
_commandStopwatch = Stopwatch.StartNew();
998-
_requestIds = new Lazy<List<int>>(() => _messages.Select(m => m.RequestId).ToList());
999993
}
1000994

1001-
public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<RequestMessage> sentMessages)
995+
public IByteBuffer EncodeMessage(CancellationToken cancellationToken, out RequestMessage sentMessage)
1002996
{
1003-
sentMessages = new List<RequestMessage>();
997+
sentMessage = null;
1004998
cancellationToken.ThrowIfCancellationRequested();
1005999

10061000
var serializationStopwatch = Stopwatch.StartNew();
@@ -1009,21 +1003,17 @@ public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<
10091003
using (var stream = new ByteBufferStream(buffer, ownsBuffer: false))
10101004
{
10111005
var encoderFactory = new BinaryMessageEncoderFactory(stream, _messageEncoderSettings, compressorSource: null);
1012-
foreach (var message in _messages)
1013-
{
1014-
if (message.ShouldBeSent == null || message.ShouldBeSent())
1015-
{
1016-
var encoder = message.GetEncoder(encoderFactory);
1017-
encoder.WriteMessage(message);
1018-
message.WasSent = true;
1019-
sentMessages.Add(message);
1020-
}
10211006

1022-
// Encoding messages includes serializing the
1023-
// documents, so encoding message could be expensive
1024-
// and worthy of us honoring cancellation here.
1025-
cancellationToken.ThrowIfCancellationRequested();
1026-
}
1007+
var encoder = _message.GetEncoder(encoderFactory);
1008+
encoder.WriteMessage(_message);
1009+
_message.WasSent = true;
1010+
sentMessage = _message;
1011+
1012+
// Encoding messages includes serializing the
1013+
// documents, so encoding message could be expensive
1014+
// and worthy of us honoring cancellation here.
1015+
cancellationToken.ThrowIfCancellationRequested();
1016+
10271017
buffer.Length = (int)stream.Length;
10281018
buffer.MakeReadOnly();
10291019
}
@@ -1033,42 +1023,42 @@ public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<
10331023
return buffer;
10341024
}
10351025

1036-
public void EncodingMessages()
1026+
public void EncodingMessage()
10371027
{
1038-
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesEvent(_connection.ConnectionId, _requestIds.Value, EventContext.OperationId));
1028+
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesEvent(_connection.ConnectionId, _message.RequestId, EventContext.OperationId));
10391029
}
10401030

1041-
public void FailedSendingMessages(Exception ex)
1031+
public void FailedSendingMessage(Exception ex)
10421032
{
10431033
if (_connection._commandEventHelper.ShouldCallErrorSending)
10441034
{
1045-
_connection._commandEventHelper.ErrorSending(_messages, _connection._connectionId, _connection._description?.ServiceId, ex, _connection.IsInitializing);
1035+
_connection._commandEventHelper.ErrorSending(_message, _connection._connectionId, _connection._description?.ServiceId, ex, _connection.IsInitializing);
10461036
}
10471037

1048-
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesFailedEvent(_connection.ConnectionId, _requestIds.Value, ex, EventContext.OperationId));
1038+
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesFailedEvent(_connection.ConnectionId, _message.RequestId, ex, EventContext.OperationId));
10491039
}
10501040

1051-
public void SendingMessages(IByteBuffer buffer)
1041+
public void SendingMessage(IByteBuffer buffer)
10521042
{
10531043
if (_connection._commandEventHelper.ShouldCallBeforeSending)
10541044
{
1055-
_connection._commandEventHelper.BeforeSending(_messages, _connection.ConnectionId, _connection.Description?.ServiceId, buffer, _messageEncoderSettings, _commandStopwatch, _connection.IsInitializing);
1045+
_connection._commandEventHelper.BeforeSending(_message, _connection.ConnectionId, _connection.Description?.ServiceId, buffer, _messageEncoderSettings, _commandStopwatch, _connection.IsInitializing);
10561046
}
10571047

10581048
_networkStopwatch = Stopwatch.StartNew();
10591049
}
10601050

1061-
public void SentMessages(int bufferLength)
1051+
public void SentMessage(int bufferLength)
10621052
{
10631053
_networkStopwatch.Stop();
10641054
var networkDuration = _networkStopwatch.Elapsed;
10651055

10661056
if (_connection._commandEventHelper.ShouldCallAfterSending)
10671057
{
1068-
_connection._commandEventHelper.AfterSending(_messages, _connection._connectionId, _connection.Description?.ServiceId, _connection.IsInitializing);
1058+
_connection._commandEventHelper.AfterSending(_message, _connection._connectionId, _connection.Description?.ServiceId, _connection.IsInitializing);
10691059
}
10701060

1071-
_connection._eventLogger.LogAndPublish(new ConnectionSentMessagesEvent(_connection.ConnectionId, _requestIds.Value, bufferLength, networkDuration, _serializationDuration, EventContext.OperationId));
1061+
_connection._eventLogger.LogAndPublish(new ConnectionSentMessagesEvent(_connection.ConnectionId, _message.RequestId, bufferLength, networkDuration, _serializationDuration, EventContext.OperationId));
10721062
}
10731063
}
10741064

0 commit comments

Comments
 (0)