Skip to content

Commit 3224fab

Browse files
CSHARP-2199: Prohibit using unacknowledged writes with explicit sessions.
1 parent 2f68bdf commit 3224fab

File tree

4 files changed

+167
-37
lines changed

4 files changed

+167
-37
lines changed

src/MongoDB.Driver.Core/Core/WireProtocol/CommandUsingCommandMessageWireProtocol.cs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -262,54 +262,81 @@ private Type0CommandMessageSection<BsonDocument> CreateType0Section(ConnectionDe
262262
{
263263
var extraElements = new List<BsonElement>();
264264

265-
addIfNotAlreadyAdded("$db", _databaseNamespace.DatabaseName);
265+
AddIfNotAlreadyAdded("$db", _databaseNamespace.DatabaseName);
266266

267267
if (connectionDescription.IsMasterResult.ServerType != ServerType.Standalone
268268
&& _readPreference != null
269269
&& _readPreference != ReadPreference.Primary)
270270
{
271271
var readPreferenceDocument = QueryHelper.CreateReadPreferenceDocument(_readPreference);
272-
addIfNotAlreadyAdded("$readPreference", readPreferenceDocument);
272+
AddIfNotAlreadyAdded("$readPreference", readPreferenceDocument);
273273
}
274274

275275
if (_session.Id != null)
276276
{
277-
addIfNotAlreadyAdded("lsid", _session.Id);
277+
if (IsSessionAcknowledged())
278+
{
279+
AddIfNotAlreadyAdded("lsid", _session.Id);
280+
}
281+
else
282+
{
283+
if (_session.IsImplicit)
284+
{
285+
// do not set sessionId if session is implicit and write is unacknowledged
286+
}
287+
else
288+
{
289+
throw new InvalidOperationException("Explicit session must not be used with unacknowledged writes.");
290+
}
291+
}
278292
}
279293

280294
if (_session.ClusterTime != null)
281295
{
282-
addIfNotAlreadyAdded("$clusterTime", _session.ClusterTime);
296+
AddIfNotAlreadyAdded("$clusterTime", _session.ClusterTime);
283297
}
284298
Action<BsonWriterSettings> writerSettingsConfigurator = s => s.GuidRepresentation = GuidRepresentation.Unspecified;
285299

286300
_session.AboutToSendCommand();
287301
if (_session.IsInTransaction)
288302
{
289303
var transaction = _session.CurrentTransaction;
290-
addIfNotAlreadyAdded("txnNumber", transaction.TransactionNumber);
304+
AddIfNotAlreadyAdded("txnNumber", transaction.TransactionNumber);
291305
if (transaction.State == CoreTransactionState.Starting)
292306
{
293-
addIfNotAlreadyAdded("startTransaction", true);
307+
AddIfNotAlreadyAdded("startTransaction", true);
294308
var readConcern = ReadConcernHelper.GetReadConcernForFirstCommandInTransaction(_session, connectionDescription);
295309
if (readConcern != null)
296310
{
297-
addIfNotAlreadyAdded("readConcern", readConcern);
311+
AddIfNotAlreadyAdded("readConcern", readConcern);
298312
}
299313
}
300-
addIfNotAlreadyAdded("autocommit", false);
314+
AddIfNotAlreadyAdded("autocommit", false);
301315
}
302316

303317
var elementAppendingSerializer = new ElementAppendingSerializer<BsonDocument>(BsonDocumentSerializer.Instance, extraElements, writerSettingsConfigurator);
304318
return new Type0CommandMessageSection<BsonDocument>(_command, elementAppendingSerializer);
305319

306-
void addIfNotAlreadyAdded(string key, BsonValue value)
320+
void AddIfNotAlreadyAdded(string key, BsonValue value)
307321
{
308322
if (!_command.Contains(key))
309323
{
310324
extraElements.Add(new BsonElement(key, value));
311325
}
312326
}
327+
328+
bool IsSessionAcknowledged()
329+
{
330+
if (_command.TryGetValue("writeConcern", out var writeConcernDocument))
331+
{
332+
var writeConcern = WriteConcern.FromBsonDocument(writeConcernDocument.AsBsonDocument);
333+
return writeConcern.IsAcknowledged;
334+
}
335+
else
336+
{
337+
return true;
338+
}
339+
}
313340
}
314341

315342
private bool IsRetryableWriteExceptionAndDeploymentDoesNotSupportRetryableWrites(MongoCommandException exception)

tests/MongoDB.Driver.Core.TestHelpers/CoreTestConfiguration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,11 +307,11 @@ public static ICoreSessionHandle StartSession()
307307
return StartSession(__cluster.Value);
308308
}
309309

