Skip to content

Commit 862302a

Browse files
committed
CSHARP-2205: OP_MSG type 1 payload support
1 parent 7b826e7 commit 862302a

File tree

43 files changed

+1250
-305
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1250
-305
lines changed

src/MongoDB.Bson/Serialization/Serializers/ElementAppendingSerializer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,25 @@ public class ElementAppendingSerializer<TDocument> : IBsonSerializer<TDocument>
3030
// private fields
3131
private readonly IBsonSerializer<TDocument> _documentSerializer;
3232
private readonly List<BsonElement> _elements;
33+
private readonly Action<BsonWriterSettings> _writerSettingsConfigurator;
3334

3435
// constructors
3536
/// <summary>
3637
/// Initializes a new instance of the <see cref="ElementAppendingSerializer{TDocument}" /> class.
3738
/// </summary>
3839
/// <param name="documentSerializer">The document serializer.</param>
3940
/// <param name="elements">The elements to append.</param>
40-
public ElementAppendingSerializer(IBsonSerializer<TDocument> documentSerializer, IEnumerable<BsonElement> elements)
41+
/// <param name="writerSettingsConfigurator">The writer settings configurator.</param>
42+
public ElementAppendingSerializer(
43+
IBsonSerializer<TDocument> documentSerializer,
44+
IEnumerable<BsonElement> elements,
45+
Action<BsonWriterSettings> writerSettingsConfigurator = null)
4146
{
4247
if (documentSerializer == null) { throw new ArgumentNullException(nameof(documentSerializer)); }
4348
if (elements == null) { throw new ArgumentNullException(nameof(elements)); }
4449
_documentSerializer = documentSerializer;
4550
_elements = elements.ToList();
51+
_writerSettingsConfigurator = writerSettingsConfigurator; // can be null
4652
}
4753

