Skip to content

Commit 5ba8320

Browse files
committed
CSHARP-2207: Large bulk writes not always splitting correctly
1 parent a5bdb75 commit 5ba8320

File tree

6 files changed

+182
-25
lines changed

6 files changed

+182
-25
lines changed

src/MongoDB.Driver.Core/Core/Misc/SizeLimitingBatchableSourceSerializer.cs

Lines changed: 17 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -52,8 +52,8 @@ public SizeLimitingBatchableSourceSerializer(
5252
_itemSerializer = Ensure.IsNotNull(itemSerializer, nameof(itemSerializer));
5353
_itemElementNameValidator = Ensure.IsNotNull(itemElementNameValidator, nameof(itemElementNameValidator));
5454
_maxBatchCount = Ensure.IsGreaterThanZero(maxBatchCount, nameof(maxBatchCount));
55-
_maxItemSize = Ensure.IsBetween(maxItemSize, 1, int.MaxValue, nameof(maxItemSize));
56-
_maxBatchSize = Ensure.IsBetween(maxBatchSize, maxItemSize, int.MaxValue, nameof(maxBatchSize));
55+
_maxItemSize = Ensure.IsGreaterThanZero(maxItemSize, nameof(maxItemSize));
56+
_maxBatchSize = Ensure.IsGreaterThanZero(maxBatchSize, nameof(maxBatchSize));
5757
}
5858

5959
// public methods
@@ -88,19 +88,23 @@ public override void Serialize(BsonSerializationContext context, BsonSerializati
8888
var item = value.Items[value.Offset + i];
8989
_itemSerializer.Serialize(context, args, item);
9090

91-
var batchSize = binaryWriter?.Position - startPosition;
92-
if (batchSize > _maxBatchSize)
91+
// always process at least one item
92+
if (i > 0)
9393
{
94-
if (i > 0 && value.CanBeSplit)
94+
var batchSize = binaryWriter?.Position - startPosition;
95+
if (batchSize > _maxBatchSize)
9596
{
96-
binaryWriter.BaseStream.Position = itemPosition.Value; // remove the last item
97-
binaryWriter.BaseStream.SetLength(itemPosition.Value);
98-
value.SetProcessedCount(i);
99-
return;
100-
}
101-
else
102-
{
103-
throw new ArgumentException("Batch is too large.");
97+
if (value.CanBeSplit)
98+
{
99+
binaryWriter.BaseStream.Position = itemPosition.Value; // remove the last item
100+
binaryWriter.BaseStream.SetLength(itemPosition.Value);
101+
value.SetProcessedCount(i);
102+
return;
103+
}
104+
else
105+
{
106+
throw new ArgumentException("Batch is too large.");
107+
}
104108
}
105109
}
106110
}

src/MongoDB.Driver.Core/Core/Operations/RetryableDeleteCommandOperation.cs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -123,8 +123,8 @@ private IBsonSerializer<BatchableSource<DeleteRequest>> CreateBatchSerializer(Co
123123
if (attempt == 1)
124124
{
125125
var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, connectionDescription.MaxBatchCount);
126-
var maxItemSize = connectionDescription.MaxDocumentSize;
127-
var maxBatchSize = connectionDescription.MaxWireDocumentSize;
126+
var maxItemSize = connectionDescription.MaxWireDocumentSize;
127+
var maxBatchSize = connectionDescription.MaxDocumentSize;
128128
return new SizeLimitingBatchableSourceSerializer<DeleteRequest>(DeleteRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxItemSize, maxBatchSize);
129129
}
130130
else

src/MongoDB.Driver.Core/Core/Operations/RetryableInsertCommandOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -150,7 +150,7 @@ private IBsonSerializer<BatchableSource<TDocument>> CreateBatchSerializer(Connec
150150
{
151151
var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, connectionDescription.MaxBatchCount);
152152
var maxItemSize = connectionDescription.MaxDocumentSize;
153-
var maxBatchSize = connectionDescription.MaxWireDocumentSize;
153+
var maxBatchSize = connectionDescription.MaxDocumentSize;
154154
return new SizeLimitingBatchableSourceSerializer<TDocument>(itemSerializer, elementNameValidator, maxBatchCount, maxItemSize, maxBatchSize);
155155
}
156156
else

src/MongoDB.Driver.Core/Core/Operations/RetryableUpdateCommandOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,7 @@ private IBsonSerializer<BatchableSource<UpdateRequest>> CreateBatchSerializer(Co
145145
{
146146
var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, connectionDescription.MaxBatchCount);
147147
var maxItemSize = connectionDescription.MaxWireDocumentSize;
148-
var maxBatchSize = connectionDescription.MaxWireDocumentSize;
148+
var maxBatchSize = connectionDescription.MaxDocumentSize;
149149
return new SizeLimitingBatchableSourceSerializer<UpdateRequest>(UpdateRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxItemSize, maxBatchSize);
150150
}
151151
else