310-
public static ICoreSessionHandle StartSession(ICluster cluster)
310+
public static ICoreSessionHandle StartSession(ICluster cluster, CoreSessionOptions options = null)
311311
{
312312
if (AreSessionsSupported(cluster))
313313
{
314-
return cluster.StartSession();
314+
return cluster.StartSession(options);
315315
}
316316
else
317317
{

tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,8 +1092,6 @@ public void Execute_with_an_error_in_the_second_batch_and_ordered_is_true(
10921092
list.Should().HaveCount(3);
10931093
}
10941094

1095-
//
1096-
10971095
[SkippableTheory]
10981096
[ParameterAttributeData]
10991097
public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_is_false(
@@ -1117,7 +1115,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
11171115
WriteConcern = WriteConcern.Unacknowledged
11181116
};
11191117

1120-
using (var readWriteBinding = CreateReadWriteBinding())
1118+
using (var readWriteBinding = CreateReadWriteBinding(useImplicitSession: true))
11211119
using (var channelSource = readWriteBinding.GetWriteChannelSource(CancellationToken.None))
11221120
using (var channel = channelSource.GetChannel(CancellationToken.None))
11231121
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, readWriteBinding.Session.Fork()))
@@ -1161,7 +1159,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
11611159
WriteConcern = WriteConcern.Unacknowledged
11621160
};
11631161

1164-
using (var readWriteBinding = CreateReadWriteBinding())
1162+
using (var readWriteBinding = CreateReadWriteBinding(useImplicitSession: true))
11651163
using (var channelSource = readWriteBinding.GetWriteChannelSource(CancellationToken.None))
11661164
using (var channel = channelSource.GetChannel(CancellationToken.None))
11671165
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, readWriteBinding.Session.Fork()))
@@ -1199,7 +1197,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
11991197
WriteConcern = WriteConcern.Unacknowledged
12001198
};
12011199

1202-
using (var readWriteBinding = CreateReadWriteBinding())
1200+
using (var readWriteBinding = CreateReadWriteBinding(useImplicitSession: true))
12031201
using (var channelSource = readWriteBinding.GetWriteChannelSource(CancellationToken.None))
12041202
using (var channel = channelSource.GetChannel(CancellationToken.None))
12051203
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, readWriteBinding.Session.Fork()))
@@ -1237,7 +1235,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
12371235
WriteConcern = WriteConcern.Unacknowledged
12381236
};
12391237

1240-
using (var readWriteBinding = CreateReadWriteBinding())
1238+
using (var readWriteBinding = CreateReadWriteBinding(useImplicitSession: true))
12411239
using (var channelSource = readWriteBinding.GetWriteChannelSource(CancellationToken.None))
12421240
using (var channel = channelSource.GetChannel(CancellationToken.None))
12431241
using (var channelBinding = new ChannelReadWriteBinding(channelSource.Server, channel, readWriteBinding.Session.Fork()))
@@ -1251,6 +1249,23 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
12511249
}
12521250
}
12531251

