Skip to content

Commit f74a311

Browse files
committed
CSHARP-3019: Support hedged reads.
1 parent 0ecfd24 commit f74a311

File tree

11 files changed

+437
-34
lines changed

11 files changed

+437
-34
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class Feature
6363
private static readonly Feature __findCommand = new Feature("FindCommand", new SemanticVersion(3, 2, 0));
6464
private static readonly Feature __geoNearCommand = new Feature("GeoNearCommand", new SemanticVersion(1, 0, 0), new SemanticVersion(4, 1, 0, ""));
6565
private static readonly Feature __groupCommand = new Feature("GroupCommand", new SemanticVersion(1, 0, 0), new SemanticVersion(4, 1, 1, ""));
66+
private static readonly Feature __hedgedReads = new Feature("HedgedReads", new SemanticVersion(4, 3, 1, ""));
6667
private static readonly HintForDeleteOperationsFeature __hintForDeleteOperations = new HintForDeleteOperationsFeature("HintForDeleteOperations", new SemanticVersion(4, 3, 4));
6768
private static readonly HintForFindAndModifyFeature __hintForFindAndModifyFeature = new HintForFindAndModifyFeature("HintForFindAndModify", new SemanticVersion(4, 3, 4));
6869
private static readonly HintForUpdateAndReplaceOperationsFeature __hintForUpdateAndReplaceOperations = new HintForUpdateAndReplaceOperationsFeature("HintForUpdateAndReplaceOperations", new SemanticVersion(4, 2, 0));
@@ -294,6 +295,11 @@ public class Feature
294295
/// </summary>
295296
public static Feature GroupCommand => __groupCommand;
296297

298+
/// <summary>
299+
/// Gets the hedged reads feature.
300+
/// </summary>
301+
public static Feature HedgedReads => __hedgedReads;
302+
297303
/// <summary>
298304
/// Gets the hint for delete operations feature.
299305
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/FindOpcodeOperation.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,9 @@ private Task<CursorBatch<TDocument>> ExecuteProtocolAsync(IChannelHandle channel
406406
#pragma warning restore 618
407407
}
408408