4854
// public properties
@@ -65,8 +71,7 @@ object IBsonSerializer.Deserialize(BsonDeserializationContext context, BsonDeser
6571
public void Serialize(BsonSerializationContext context, BsonSerializationArgs args, TDocument value)
6672
{
6773
var writer = context.Writer;
68-
Action<BsonWriterSettings> settingsConfigurator = s => s.GuidRepresentation = GuidRepresentation.Unspecified;
69-
var elementAppendingWriter = new ElementAppendingBsonWriter(writer, _elements, settingsConfigurator);
74+
var elementAppendingWriter = new ElementAppendingBsonWriter(writer, _elements, _writerSettingsConfigurator);
7075
var elementAppendingContext = BsonSerializationContext.CreateRoot(elementAppendingWriter, builder => ConfigureElementAppendingContext(builder, context));
7176
_documentSerializer.Serialize(elementAppendingContext, args, value);
7277
}

src/MongoDB.Driver.Core/Core/Bindings/IChannel.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using MongoDB.Driver.Core.Connections;
2424
using MongoDB.Driver.Core.Misc;
2525
using MongoDB.Driver.Core.WireProtocol;
26+
using MongoDB.Driver.Core.WireProtocol.Messages;
2627
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
2728

2829
namespace MongoDB.Driver.Core.Bindings
@@ -104,8 +105,10 @@ TResult Command<TResult>(
104105
/// <param name="readPreference">The read preference.</param>
105106
/// <param name="databaseNamespace">The database namespace.</param>
106107
/// <param name="command">The command.</param>
108+
/// <param name="commandPayloads">The command payloads.</param>
107109
/// <param name="commandValidator">The command validator.</param>
108110
/// <param name="additionalOptions">The additional options.</param>
111+
/// <param name="postWriteAction">The post write action.</param>
109112
/// <param name="responseHandling">The response handling.</param>
110113
/// <param name="resultSerializer">The result serializer.</param>
111114
/// <param name="messageEncoderSettings">The message encoder settings.</param>
@@ -118,9 +121,11 @@ TResult Command<TResult>(
118121
ReadPreference readPreference,
119122
DatabaseNamespace databaseNamespace,
120123
BsonDocument command,
124+
IEnumerable<Type1CommandMessageSection> commandPayloads,
121125
IElementNameValidator commandValidator,
122126
BsonDocument additionalOptions,
123-
Func<CommandResponseHandling> responseHandling,
127+
Action<IMessageEncoderPostProcessor> postWriteAction,
128+
CommandResponseHandling responseHandling,
124129
IBsonSerializer<TResult> resultSerializer,
125130
MessageEncoderSettings messageEncoderSettings,
126131
CancellationToken cancellationToken);
@@ -189,8 +194,10 @@ Task<TResult> CommandAsync<TResult>(
189194
/// <param name="readPreference">The read preference.</param>
190195
/// <param name="databaseNamespace">The database namespace.</param>
191196
/// <param name="command">The command.</param>
197+
/// <param name="commandPayloads">The command payloads.</param>
192198
/// <param name="commandValidator">The command validator.</param>
193199
/// <param name="additionalOptions">The additional options.</param>
200+
/// <param name="postWriteAction">The post write action.</param>
194201
/// <param name="responseHandling">The response handling.</param>
195202
/// <param name="resultSerializer">The result serializer.</param>
196203
/// <param name="messageEncoderSettings">The message encoder settings.</param>
@@ -203,9 +210,11 @@ Task<TResult> CommandAsync<TResult>(
203210
ReadPreference readPreference,
204211
DatabaseNamespace databaseNamespace,
205212
BsonDocument command,
213+
IEnumerable<Type1CommandMessageSection> commandPayloads,
206214
IElementNameValidator commandValidator,
207215
BsonDocument additionalOptions,
208-
Func<CommandResponseHandling> responseHandling,
216+
Action<IMessageEncoderPostProcessor> postWriteAction,
217+
CommandResponseHandling responseHandling,
209218
IBsonSerializer<TResult> resultSerializer,
210219
MessageEncoderSettings messageEncoderSettings,
211220
CancellationToken cancellationToken);

src/MongoDB.Driver.Core/Core/Connections/CommandEventHelper.cs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,17 @@ private void ProcessCommandRequestMessage(CommandRequestMessage originalMessage,
268268
{
269269
var type0Section = decodedMessage.Sections.OfType<Type0CommandMessageSection>().Single();
270270
var command = (BsonDocument)type0Section.Document;
271+
var type1Sections = decodedMessage.Sections.OfType<Type1CommandMessageSection>().ToList();
272+
if (type1Sections.Count > 0)
273+
{
274+
command = new BsonDocument(command); // materialize the RawBsonDocument
275+
foreach (var type1Section in type1Sections)
276+
{
277+
var name = type1Section.Identifier;
278+
var items = new BsonArray(type1Section.Documents.GetBatchItems().Cast<RawBsonDocument>());
279+
command[name] = items;
280+
}
281+
}
271282
var commandName = command.GetElement(0).Name;
272283
var databaseName = command["$db"].AsString;
273284
var databaseNamespace = new DatabaseNamespace(databaseName);
@@ -298,7 +309,7 @@ private void ProcessCommandRequestMessage(CommandRequestMessage originalMessage,
298309
OperationId = operationId,
299310
Stopwatch = stopwatch,
300311
QueryNamespace = new CollectionNamespace(databaseNamespace, "$cmd"),
301-
ExpectedResponseType = ExpectedResponseType.Command
312+
ExpectedResponseType = decodedMessage.MoreToCome ? ExpectedResponseType.None : ExpectedResponseType.Command
302313
});
303314
}
304315
}
@@ -313,7 +324,7 @@ private void ProcessCommandResponseMessage(CommandState state, CommandResponseMe
313324
BsonValue ok;
314325
if (!reply.TryGetValue("ok", out ok))
315326
{
316-
// this is a degenerate case with the server and
327+
// this is a degenerate case with the server and
317328
// we don't really know what to do here...
318329
return;
319330
}
@@ -480,7 +491,7 @@ private void ProcessInsertMessage(RequestMessage message, Queue<RequestMessage>
480491

481492
if (_startedEvent != null)
482493
{
483-
// InsertMessage is generic, and we don't know the generic type...
494+
// InsertMessage is generic, and we don't know the generic type...
484495
// Plus, for this we really want BsonDocuments, not whatever the generic type is.
485496
var decodedMessage = encoder.ReadMessage();
486497

@@ -725,7 +736,7 @@ private void ProcessCommandReplyMessage(CommandState state, ReplyMessage<RawBson
725736
BsonValue ok;
726737
if (!reply.TryGetValue("ok", out ok))
727738
{
728-
// this is a degenerate case with the server and
739+
// this is a degenerate case with the server and
729740
// we don't really know what to do here...
730741
return;
731742
}
@@ -770,7 +781,7 @@ private void ProcessGLEReplyMessage(CommandState state, ReplyMessage<RawBsonDocu
770781
BsonValue ok;
771782
if (!reply.TryGetValue("ok", out ok))
772783
{
773-
// this is a degenerate case with the server and
784+
// this is a degenerate case with the server and
774785
// we don't really know what to do here...
775786
}
776787
else if (!ok.ToBoolean())
@@ -1073,7 +1084,7 @@ private bool TryGetWriteConcernFromGLE(Queue<RequestMessage> messageQueue, out i
10731084
return false;
10741085
}
10751086

1076-
messageQueue.Dequeue(); // consume it so that we don't process it later...
1087+
messageQueue.Dequeue(); // consume it so that we don't process it later...
10771088
requestId = queryMessage.RequestId;
10781089

10791090
writeConcern = WriteConcern.FromBsonDocument(query);

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,11 @@ private CursorBatch<TDocument> ExecuteGetMoreCommand(IChannelHandle channel, Can
167167
null, // readPreference
168168
_collectionNamespace.DatabaseNamespace,
169169
command,
170+
null, // commandPayloads
170171
NoOpElementNameValidator.Instance,
171172
null, // additionalOptions
172-
() => CommandResponseHandling.Return,
173+
null, // postWriteAction
174+
CommandResponseHandling.Return,
173175
__getMoreCommandResultSerializer,
174176
_messageEncoderSettings,
175177
cancellationToken);
@@ -185,9 +187,11 @@ private async Task<CursorBatch<TDocument>> ExecuteGetMoreCommandAsync(IChannelHa
185187
null, // readPreference
186188
_collectionNamespace.DatabaseNamespace,
187189
command,
190+
null, // commandPayloads
188191
NoOpElementNameValidator.Instance,
189192
null, // additionalOptions
190-
() => CommandResponseHandling.Return,
193+
null, // postWriteAction
194+
CommandResponseHandling.Return,
191195
__getMoreCommandResultSerializer,
192196
_messageEncoderSettings,
193197
cancellationToken).ConfigureAwait(false);

src/MongoDB.Driver.Core/Core/Operations/BulkDeleteOperation.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,12 @@ public BulkDeleteOperation(
3434
// methods
3535
protected override IRetryableWriteOperation<BsonDocument> CreateBatchOperation(Batch batch)
3636
{
37-
Func<WriteConcern> writeConcernFunc = null;
38-
if (!WriteConcern.IsServerDefault)
39-
{
40-
writeConcernFunc = () => GetBatchWriteConcern(batch);
41-
}
42-
4337
return new RetryableDeleteCommandOperation(CollectionNamespace, batch.Requests, MessageEncoderSettings)
4438
{
4539
IsOrdered = IsOrdered,
4640
MaxBatchCount = MaxBatchCount,
4741
RetryRequested = RetryRequested,
48-
WriteConcernFunc = writeConcernFunc
42+
WriteConcern = WriteConcern
4943
};
5044
}
5145

src/MongoDB.Driver.Core/Core/Operations/BulkInsertOperation.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,13 @@ public BulkInsertOperation(
3636
// methods
3737
protected override IRetryableWriteOperation<BsonDocument> CreateBatchOperation(Batch batch)
3838
{
39-
Func<WriteConcern> writeConcernFunc = null;
40-
if (!WriteConcern.IsServerDefault)
41-
{
42-
writeConcernFunc = () => GetBatchWriteConcern(batch);
43-
}
44-
4539
return new RetryableInsertCommandOperation<InsertRequest>(CollectionNamespace, batch.Requests, InsertRequestSerializer.Instance, MessageEncoderSettings)
4640
{
4741
BypassDocumentValidation = BypassDocumentValidation,
4842
IsOrdered = IsOrdered,
4943
MaxBatchCount = MaxBatchCount,
5044
RetryRequested = RetryRequested,
51-
WriteConcernFunc = writeConcernFunc
45+
WriteConcern = WriteConcern
5246
};
5347
}
5448

src/MongoDB.Driver.Core/Core/Operations/BulkUnmixedWriteOperationBase.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,6 @@ public async Task<BulkWriteOperationResult> ExecuteAsync(IWriteBinding binding,
166166

167167
protected abstract IExecutableInRetryableWriteContext<BulkWriteOperationResult> CreateEmulator();
168168

169-
protected WriteConcern GetBatchWriteConcern(Batch batch)
170-
{
171-
var writeConcern = _writeConcern;
172-
if (!writeConcern.IsAcknowledged && _isOrdered && !batch.Requests.AllItemsWereProcessed)
173-
{
174-
writeConcern = WriteConcern.W1;
175-
}
176-
return writeConcern;
177-
}
178-
179169
protected abstract bool RequestHasCollation(TWriteRequest request);
180170

181171
// private methods

src/MongoDB.Driver.Core/Core/Operations/BulkUpdateOperation.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,13 @@ public BulkUpdateOperation(
3434
// methods
3535
protected override IRetryableWriteOperation<BsonDocument> CreateBatchOperation(Batch batch)
3636
{
37-
Func<WriteConcern> writeConcernFunc = null;
38-
if (!WriteConcern.IsServerDefault)
39-
{
40-
writeConcernFunc = () => GetBatchWriteConcern(batch);
41-
}
42-
4337
return new RetryableUpdateCommandOperation(CollectionNamespace, batch.Requests, MessageEncoderSettings)
4438
{
4539
BypassDocumentValidation = BypassDocumentValidation,
4640
IsOrdered = IsOrdered,
4741
MaxBatchCount = MaxBatchCount,
4842
RetryRequested = RetryRequested,
49-
WriteConcernFunc = writeConcernFunc
43+
WriteConcern = WriteConcern
5044
};
5145
}
5246

src/MongoDB.Driver.Core/Core/Operations/BulkWriteBatchResult.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public static BulkWriteBatchResult Create(
4545
var unprocessedRequests = CreateUnprocessedRequests(requests, writeErrors, isOrdered);
4646
var upserts = CreateUpserts(writeCommandResponse);
4747

48-
var n = writeCommandResponse.GetValue("n", 0).ToInt64();
48+
var n = writeCommandResponse == null ? 0 : writeCommandResponse.GetValue("n", 0).ToInt64();
4949
var matchedCount = 0L;
5050
var deletedCount = 0L;
5151
var insertedCount = 0L;
@@ -64,7 +64,7 @@ public static BulkWriteBatchResult Create(
6464
case WriteRequestType.Update:
6565
matchedCount = n - upserts.Count();
6666
BsonValue nModified;
67-
if (writeCommandResponse.TryGetValue("nModified", out nModified))
67+
if (writeCommandResponse != null && writeCommandResponse.TryGetValue("nModified", out nModified))
6868
{
6969
modifiedCount = nModified.ToInt64();
7070
}
@@ -248,7 +248,7 @@ private static IReadOnlyList<BulkWriteOperationUpsert> CreateUpserts(BsonDocumen
248248
{
249249
var upserts = new List<BulkWriteOperationUpsert>();
250250

251-
if (writeCommandResponse.Contains("upserted"))
251+
if (writeCommandResponse != null && writeCommandResponse.Contains("upserted"))
252252
{
253253
foreach (BsonDocument value in writeCommandResponse["upserted"].AsBsonArray)
254254
{
@@ -264,7 +264,7 @@ private static IReadOnlyList<BulkWriteOperationUpsert> CreateUpserts(BsonDocumen
264264

265265
private static BulkWriteConcernError CreateWriteConcernError(BsonDocument writeCommandResponse)
266266
{
267-
if (writeCommandResponse.Contains("writeConcernError"))
267+
if (writeCommandResponse != null && writeCommandResponse.Contains("writeConcernError"))
268268
{
269269
var value = (BsonDocument)writeCommandResponse["writeConcernError"];
270270
var code = value["code"].ToInt32();
@@ -311,7 +311,7 @@ private static IReadOnlyList<BulkWriteOperationError> CreateWriteErrors(BsonDocu
311311
{
312312
var writeErrors = new List<BulkWriteOperationError>();
313313

314-
if (writeCommandResponse.Contains("writeErrors"))
314+
if (writeCommandResponse != null && writeCommandResponse.Contains("writeErrors"))
315315
{
316316
foreach (BsonDocument value in writeCommandResponse["writeErrors"].AsBsonArray)
317317
{

src/MongoDB.Driver.Core/Core/Operations/CommandOperationBase.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,11 @@ private TCommandResult ExecuteProtocol(IChannelHandle channel, ICoreSessionHandl
153153
readPreference,
154154
_databaseNamespace,
155155
_command,
156+
null, // commandPayloads
156157
_commandValidator,
157158
additionalOptions,
158-
() => CommandResponseHandling.Return,
159+
null, // postWriteAction,
160+
CommandResponseHandling.Return,
159161
_resultSerializer,
160162
_messageEncoderSettings,
161163
cancellationToken);
@@ -192,9 +194,11 @@ private Task<TCommandResult> ExecuteProtocolAsync(IChannelHandle channel, ICoreS
192194
readPreference,
193195
_databaseNamespace,
194196
_command,
197+
null, // TODO: support commandPayloads
195198
_commandValidator,
196199
additionalOptions,
197-
() => CommandResponseHandling.Return,
200+
null, // postWriteAction,
201+
CommandResponseHandling.Return,
198202
_resultSerializer,
199203
_messageEncoderSettings,
200204
cancellationToken);

0 commit comments

Comments
 (0)