1252+
[SkippableTheory]
1253+
[ParameterAttributeData]
1254+
public void Execute_with_delete_should_not_send_session_id_when_unacknowledged_writes(
1255+
[Values(false, true)] bool useImplicitSession,
1256+
[Values(false, true)] bool async)
1257+
{
1258+
RequireServer.Check();
1259+
DropCollection();
1260+
var requests = new[] { new DeleteRequest(BsonDocument.Parse("{ x : 1 }")) };
1261+
var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings)
1262+
{
1263+
WriteConcern = WriteConcern.Unacknowledged
1264+
};
1265+
1266+
VerifySessionIdWasNotSentIfUnacknowledgedWrite(subject, "delete", async, useImplicitSession);
1267+
}
1268+
12541269
[SkippableTheory]
12551270
[ParameterAttributeData]
12561271
public void Execute_with_delete_should_send_session_id_when_supported(
@@ -1263,6 +1278,24 @@ public void Execute_with_delete_should_send_session_id_when_supported(
12631278
VerifySessionIdWasSentWhenSupported(subject, "delete", async);
12641279
}
12651280

1281+
1282+
[SkippableTheory]
1283+
[ParameterAttributeData]
1284+
public void Execute_with_insert_should_not_send_session_id_when_unacknowledged_writes(
1285+
[Values(false, true)] bool useImplicitSession,
1286+
[Values(false, true)] bool async)
1287+
{
1288+
RequireServer.Check();
1289+
DropCollection();
1290+
var requests = new[] { new InsertRequest(BsonDocument.Parse("{ _id : 1, x : 3 }")) };
1291+
var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings)
1292+
{
1293+
WriteConcern = WriteConcern.Unacknowledged
1294+
};
1295+
1296+
VerifySessionIdWasNotSentIfUnacknowledgedWrite(subject, "insert", async, useImplicitSession);
1297+
}
1298+
12661299
[SkippableTheory]
12671300
[ParameterAttributeData]
12681301
public void Execute_with_insert_should_send_session_id_when_supported(
@@ -1276,6 +1309,23 @@ public void Execute_with_insert_should_send_session_id_when_supported(
12761309
VerifySessionIdWasSentWhenSupported(subject, "insert", async);
12771310
}
12781311

1312+
[SkippableTheory]
1313+
[ParameterAttributeData]
1314+
public void Execute_with_update_should_not_send_session_id_when_unacknowledged_writes(
1315+
[Values(false, true)] bool useImplicitSession,
1316+
[Values(false, true)] bool async)
1317+
{
1318+
RequireServer.Check();
1319+
DropCollection();
1320+
var requests = new[] { new UpdateRequest(UpdateType.Update, BsonDocument.Parse("{ x : 1 }"), BsonDocument.Parse("{ $set : { a : 1 } }")) };
1321+
var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings)
1322+
{
1323+
WriteConcern = WriteConcern.Unacknowledged
1324+
};
1325+
1326+
VerifySessionIdWasNotSentIfUnacknowledgedWrite(subject, "update", async, useImplicitSession);
1327+
}
1328+
12791329
[SkippableTheory]
12801330
[ParameterAttributeData]
12811331
public void Execute_with_update_should_send_session_id_when_supported(

tests/MongoDB.Driver.Core.Tests/Core/Operations/OperationTestBase.cs

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
using MongoDB.Driver.Core.Events;
2727
using MongoDB.Driver.Core.TestHelpers;
2828
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
29+
using Xunit;
2930

3031
namespace MongoDB.Driver.Core.Operations
3132
{
@@ -247,9 +248,11 @@ protected IReadBinding CreateReadBinding(ReadPreference readPreference)
247248
return new ReadPreferenceBinding(_cluster, readPreference, _session.Fork());
248249
}
249250

250-
protected IReadWriteBinding CreateReadWriteBinding()
251+
protected IReadWriteBinding CreateReadWriteBinding(bool useImplicitSession = false)
251252
{
252-
return new WritableServerBinding(_cluster, _session.Fork());
253+
var options = new CoreSessionOptions(isImplicit: useImplicitSession);
254+
var session = CoreTestConfiguration.StartSession(_cluster, options);
255+
return new WritableServerBinding(_cluster, session);
253256
}
254257

255258
protected void Insert(params BsonDocument[] documents)
@@ -398,61 +401,111 @@ protected void Update(string filter, string update)
398401
Update(BsonDocument.Parse(filter), BsonDocument.Parse(update));
399402
}
400403