tests/MongoDB.Driver.Core.TestHelpers/EventCapturer.cs

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18+
using System.Linq;
1819
using MongoDB.Bson;
1920
using MongoDB.Driver.Core.Events;
2021
using MongoDB.Driver.Core.Misc;
@@ -64,6 +65,17 @@ public void Clear()
6465
}
6566
}
6667

68+
/// <summary>
/// Gets the events captured so far, returned as a new list that is built while holding the capturer's lock.
/// </summary>
public List<object> Events
69+
{
70+
get
71+
{
72+
lock (_lock)
73+
{
74+
// ToList() builds a separate list, so the caller does not enumerate _capturedEvents directly.
return _capturedEvents.ToList();
75+
}
76+
}
77+
}
78+
6779
public object Next()
6880
{
6981
lock (_lock)

tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs

Lines changed: 149 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,17 +17,13 @@
1717
using System.Collections.Generic;
1818
using System.Linq;
1919
using System.Threading;
20-
using System.Threading.Tasks;
2120
using FluentAssertions;
2221
using MongoDB.Bson;
2322
using MongoDB.Bson.Serialization.Serializers;
2423
using MongoDB.Bson.TestHelpers.XunitExtensions;
2524
using MongoDB.Driver.Core.Bindings;
26-
using MongoDB.Driver.Core.Clusters;
27-
using MongoDB.Driver.Core.Configuration;
2825
using MongoDB.Driver.Core.Events;
2926
using MongoDB.Driver.Core.Misc;
30-
using MongoDB.Driver.Core.TestHelpers;
3127
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
3228
using Xunit;
3329

@@ -1257,11 +1253,149 @@ public void Execute_with_update_should_send_session_id_when_supported(
12571253
VerifySessionIdWasSentWhenSupported(subject, "update", async);
12581254
}
12591255

1260-
private List<BsonDocument> ReadAllFromCollection(IReadBinding binding)
1256+
// Runs one bulk operation made of delete requests of the given sizes and checks how it was split:
// each captured "delete" CommandStartedEvent contributes the length of its "deletes" array, and
// that sequence must equal expectedBatchCounts. 16777216 is 16 * 1024 * 1024; presumably the sizes
// are chosen around the server's maximum document size (TODO confirm against the server limits).
// NOTE(review): the lines marked "-" below are removed lines of another method shown by this diff hunk.
[SkippableTheory]
1257+
[InlineData(new[] { 1 }, new[] { 1 })]
1258+
[InlineData(new[] { 1, 1 }, new[] { 2 })]
1259+
[InlineData(new[] { 8388605, 8388605 }, new[] { 2 })]
1260+
[InlineData(new[] { 8388605, 8388606 }, new[] { 1, 1 })]
1261+
[InlineData(new[] { 16777216 }, new[] { 1 })]
1262+
[InlineData(new[] { 16777216, 1 }, new[] { 1, 1 })]
1263+
[InlineData(new[] { 16777216, 16777216 }, new[] { 1, 1 })]
1264+
public void Execute_with_multiple_deletes_should_split_batches_as_expected_when_using_write_commands_via_opquery(int[] requestSizes, int[] expectedBatchCounts)
12611265
{
1262-
var operation = new FindOperation<BsonDocument>(_collectionNamespace, BsonDocumentSerializer.Instance, _messageEncoderSettings);
1263-
var cursor = ExecuteOperation(operation, binding, false);
1264-
return ReadCursorToEnd(cursor);
1266+
RequireServer.Check().Supports(Feature.WriteCommands);
1267+
DropCollection();
1268+
1269+
using (EventContext.BeginOperation())
1270+
{
1271+
// capture only command-started events named "delete" that carry the current operation id
var eventCapturer = new EventCapturer().Capture<CommandStartedEvent>(e => e.CommandName == "delete" && e.OperationId == EventContext.OperationId);
1272+
using (var cluster = CoreTestConfiguration.CreateCluster(b => b.Subscribe(eventCapturer)))
1273+
using (var session = NoCoreSession.NewHandle())
1274+
using (var binding = new ReadWriteBindingHandle(new WritableServerBinding(cluster, session.Fork())))
1275+
{
1276+
// NOTE(review): this Select is deferred, so every enumeration of requests calls CreateDeleteRequest
// again; the insert variant of this test materializes with ToList() - consider doing the same here.
var requests = requestSizes.Select(size => CreateDeleteRequest(size));
1277+
var operation = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings);
1278+
1279+
var result = ExecuteOperation(operation, binding, async: false);
1280+
1281+
result.RequestCount.Should().Be(requestSizes.Length);
1282+
var commandStartedEvents = eventCapturer.Events.OfType<CommandStartedEvent>().ToList();
1283+
// one entry per command sent, in capture order: the number of deletes in that command
var actualBatchCounts = commandStartedEvents.Select(e => e.Command["deletes"].AsBsonArray.Count).ToList();
1284+
actualBatchCounts.Should().Equal(expectedBatchCounts);
1285+
}
1286+
}
1287+
}
1288+
1289+
// Inserts documents of the given sizes in one bulk operation and checks how it was split:
// each captured "insert" CommandStartedEvent contributes the length of its "documents" array, and
// that sequence must equal expectedBatchCounts. 16777216 is 16 * 1024 * 1024; presumably the sizes
// are chosen around the server's maximum document size (TODO confirm against the server limits).
[SkippableTheory]
1290+
[InlineData(new[] { 1 }, new[] { 1 })]
1291+
[InlineData(new[] { 1, 1 }, new[] { 2 })]
1292+
[InlineData(new[] { 8388605, 8388605 }, new[] { 2 })]
1293+
[InlineData(new[] { 8388605, 8388606 }, new[] { 1, 1 })]
1294+
[InlineData(new[] { 16777216 }, new[] { 1 })]
1295+
[InlineData(new[] { 16777216, 1 }, new[] { 1, 1 })]
1296+
[InlineData(new[] { 16777216, 16777216 }, new[] { 1, 1 })]
1297+
public void Execute_with_multiple_inserts_should_split_batches_as_expected_when_using_write_commands_via_opquery(int[] documentSizes, int[] expectedBatchCounts)
1298+
{
1299+
RequireServer.Check().Supports(Feature.WriteCommands);
1300+
DropCollection();
1301+
1302+
using (EventContext.BeginOperation())
1303+
{
1304+
// capture only command-started events named "insert" that carry the current operation id
var eventCapturer = new EventCapturer().Capture<CommandStartedEvent>(e => e.CommandName == "insert" && e.OperationId == EventContext.OperationId);
1305+
using (var cluster = CoreTestConfiguration.CreateCluster(b => b.Subscribe(eventCapturer)))
1306+
using (var session = NoCoreSession.NewHandle())
1307+
using (var binding = new ReadWriteBindingHandle(new WritableServerBinding(cluster, session.Fork())))
1308+
{
1309+
// index + 1 is passed as the document id, so the ids are 1, 2, ... in input order
var documents = documentSizes.Select((size, index) => CreateDocument(index + 1, size)).ToList();
1310+
var requests = documents.Select(d => new InsertRequest(d)).ToList();
1311+
var operation = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings);
1312+
1313+
var result = ExecuteOperation(operation, binding, async: false);
1314+
1315+
result.InsertedCount.Should().Be(documents.Count);
1316+
var commandStartedEvents = eventCapturer.Events.OfType<CommandStartedEvent>().ToList();
1317+
// one entry per command sent, in capture order: the number of documents in that command
var actualBatchCounts = commandStartedEvents.Select(e => e.Command["documents"].AsBsonArray.Count).ToList();
1318+
actualBatchCounts.Should().Equal(expectedBatchCounts);
1319+
}
1320+
}
1321+
}
1322+
1323+
// Runs one bulk operation made of update requests of the given sizes and checks how it was split:
// each captured "update" CommandStartedEvent contributes the length of its "updates" array, and
// that sequence must equal expectedBatchCounts. 16777216 is 16 * 1024 * 1024; presumably the sizes
// are chosen around the server's maximum document size (TODO confirm against the server limits).
[SkippableTheory]
1324+
[InlineData(new[] { 1 }, new[] { 1 })]
1325+
[InlineData(new[] { 1, 1 }, new[] { 2 })]
1326+
[InlineData(new[] { 8388605, 8388605 }, new[] { 2 })]
1327+
[InlineData(new[] { 8388605, 8388606 }, new[] { 1, 1 })]
1328+
[InlineData(new[] { 16777216 }, new[] { 1 })]
1329+
[InlineData(new[] { 16777216, 1 }, new[] { 1, 1 })]
1330+
[InlineData(new[] { 16777216, 16777216 }, new[] { 1, 1 })]
1331+
public void Execute_with_multiple_updates_should_split_batches_as_expected_when_using_write_commands_via_opquery(int[] requestSizes, int[] expectedBatchCounts)
1332+
{
1333+
RequireServer.Check().Supports(Feature.WriteCommands);
1334+
DropCollection();
1335+
1336+
using (EventContext.BeginOperation())
1337+
{
1338+
// capture only command-started events named "update" that carry the current operation id
var eventCapturer = new EventCapturer().Capture<CommandStartedEvent>(e => e.CommandName == "update" && e.OperationId == EventContext.OperationId);
1339+
using (var cluster = CoreTestConfiguration.CreateCluster(b => b.Subscribe(eventCapturer)))
1340+
using (var session = NoCoreSession.NewHandle())
1341+
using (var binding = new ReadWriteBindingHandle(new WritableServerBinding(cluster, session.Fork())))
1342+
{
1343+
// NOTE(review): this Select is deferred, so every enumeration of requests calls CreateUpdateRequest
// again; the insert variant of this test materializes with ToList() - consider doing the same here.
var requests = requestSizes.Select(size => CreateUpdateRequest(size));
1344+
var operation = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings);
1345+
1346+
var result = ExecuteOperation(operation, binding, async: false);
1347+
1348+
result.RequestCount.Should().Be(requestSizes.Length);
1349+
var commandStartedEvents = eventCapturer.Events.OfType<CommandStartedEvent>().ToList();
1350+
// one entry per command sent, in capture order: the number of updates in that command
var actualBatchCounts = commandStartedEvents.Select(e => e.Command["updates"].AsBsonArray.Count).ToList();
1351+
actualBatchCounts.Should().Equal(expectedBatchCounts);
1352+
}
1353+
}
1354+
}
1355+
1356+
// private methods
1357+
// Builds a DeleteRequest whose filter carries a "filler" string of 'x' characters. The filler length
// is size minus the serialized length of a { q: filter, limit: 1 } document measured with an empty
// filler; when that difference is not positive the filler stays empty.
// NOTE(review): requestDocument is a local stand-in used only for measuring; presumably it mirrors
// what the driver writes for a delete request - confirm against the delete request serializer.
private DeleteRequest CreateDeleteRequest(int size)
1358+
{
1359+
var filter = new BsonDocument("filler", "");
1360+
var requestDocument = new BsonDocument
1361+
{
1362+
{ "q", filter },
1363+
{ "limit", 1 }
1364+
};
1365+
// bytes still missing to reach the requested size
var fillerSize = size - requestDocument.ToBson().Length;
1366+
if (fillerSize > 0)
1367+
{
1368+
filter["filler"] = new string('x', fillerSize);
1369+
}
1370+
return new DeleteRequest(filter);
1371+
}
1372+
1373+
// Builds a document { _id: id, filler: "..." } where filler is a run of 'x' characters whose length is
// size minus the serialized length of the document with an empty filler; when that difference is not
// positive the filler stays empty.
private BsonDocument CreateDocument(int id, int size)
1374+
{
1375+
var document = new BsonDocument { { "_id", id }, { "filler", "" } };
1376+
// bytes still missing to reach the requested size
var fillerSize = size - document.ToBson().Length;
1377+
if (fillerSize > 0)
1378+
{
1379+
document["filler"] = new string('x', fillerSize);
1380+
}
1381+
return document;
1382+
}
1383+
1384+
// Builds an UpdateRequest of type UpdateType.Update with update { $set: { x: 1 } } and a filter that
// carries a "filler" string of 'x' characters. The filler length is size minus the serialized length
// of a { q: filter, u: update } document measured with an empty filler; when that difference is not
// positive the filler stays empty.
// NOTE(review): requestDocument is a local stand-in used only for measuring; presumably it mirrors
// what the driver writes for an update request - confirm against the update request serializer.
private UpdateRequest CreateUpdateRequest(int size)
1385+
{
1386+
var filter = new BsonDocument("filler", "");
1387+
var update = new BsonDocument("$set", new BsonDocument("x", 1));
1388+
var requestDocument = new BsonDocument
1389+
{
1390+
{ "q", filter },
1391+
{ "u", update }
1392+
};
1393+
// bytes still missing to reach the requested size
var fillerSize = size - requestDocument.ToBson().Length;
1394+
if (fillerSize > 0)
1395+
{
1396+
filter["filler"] = new string('x', fillerSize);
1397+
}
1398+
return new UpdateRequest(UpdateType.Update, filter, update);
12651399
}
12661400

12671401
private void EnsureTestData()
@@ -1275,5 +1409,12 @@ private void EnsureTestData()
12751409
BsonDocument.Parse("{_id: 5, x: 2 }"),
12761410
BsonDocument.Parse("{_id: 6, x: 3 }"));
12771411
}
1412+
1413+
// Runs a FindOperation over _collectionNamespace on the given binding and returns whatever
// ReadCursorToEnd yields for the resulting cursor.
private List<BsonDocument> ReadAllFromCollection(IReadBinding binding)
1414+
{
1415+
var operation = new FindOperation<BsonDocument>(_collectionNamespace, BsonDocumentSerializer.Instance, _messageEncoderSettings);
1416+
// NOTE(review): the third argument is presumably the async flag (other calls pass "async: false") - confirm
var cursor = ExecuteOperation(operation, binding, false);
1417+
return ReadCursorToEnd(cursor);
1418+
}
12781419
}
12791420
}

0 commit comments

Comments
 (0)