409-
internal BsonDocument CreateWrappedQuery(ServerType serverType, ReadPreference readPreference)
409+
internal BsonDocument CreateWrappedQuery(ServerType serverType, ReadPreference readPreference, out bool slaveOk)
410410
{
411-
var readPreferenceDocument = QueryHelper.CreateReadPreferenceDocument(serverType, readPreference);
411+
var readPreferenceDocument = QueryHelper.CreateReadPreferenceDocument(serverType, readPreference, out slaveOk);
412412

413413
var wrappedQuery = new BsonDocument
414414
{
@@ -463,8 +463,7 @@ public IAsyncCursor<TDocument> Execute(RetryableReadContext context, Cancellatio
463463
{
464464
var readPreference = context.Binding.ReadPreference;
465465
var serverDescription = context.ChannelSource.ServerDescription;
466-
var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference);
467-
var slaveOk = readPreference != null && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary;
466+
var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference, out var slaveOk);
468467

469468
var batch = ExecuteProtocol(context.Channel, wrappedQuery, slaveOk, cancellationToken);
470469
return CreateCursor(context.ChannelSource, wrappedQuery, batch);
@@ -492,8 +491,7 @@ public async Task<IAsyncCursor<TDocument>> ExecuteAsync(RetryableReadContext con
492491
{
493492
var readPreference = context.Binding.ReadPreference;
494493
var serverDescription = context.ChannelSource.ServerDescription;
495-
var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference);
496-
var slaveOk = readPreference != null && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary;
494+
var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference, out var slaveOk);
497495

498496
var batch = await ExecuteProtocolAsync(context.Channel, wrappedQuery, slaveOk, cancellationToken).ConfigureAwait(false);
499497
return CreateCursor(context.ChannelSource, wrappedQuery, batch);

src/MongoDB.Driver.Core/Core/Operations/QueryHelper.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,28 @@ public static int CalculateFirstBatchSize(int? limit, int? batchSize)
5252
return firstBatchSize;
5353
}
5454

55-
public static BsonDocument CreateReadPreferenceDocument(ServerType serverType, ReadPreference readPreference)
55+
public static BsonDocument CreateReadPreferenceDocument(ServerType serverType, ReadPreference readPreference, out bool slaveOk)
5656
{
57+
slaveOk = readPreference != null && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary;
58+
5759
if (serverType != ServerType.ShardRouter || readPreference == null)
5860
{
5961
return null;
6062
}
6163

6264
// simple ReadPreferences of Primary and SecondaryPreferred are encoded in the slaveOk bit
63-
if (readPreference.ReadPreferenceMode == ReadPreferenceMode.Primary || readPreference.ReadPreferenceMode == ReadPreferenceMode.SecondaryPreferred)
65+
switch (readPreference.ReadPreferenceMode)
6466
{
65-
var hasTagSets = readPreference.TagSets != null && readPreference.TagSets.Count > 0;
66-
if (!hasTagSets && !readPreference.MaxStaleness.HasValue)
67-
{
67+
case ReadPreferenceMode.Primary:
6868
return null;
69-
}
69+
70+
case ReadPreferenceMode.SecondaryPreferred:
71+
var hasTagSets = readPreference.TagSets != null && readPreference.TagSets.Count > 0;
72+
if (!hasTagSets && !readPreference.MaxStaleness.HasValue && readPreference.Hedge == null)
73+
{
74+
return null;
75+
}
76+
break;
7077
}
7178

7279
return CreateReadPreferenceDocument(readPreference);
@@ -92,7 +99,8 @@ public static BsonDocument CreateReadPreferenceDocument(ReadPreference readPrefe
9299
{
93100
{ "mode", modeString },
94101
{ "tags", tagSets, tagSets != null },
95-
{ "maxStalenessSeconds", () => (int)readPreference.MaxStaleness.Value.TotalSeconds, readPreference.MaxStaleness.HasValue }
102+
{ "maxStalenessSeconds", () => (int)readPreference.MaxStaleness.Value.TotalSeconds, readPreference.MaxStaleness.HasValue },
103+
{ "hedge", () => readPreference.Hedge.ToBsonDocument(), readPreference.Hedge != null }
96104
};
97105
}
98106
}

src/MongoDB.Driver.Core/Core/WireProtocol/CommandUsingQueryMessageWireProtocol.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
using MongoDB.Driver.Core.Connections;
2929
using MongoDB.Driver.Core.Misc;
3030
using MongoDB.Driver.Core.Operations;
31+
using MongoDB.Driver.Core.Servers;
3132
using MongoDB.Driver.Core.WireProtocol.Messages;
3233
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
3334
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders.BinaryEncoders;
@@ -85,8 +86,7 @@ public CommandUsingQueryMessageWireProtocol(
8586
private QueryMessage CreateMessage(ConnectionDescription connectionDescription, out bool messageContainsSessionId)
8687
{
8788
var commandWithPayloads = CombineCommandWithPayloads(connectionDescription);
88-
var wrappedCommand = WrapCommandForQueryMessage(commandWithPayloads, connectionDescription, out messageContainsSessionId);
89-
var slaveOk = _readPreference != null && _readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary;
89+
var wrappedCommand = WrapCommandForQueryMessage(commandWithPayloads, connectionDescription, out messageContainsSessionId, out var slaveOk);
9090

9191
#pragma warning disable 618
9292
return new QueryMessage(
@@ -329,7 +329,7 @@ private TCommandResult ProcessReply(ConnectionId connectionId, ReplyMessage<RawB
329329
}
330330
}
331331

332-
private BsonDocument WrapCommandForQueryMessage(BsonDocument command, ConnectionDescription connectionDescription, out bool messageContainsSessionId)
332+
private BsonDocument WrapCommandForQueryMessage(BsonDocument command, ConnectionDescription connectionDescription, out bool messageContainsSessionId, out bool slaveOk)
333333
{
334334
messageContainsSessionId = false;
335335
var extraElements = new List<BsonElement>();
@@ -365,12 +365,8 @@ private BsonDocument WrapCommandForQueryMessage(BsonDocument command, Connection
365365
var appendExtraElementsSerializer = new ElementAppendingSerializer<BsonDocument>(BsonDocumentSerializer.Instance, extraElements, writerSettingsConfigurator);
366366
var commandWithExtraElements = new BsonDocumentWrapper(command, appendExtraElementsSerializer);
367367

368-
BsonDocument readPreferenceDocument = null;
369-
if (connectionDescription != null)
370-
{
371-
var serverType = connectionDescription.IsMasterResult.ServerType;
372-
readPreferenceDocument = QueryHelper.CreateReadPreferenceDocument(serverType, _readPreference);
373-
}
368+
var serverType = connectionDescription != null ? connectionDescription.IsMasterResult.ServerType : ServerType.Unknown;
369+
var readPreferenceDocument = QueryHelper.CreateReadPreferenceDocument(serverType, _readPreference, out slaveOk);
374370

375371
var wrappedCommand = new BsonDocument
376372
{

src/MongoDB.Driver.Core/ReadPreference.cs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public static ReadPreference FromBsonDocument(BsonDocument document)
121121
#endregion
122122

123123
// fields
124+
private readonly ReadPreferenceHedge _hedge;
124125
private readonly TimeSpan? _maxStaleness;
125126
private readonly ReadPreferenceMode _mode;
126127
private readonly IReadOnlyList<TagSet> _tagSets;
@@ -132,10 +133,12 @@ public static ReadPreference FromBsonDocument(BsonDocument document)
132133
/// <param name="mode">The read preference mode.</param>
133134
/// <param name="tagSets">The tag sets.</param>
134135
/// <param name="maxStaleness">The maximum staleness.</param>
136+
/// <param name="hedge">The hedge.</param>
135137
public ReadPreference(
136138
ReadPreferenceMode mode,
137139
IEnumerable<TagSet> tagSets = null,
138-
TimeSpan? maxStaleness = null)
140+
TimeSpan? maxStaleness = null,
141+
ReadPreferenceHedge hedge = null)
139142
{
140143
var tagSetsArray = tagSets == null ? __emptyTagSetsArray : tagSets.ToArray();
141144
if (tagSetsArray.Length > 0)
@@ -153,12 +156,23 @@ public ReadPreference(
153156
Ensure.That(mode != ReadPreferenceMode.Primary, "MaxStaleness cannot be used with ReadPreferenceMode Primary.", nameof(maxStaleness));
154157
}
155158

159+
if (hedge != null)
160+
{
161+
Ensure.That(mode != ReadPreferenceMode.Primary, "Hedged reads cannot be used with ReadPreferenceMode Primary.", nameof(hedge));
162+
}
163+
156164
_mode = mode;
157165
_tagSets = tagSetsArray;
158166
_maxStaleness = maxStaleness;
167+
_hedge = hedge;
159168
}
160169

161170
// properties
171+
/// <summary>
172+
/// Gets the hedge.
173+
/// </summary>
174+
public ReadPreferenceHedge Hedge => _hedge;
175+
162176
/// <summary>
163177
/// Gets the maximum staleness.
164178
/// </summary>
@@ -202,6 +216,7 @@ public bool Equals(ReadPreference other)
202216
}
203217

204218
return
219+
object.Equals(_hedge, other._hedge) &&
205220
_maxStaleness.Equals(other._maxStaleness) &&
206221
_mode.Equals(other._mode) &&
207222
_tagSets.SequenceEqual(other.TagSets);
@@ -217,6 +232,7 @@ public override bool Equals(object obj)
217232
public override int GetHashCode()
218233
{
219234
return new Hasher()
235+
.Hash(_hedge)
220236
.Hash(_maxStaleness)
221237
.Hash(_mode)
222238
.HashElements(_tagSets)
@@ -240,18 +256,33 @@ public override string ToString()
240256
sb.Append(", MaxStaleness : ");
241257
sb.Append(TimeSpanParser.ToString(_maxStaleness.Value));
242258
}
259+
if (_hedge != null)
260+
{
261+
sb.Append(", Hedge : ");
262+
sb.Append(_hedge.ToBsonDocument().ToJson());
263+
}
243264
sb.Append(" }");
244265
return sb.ToString();
245266
}
246267

268+
/// <summary>
269+
/// Returns a new instance of ReadPreference with some values changed.
270+
/// </summary>
271+
/// <param name="hedge">The hedge.</param>
272+
/// <returns>A new instance of ReadPreference.</returns>
273+
public ReadPreference With(ReadPreferenceHedge hedge)
274+
{
275+
return new ReadPreference(_mode, _tagSets, _maxStaleness, hedge);
276+
}
277+
247278
/// <summary>
248279
/// Returns a new instance of ReadPreference with some values changed.
249280
/// </summary>
250281
/// <param name="mode">The read preference mode.</param>
251282
/// <returns>A new instance of ReadPreference.</returns>
252283
public ReadPreference With(ReadPreferenceMode mode)
253284
{
254-
return new ReadPreference(mode, _tagSets, _maxStaleness);
285+
return new ReadPreference(mode, _tagSets, _maxStaleness, _hedge);
255286
}
256287

257288
/// <summary>
@@ -261,7 +292,7 @@ public ReadPreference With(ReadPreferenceMode mode)
261292
/// <returns>A new instance of ReadPreference.</returns>
262293
public ReadPreference With(IEnumerable<TagSet> tagSets)
263294
{
264-
return new ReadPreference(_mode, tagSets, _maxStaleness);
295+
return new ReadPreference(_mode, tagSets, _maxStaleness, _hedge);
265296
}
266297

267298
/// <summary>
@@ -271,7 +302,7 @@ public ReadPreference With(IEnumerable<TagSet> tagSets)
271302
/// <returns>A new instance of ReadPreference.</returns>
272303
public ReadPreference With(TimeSpan? maxStaleness)
273304
{
274-
return new ReadPreference(_mode, _tagSets, maxStaleness);
305+
return new ReadPreference(_mode, _tagSets, maxStaleness, _hedge);
275306
}
276307
}
277308
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/* Copyright 2020-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using MongoDB.Bson;
18+
19+
namespace MongoDB.Driver
20+
{
21+
/// <summary>
22+
/// Represents a read preference hedge.
23+
/// </summary>
24+
public sealed class ReadPreferenceHedge : IEquatable<ReadPreferenceHedge>
25+
{
26+
#region static
27+
// private static fields
28+
private static readonly ReadPreferenceHedge __disabled = new ReadPreferenceHedge(isEnabled: false);
29+
private static readonly ReadPreferenceHedge __enabled = new ReadPreferenceHedge(isEnabled: true);
30+
31+
// public static properties
32+
/// <summary>
33+
/// Gets a disabled read preference hedge.
34+
/// </summary>
35+
public static ReadPreferenceHedge Disabled => __disabled;
36+
37+
/// <summary>
38+
/// Gets an enabled read preference hedge.
39+
/// </summary>
40+
public static ReadPreferenceHedge Enabled => __enabled;
41+
#endregion
42+
43+
// private fields
44+
private readonly bool _isEnabled;
45+
46+
// constructors
47+
/// <summary>
48+
/// Initializes an instance of ReadPreferenceHedge.
49+
/// </summary>
50+
/// <param name="isEnabled">Whether hedged reads are enabled.</param>
51+
public ReadPreferenceHedge(bool isEnabled)
52+
{
53+
_isEnabled = isEnabled;
54+
}
55+
56+
// public properties
57+
/// <summary>
58+
/// Gets whether hedged reads are enabled.
59+
/// </summary>
60+
public bool IsEnabled => _isEnabled;
61+
62+
// public methods
63+
/// <inheritdoc/>
64+
public override bool Equals(object other)
65+
{
66+
return Equals(other as ReadPreferenceHedge);
67+
}
68+
69+
/// <inheritdoc/>
70+
public bool Equals(ReadPreferenceHedge other)
71+
{
72+
return other != null && _isEnabled == other._isEnabled;
73+
}
74+
75+
/// <inheritdoc/>
76+
public override int GetHashCode()
77+
{
78+
return _isEnabled.GetHashCode();
79+
}
80+
81+
/// <inheritdoc/>
82+
public BsonDocument ToBsonDocument()
83+
{
84+
return new BsonDocument("enabled", _isEnabled);
85+
}
86+
87+
/// <inheritdoc/>
88+
public override string ToString()
89+
{
90+
return ToBsonDocument().ToJson();
91+
}
92+
}
93+
}

tests/MongoDB.Driver.Core.Tests/Core/Operations/FindOpcodeOperationTests.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,11 @@ public void CreateWrappedQuery_should_create_the_correct_query_when_not_connecte
193193
};
194194

195195

196-
var result = subject.CreateWrappedQuery(ServerType.ReplicaSetArbiter, ReadPreference.Secondary);
196+
var result = subject.CreateWrappedQuery(ServerType.ReplicaSetArbiter, ReadPreference.Secondary, out var slaveOk);
197197

198198
result.Should().Be(expectedResult);
199199
result["$maxTimeMS"].BsonType.Should().Be(BsonType.Int32);
200+
slaveOk.Should().BeTrue();
200201
}
201202

202203
[Theory]
@@ -228,10 +229,11 @@ public void CreateWrappedQuery_should_create_the_correct_query_when_connected_to
228229
{ "$snapshot", true }
229230
};
230231

231-
var result = subject.CreateWrappedQuery(ServerType.ShardRouter, ReadPreference.Secondary);
232+
var result = subject.CreateWrappedQuery(ServerType.ShardRouter, ReadPreference.Secondary, out var slaveOk);
232233

233234
result.Should().Be(expectedResult);
234235
result["$maxTimeMS"].BsonType.Should().Be(BsonType.Int32);
236+
slaveOk.Should().BeTrue();
235237
}
236238

237239
[Theory]

0 commit comments

Comments
 (0)