404+
protected void VerifySessionIdWasNotSentIfUnacknowledgedWrite<TResult>(
405+
IWriteOperation<TResult> operation,
406+
string commandName,
407+
bool async,
408+
bool useImplicitSession)
409+
{
410+
VerifySessionIdSending(
411+
(binding, cancellationToken) => operation.ExecuteAsync(binding, cancellationToken),
412+
(binding, cancellationToken) => operation.Execute(binding, cancellationToken),
413+
AssertSessionIdWasNotSentIfUnacknowledgedWrite,
414+
commandName,
415+
async,
416+
useImplicitSession);
417+
}
418+
401419
protected void VerifySessionIdWasSentWhenSupported<TResult>(IReadOperation<TResult> operation, string commandName, bool async)
402420
{
403-
VerifySessionIdWasSentWhenSupported(
421+
VerifySessionIdSending(
404422
(binding, cancellationToken) => operation.ExecuteAsync(binding, cancellationToken),
405423
(binding, cancellationToken) => operation.Execute(binding, cancellationToken),
424+
AssertSessionIdWasSentWhenSupported,
406425
commandName,
407426
async);
408427
}
409428

410429
protected void VerifySessionIdWasSentWhenSupported<TResult>(IWriteOperation<TResult> operation, string commandName, bool async)
411430
{
412-
VerifySessionIdWasSentWhenSupported(
431+
VerifySessionIdSending(
413432
(binding, cancellationToken) => operation.ExecuteAsync(binding, cancellationToken),
414433
(binding, cancellationToken) => operation.Execute(binding, cancellationToken),
434+
AssertSessionIdWasSentWhenSupported,
415435
commandName,
416436
async);
417437
}
418438

419-
protected void VerifySessionIdWasSentWhenSupported<TResult>(
439+
protected void VerifySessionIdSending<TResult>(
420440
Func<WritableServerBinding, CancellationToken, Task<TResult>> executeAsync,
421441
Func<WritableServerBinding, CancellationToken, TResult> execute,
442+
Action<EventCapturer, ICoreSessionHandle, Exception> assertResults,
422443
string commandName,
423-
bool async)
444+
bool async,
445+
bool useImplicitSession = false)
424446
{
425447
var eventCapturer = new EventCapturer().Capture<CommandStartedEvent>(e => e.CommandName == commandName);
426448
using (var cluster = CoreTestConfiguration.CreateCluster(b => b.Subscribe(eventCapturer)))
427449
{
428-
using (var session = CoreTestConfiguration.StartSession(cluster))
450+
using (var session = CreateSession(cluster, useImplicitSession))
429451
using (var binding = new WritableServerBinding(cluster, session.Fork()))
430452
{
431453
var cancellationToken = new CancellationTokenSource().Token;
454+
Exception exception;
432455
if (async)
433456
{
434-
executeAsync(binding, cancellationToken).GetAwaiter().GetResult();
457+
exception = Record.Exception(() => executeAsync(binding, cancellationToken).GetAwaiter().GetResult());
435458
}
436459
else
437460
{
438-
execute(binding, cancellationToken);
461+
exception = Record.Exception(() => execute(binding, cancellationToken));
439462
}
440463

441-
var commandStartedEvent = (CommandStartedEvent)eventCapturer.Next();
442-
var command = commandStartedEvent.Command;
443-
if (session.Id == null)
444-
{
445-
command.Contains("lsid").Should().BeFalse();
446-
}
447-
else
448-
{
449-
command["lsid"].Should().Be(session.Id);
450-
}
451-
session.ReferenceCount().Should().Be(2);
464+
assertResults(eventCapturer, session, exception);
452465
}
453466
}
454467
}
455468

469+
// private methods
470+
private void AssertSessionIdWasNotSentIfUnacknowledgedWrite(EventCapturer eventCapturer, ICoreSessionHandle session, Exception ex)
471+
{
472+
if (session.IsImplicit)
473+
{
474+
var commandStartedEvent = (CommandStartedEvent)eventCapturer.Next();
475+
var command = commandStartedEvent.Command;
476+
command.Contains("lsid").Should().BeFalse();
477+
session.ReferenceCount().Should().Be(2);
478+
}
479+
else
480+
{
481+
var e = ex.Should().BeOfType<InvalidOperationException>().Subject;
482+
e.Message.Should().Be("Explicit session must not be used with unacknowledged writes.");
483+
}
484+
}
485+
486+
private void AssertSessionIdWasSentWhenSupported(EventCapturer eventCapturer, ICoreSessionHandle session, Exception exception)
487+
{
488+
exception.Should().BeNull();
489+
var commandStartedEvent = (CommandStartedEvent)eventCapturer.Next();
490+
var command = commandStartedEvent.Command;
491+
if (session.Id == null)
492+
{
493+
command.Contains("lsid").Should().BeFalse();
494+
}
495+
else
496+
{
497+
command["lsid"].Should().Be(session.Id);
498+
}
499+
500+
session.ReferenceCount().Should().Be(2);
501+
}
502+
503+
private ICoreSessionHandle CreateSession(ICluster cluster, bool useImplicitSession)
504+
{
505+
var options = new CoreSessionOptions(isImplicit: useImplicitSession);
506+
return CoreTestConfiguration.StartSession(cluster, options);
507+
}
508+
456509
protected class Profiler : IDisposable
457510
{
458511
private readonly OperationTestBase _testBase;

0 commit comments

Comments
 (0)