Skip to content

Commit 94a31f2

Browse files
committed
CSHARP-2020: OP_MSG support.
1 parent b8f6aa9 commit 94a31f2

File tree

57 files changed

+4864
-334
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+4864
-334
lines changed

src/MongoDB.Driver.Core/Core/Bindings/IChannel.cs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public interface IChannel : IDisposable
5353
/// <param name="messageEncoderSettings">The message encoder settings.</param>
5454
/// <param name="cancellationToken">The cancellation token.</param>
5555
/// <returns>The result of the Command protocol.</returns>
56+
[Obsolete("Use the newest overload instead.")]
5657
TResult Command<TResult>(
5758
DatabaseNamespace databaseNamespace,
5859
BsonDocument command,
@@ -81,6 +82,7 @@ TResult Command<TResult>(
8182
/// <returns>
8283
/// The result of the Command protocol.
8384
/// </returns>
85+
[Obsolete("Use the newest overload instead.")]
8486
TResult Command<TResult>(
8587
ICoreSession session,
8688
ReadPreference readPreference,
@@ -94,6 +96,35 @@ TResult Command<TResult>(
9496
MessageEncoderSettings messageEncoderSettings,
9597
CancellationToken cancellationToken);
9698

99+
/// <summary>
100+
/// Executes a Command protocol.
101+
/// </summary>
102+
/// <typeparam name="TResult">The type of the result.</typeparam>
103+
/// <param name="session">The session.</param>
104+
/// <param name="readPreference">The read preference.</param>
105+
/// <param name="databaseNamespace">The database namespace.</param>
106+
/// <param name="command">The command.</param>
107+
/// <param name="commandValidator">The command validator.</param>
108+
/// <param name="additionalOptions">The additional options.</param>
109+
/// <param name="responseHandling">The response handling.</param>
110+
/// <param name="resultSerializer">The result serializer.</param>
111+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
112+
/// <param name="cancellationToken">The cancellation token.</param>
113+
/// <returns>
114+
/// The result of the Command protocol.
115+
/// </returns>
116+
TResult Command<TResult>(
117+
ICoreSession session,
118+
ReadPreference readPreference,
119+
DatabaseNamespace databaseNamespace,
120+
BsonDocument command,
121+
IElementNameValidator commandValidator,
122+
BsonDocument additionalOptions,
123+
Func<CommandResponseHandling> responseHandling,
124+
IBsonSerializer<TResult> resultSerializer,
125+
MessageEncoderSettings messageEncoderSettings,
126+
CancellationToken cancellationToken);
127+
97128
/// <summary>
98129
/// Executes a Command protocol.
99130
/// </summary>
@@ -107,6 +138,7 @@ TResult Command<TResult>(
107138
/// <param name="messageEncoderSettings">The message encoder settings.</param>
108139
/// <param name="cancellationToken">The cancellation token.</param>
109140
/// <returns>A Task whose result is the result of the Command protocol.</returns>
141+
[Obsolete("Use the newest overload instead.")]
110142
Task<TResult> CommandAsync<TResult>(
111143
DatabaseNamespace databaseNamespace,
112144
BsonDocument command,
@@ -135,6 +167,7 @@ Task<TResult> CommandAsync<TResult>(
135167
/// <returns>
136168
/// A Task whose result is the result of the Command protocol.
137169
/// </returns>
170+
[Obsolete("Use the newest overload instead.")]
138171
Task<TResult> CommandAsync<TResult>(
139172
ICoreSession session,
140173
ReadPreference readPreference,
@@ -148,6 +181,35 @@ Task<TResult> CommandAsync<TResult>(
148181
MessageEncoderSettings messageEncoderSettings,
149182
CancellationToken cancellationToken);
150183

184+
/// <summary>
185+
/// Executes a Command protocol.
186+
/// </summary>
187+
/// <typeparam name="TResult">The type of the result.</typeparam>
188+
/// <param name="session">The session.</param>
189+
/// <param name="readPreference">The read preference.</param>
190+
/// <param name="databaseNamespace">The database namespace.</param>
191+
/// <param name="command">The command.</param>
192+
/// <param name="commandValidator">The command validator.</param>
193+
/// <param name="additionalOptions">The additional options.</param>
194+
/// <param name="responseHandling">The response handling.</param>
195+
/// <param name="resultSerializer">The result serializer.</param>
196+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
197+
/// <param name="cancellationToken">The cancellation token.</param>
198+
/// <returns>
199+
/// A Task whose result is the result of the Command protocol.
200+
/// </returns>
201+
Task<TResult> CommandAsync<TResult>(
202+
ICoreSession session,
203+
ReadPreference readPreference,
204+
DatabaseNamespace databaseNamespace,
205+
BsonDocument command,
206+
IElementNameValidator commandValidator,
207+
BsonDocument additionalOptions,
208+
Func<CommandResponseHandling> responseHandling,
209+
IBsonSerializer<TResult> resultSerializer,
210+
MessageEncoderSettings messageEncoderSettings,
211+
CancellationToken cancellationToken);
212+
151213
/// <summary>
152214
/// Executes a Delete protocol.
153215
/// </summary>

src/MongoDB.Driver.Core/Core/Connections/CommandEventHelper.cs

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,14 @@ public void AfterReceiving(ResponseMessage message, IByteBuffer buffer, Connecti
168168
return;
169169
}
170170

171-
ProcessReplyMessage(state, message, buffer, connectionId, encoderSettings);
171+
if (message is CommandResponseMessage)
172+
{
173+
ProcessCommandResponseMessage(state, (CommandResponseMessage)message, buffer, connectionId, encoderSettings);
174+
}
175+
else
176+
{
177+
ProcessReplyMessage(state, message, buffer, connectionId, encoderSettings);
178+
}
172179
}
173180

174181
public void ErrorReceiving(int responseTo, ConnectionId connectionId, Exception exception)
@@ -225,6 +232,9 @@ private void ProcessRequestMessages(Queue<RequestMessage> messageQueue, Connecti
225232
var message = messageQueue.Dequeue();
226233
switch (message.MessageType)
227234
{
235+
case MongoDBMessageType.Command:
236+
ProcessCommandRequestMessage((CommandRequestMessage)message, messageQueue, connectionId, new CommandMessageBinaryEncoder(stream, encoderSettings), stopwatch);
237+
break;
228238
case MongoDBMessageType.Delete:
229239
ProcessDeleteMessage((DeleteMessage)message, messageQueue, connectionId, new DeleteMessageBinaryEncoder(stream, encoderSettings), stopwatch);
230240
break;
@@ -248,6 +258,103 @@ private void ProcessRequestMessages(Queue<RequestMessage> messageQueue, Connecti
248258
}
249259
}
250260

261+
private void ProcessCommandRequestMessage(CommandRequestMessage originalMessage, Queue<RequestMessage> messageQueue, ConnectionId connectionId, CommandMessageBinaryEncoder encoder, Stopwatch stopwatch)
262+
{
263+
var requestId = originalMessage.RequestId;
264+
var operationId = EventContext.OperationId;
265+
266+
var decodedMessage = encoder.ReadMessage();
267+
using (new CommandMessageDisposer(decodedMessage))
268+
{
269+
var type0Section = decodedMessage.Sections.OfType<Type0CommandMessageSection>().Single();
270+
var command = (BsonDocument)type0Section.Document;
271+
var commandName = command.GetElement(0).Name;
272+
var databaseName = command["$db"].AsString;
273+
var databaseNamespace = new DatabaseNamespace(databaseName);
274+
if (__securitySensitiveCommands.Contains(commandName))
275+
{
276+
command = new BsonDocument();
277+
}
278+
279+
if (_startedEvent != null)
280+
{
281+
282+
var @event = new CommandStartedEvent(
283+
commandName,
284+
command,
285+
databaseNamespace,
286+
operationId,
287+
requestId,
288+
connectionId);
289+
290+
_startedEvent(@event);
291+
}
292+
293+
if (_shouldTrackState)
294+
{
295+
_state.TryAdd(requestId, new CommandState
296+
{
297+
CommandName = commandName,
298+
OperationId = operationId,
299+
Stopwatch = stopwatch,
300+
QueryNamespace = new CollectionNamespace(databaseNamespace, "$cmd"),
301+
ExpectedResponseType = ExpectedResponseType.Command
302+
});
303+
}
304+
}
305+
}
306+
307+
private void ProcessCommandResponseMessage(CommandState state, CommandResponseMessage message, IByteBuffer buffer, ConnectionId connectionId, MessageEncoderSettings encoderSettings)
308+
{
309+
var wrappedMessage = message.WrappedMessage;
310+
var type0Section = wrappedMessage.Sections.OfType<Type0CommandMessageSection<RawBsonDocument>>().Single();
311+
var reply = (BsonDocument)type0Section.Document;
312+
313+
BsonValue ok;
314+
if (!reply.TryGetValue("ok", out ok))
315+
{
316+
// this is a degenerate case with the server and
317+
// we don't really know what to do here...
318+
return;
319+
}
320+
321+
if (__securitySensitiveCommands.Contains(state.CommandName))
322+
{
323+
reply = new BsonDocument();
324+
}
325+
326+
if (ok.ToBoolean())
327+
{
328+
if (_succeededEvent != null)
329+
{
330+
_succeededEvent(new CommandSucceededEvent(
331+
state.CommandName,
332+
reply,
333+
state.OperationId,
334+
message.ResponseTo,
335+
connectionId,
336+
state.Stopwatch.Elapsed));
337+
}
338+
}
339+
else
340+
{
341+
if (_failedEvent != null)
342+
{
343+
_failedEvent(new CommandFailedEvent(
344+
state.CommandName,
345+
new MongoCommandException(
346+
connectionId,
347+
string.Format("{0} command failed", state.CommandName),
348+
null,
349+
reply),
350+
state.OperationId,
351+
message.ResponseTo,
352+
connectionId,
353+
state.Stopwatch.Elapsed));
354+
}
355+
}
356+
}
357+
251358
private void ProcessDeleteMessage(DeleteMessage originalMessage, Queue<RequestMessage> messageQueue, ConnectionId connectionId, DeleteMessageBinaryEncoder encoder, Stopwatch stopwatch)
252359
{
253360
var commandName = "delete";

src/MongoDB.Driver.Core/Core/Misc/BatchableSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace MongoDB.Driver.Core.Misc
2323
/// Represents a batch of items that can be split if not all items can be processed at once.
2424
/// </summary>
2525
/// <typeparam name="T">The type of the items.</typeparam>
26-
public sealed class BatchableSource<T>
26+
public sealed class BatchableSource<T> : IBatchableSource<T>
2727
{
2828
#region static
2929
// private static methods

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class Feature
3838
private static readonly Feature __bypassDocumentValidation = new Feature("BypassDocumentValidation", new SemanticVersion(3, 2, 0));
3939
private static readonly Feature __changeStreamStage = new Feature("ChangeStreamStage", new SemanticVersion(3, 5, 11));
4040
private static readonly CollationFeature __collation = new CollationFeature("Collation", new SemanticVersion(3, 3, 11));
41+
private static readonly Feature __commandMessage = new Feature("CommandMessage", new SemanticVersion(3, 6, 0));
4142
private static readonly CommandsThatWriteAcceptWriteConcernFeature __commandsThatWriteAcceptWriteConcern = new CommandsThatWriteAcceptWriteConcernFeature("CommandsThatWriteAcceptWriteConcern", new SemanticVersion(3, 3, 11));
4243
private static readonly Feature __createIndexesCommand = new Feature("CreateIndexesCommand", new SemanticVersion(3, 0, 0));
4344
private static readonly Feature __currentOpCommand = new Feature("CurrentOpCommand", new SemanticVersion(3, 2, 0));
@@ -136,6 +137,11 @@ public class Feature
136137
/// </summary>
137138
public static CollationFeature Collation => __collation;
138139

140+
/// <summary>
141+
/// Gets the command message feature.
142+
/// </summary>
143+
public static Feature CommandMessage => __commandMessage;
144+
139145
/// <summary>
140146
/// Gets the commands that write accept write concern feature.
141147
/// </summary>
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/* Copyright 2018-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
20+
namespace MongoDB.Driver.Core.Misc
21+
{
22+
/// <summary>
23+
/// Represents a batch of items that can be split if not all items can be processed at once.
24+
/// </summary>
25+
/// <typeparam name="T">The type of the items.</typeparam>
26+
public interface IBatchableSource<out T>
27+
{
28+
// properties
29+
/// <summary>
30+
/// Gets a value indicating whether all items were processed.
31+
/// </summary>
32+
/// <value>
33+
/// <c>true</c> if all items were processed; otherwise, <c>false</c>.
34+
/// </value>
35+
bool AllItemsWereProcessed { get; }
36+
37+
/// <summary>
38+
/// Gets a value indicating whether the batch can be split.
39+
/// </summary>
40+
/// <value>
41+
/// <c>true</c> if the batch can be split; otherwise, <c>false</c>.
42+
/// </value>
43+
bool CanBeSplit { get; }
44+
45+
/// <summary>
46+
/// Gets the count.
47+
/// </summary>
48+
/// <value>
49+
/// The count.
50+
/// </value>
51+
int Count { get; }
52+
53+
/// <summary>
54+
/// Gets the items.
55+
/// </summary>
56+
/// <value>
57+
/// The items.
58+
/// </value>
59+
IReadOnlyList<T> Items { get; }
60+
61+
/// <summary>
62+
/// Gets the offset.
63+
/// </summary>
64+
/// <value>
65+
/// The offset.
66+
/// </value>
67+
int Offset { get; }
68+
69+
/// <summary>
70+
/// Gets the count of processed items. Equal to zero until SetProcessedCount has been called.
71+
/// </summary>
72+
/// <value>
73+
/// The count of processed items.
74+
/// </value>
75+
int ProcessedCount { get; }
76+
77+
// methods
78+
/// <summary>
79+
/// Advances past the processed items.
80+
/// </summary>
81+
void AdvancePastProcessedItems();
82+
83+
/// <summary>
84+
/// Gets the items in the batch.
85+
/// </summary>
86+
/// <returns>
87+
/// The items in the batch.
88+
/// </returns>
89+
IReadOnlyList<T> GetBatchItems();
90+
91+
/// <summary>
92+
/// Gets the items that were processed.
93+
/// </summary>
94+
/// <returns>
95+
/// The items that were processed.
96+
/// </returns>
97+
IReadOnlyList<T> GetProcessedItems();
98+
99+
/// <summary>
100+
/// Gets the items that were not processed.
101+
/// </summary>
102+
/// <returns>
103+
/// The items that were not processed.
104+
/// </returns>
105+
IReadOnlyList<T> GetUnprocessedItems();
106+
107+
/// <summary>
108+
/// Sets the processed count.
109+
/// </summary>
110+
/// <param name="value">The value.</param>
111+
void SetProcessedCount(int value);
112+
}
113+
}

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ private CursorBatch<TDocument> ExecuteGetMoreCommand(IChannelHandle channel, Can
175175
NoOpElementNameValidator.Instance,
176176
null, // additionalOptions
177177
() => CommandResponseHandling.Return,
178-
false, // slaveOk
179178
__getMoreCommandResultSerializer,
180179
_messageEncoderSettings,
181180
cancellationToken);
@@ -194,7 +193,6 @@ private async Task<CursorBatch<TDocument>> ExecuteGetMoreCommandAsync(IChannelHa
194193
NoOpElementNameValidator.Instance,
195194
null, // additionalOptions
196195
() => CommandResponseHandling.Return,
197-
false, // slaveOk
198196
__getMoreCommandResultSerializer,
199197
_messageEncoderSettings,
200198
cancellationToken).ConfigureAwait(false);

0 commit comments

Comments
 (0)