Skip to content

Commit 5064821

Browse files
author
rstam
committed
CSHARP-943: Split large batches correctly when InsertBatch is being emulated using write commands.
1 parent 10ecd7c commit 5064821

16 files changed

+92
-111
lines changed

MongoDB.Driver/MongoCollection.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ internal virtual BulkWriteResult BulkWrite(BulkWriteArgs args)
204204
{
205205
var assignId = args.AssignId ?? (_settings.AssignIdOnInsert ? (Action<InsertRequest>)AssignId : null);
206206
var checkElementNames = args.CheckElementNames ?? true;
207-
var maxBatchCount = args.MaxBatchCount ?? connection.ServerInstance.MaxBatchCount;
208-
var maxBatchLength = Math.Min(args.MaxBatchLength ?? int.MaxValue, connection.ServerInstance.MaxDocumentSize);
207+
var maxBatchCount = args.MaxBatchCount ?? int.MaxValue;
208+
var maxBatchLength = args.MaxBatchLength ?? int.MaxValue;
209209
var maxDocumentSize = connection.ServerInstance.MaxDocumentSize;
210210
var maxWireDocumentSize = connection.ServerInstance.MaxWireDocumentSize;
211211
var writeConcern = args.WriteConcern ?? _settings.WriteConcern;
@@ -1521,9 +1521,7 @@ public virtual IEnumerable<WriteConcernResult> InsertBatch(
15211521
var checkElementNames = options.CheckElementNames;
15221522
var isOrdered = ((options.Flags & InsertFlags.ContinueOnError) == 0);
15231523
var maxBatchCount = int.MaxValue;
1524-
var maxBatchLength = connection.ServerInstance.MaxMessageLength;
1525-
var maxDocumentSize = connection.ServerInstance.MaxDocumentSize;
1526-
var maxWireDocumentSize = connection.ServerInstance.MaxWireDocumentSize;
1524+
var maxBatchLength = int.MaxValue;
15271525
var requests = documents.Cast<object>().Select(document =>
15281526
{
15291527
return new InsertRequest(nominalType, document);
@@ -1537,8 +1535,6 @@ public virtual IEnumerable<WriteConcernResult> InsertBatch(
15371535
_database.Name,
15381536
maxBatchCount,
15391537
maxBatchLength,
1540-
maxDocumentSize,
1541-
maxWireDocumentSize,
15421538
isOrdered,
15431539
GetBinaryReaderSettings(),
15441540
requests,
@@ -1788,8 +1784,6 @@ public virtual WriteConcernResult Remove(IMongoQuery query, RemoveFlags flags, W
17881784
{
17891785
var maxBatchCount = 1;
17901786
var maxBatchLength = connection.ServerInstance.MaxDocumentSize;
1791-
var maxDocumentSize = connection.ServerInstance.MaxDocumentSize;
1792-
var maxWireDocumentSize = connection.ServerInstance.MaxWireDocumentSize;
17931787
var isOrdered = true;
17941788
var requests = new[]
17951789
{
@@ -1802,8 +1796,6 @@ public virtual WriteConcernResult Remove(IMongoQuery query, RemoveFlags flags, W
18021796
_database.Name,
18031797
maxBatchCount,
18041798
maxBatchLength,
1805-
maxDocumentSize,
1806-
maxWireDocumentSize,
18071799
isOrdered,
18081800
GetBinaryReaderSettings(),
18091801
requests,
@@ -2024,8 +2016,6 @@ public virtual WriteConcernResult Update(IMongoQuery query, IMongoUpdate update,
20242016
var checkElementNames = options.CheckElementNames;
20252017
var maxBatchCount = 1;
20262018
var maxBatchLength = connection.ServerInstance.MaxDocumentSize;
2027-
var maxDocumentSize = connection.ServerInstance.MaxDocumentSize;
2028-
var maxWireDocumentSize = connection.ServerInstance.MaxWireDocumentSize;
20292019
var isOrdered = true;
20302020
var requests = new[]
20312021
{
@@ -2043,8 +2033,6 @@ public virtual WriteConcernResult Update(IMongoQuery query, IMongoUpdate update,
20432033
_database.Name,
20442034
maxBatchCount,
20452035
maxBatchLength,
2046-
maxDocumentSize,
2047-
maxWireDocumentSize,
20482036
isOrdered,
20492037
GetBinaryReaderSettings(),
20502038
requests,

MongoDB.Driver/Operations/BulkDeleteOperation.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,29 +58,25 @@ public override BulkWriteResult Execute(MongoConnection connection)
5858
}
5959

6060
// protected methods
61-
protected override BatchSerializer CreateBatchSerializer()
61+
protected override BatchSerializer CreateBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
6262
{
63-
return new DeleteBatchSerializer(_args);
63+
return new DeleteBatchSerializer(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize);
6464
}
6565

6666
// nested classes
6767
private class DeleteBatchSerializer : BatchSerializer
6868
{
69-
// private fields
70-
private readonly BulkDeleteOperationArgs _args;
71-
7269
// constructors
73-
public DeleteBatchSerializer(BulkDeleteOperationArgs args)
74-
: base(args)
70+
public DeleteBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
71+
: base(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize)
7572
{
76-
_args = args;
7773
}
7874

7975
// protected methods
8076
protected override void SerializeRequest(BsonBinaryWriter bsonWriter, WriteRequest request)
8177
{
8278
var deleteRequest = (DeleteRequest)request;
83-
bsonWriter.PushMaxDocumentSize(_args.MaxDocumentSize);
79+
bsonWriter.PushMaxDocumentSize(MaxDocumentSize);
8480
bsonWriter.WriteStartDocument();
8581
bsonWriter.WriteName("q");
8682
BsonSerializer.Serialize(bsonWriter, deleteRequest.Query ?? new QueryDocument());

MongoDB.Driver/Operations/BulkDeleteOperationArgs.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ public BulkDeleteOperationArgs(
3030
string databaseName,
3131
int maxBatchCount,
3232
int maxBatchLength,
33-
int maxDocumentSize,
34-
int maxWireDocumentSize,
3533
bool isOrdered,
3634
BsonBinaryReaderSettings readerSettings,
3735
IEnumerable<DeleteRequest> requests,
@@ -42,8 +40,6 @@ public BulkDeleteOperationArgs(
4240
databaseName,
4341
maxBatchCount,
4442
maxBatchLength,
45-
maxDocumentSize,
46-
maxWireDocumentSize,
4743
isOrdered,
4844
readerSettings,
4945
requests.Cast<WriteRequest>(),

MongoDB.Driver/Operations/BulkDeleteOperationEmulator.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ protected override BulkWriteBatchResult EmulateSingleRequest(MongoConnection con
4242
_args.DatabaseName,
4343
1, // maxBatchCount
4444
serverInstance.MaxMessageLength, // maxBatchLength
45-
serverInstance.MaxDocumentSize,
46-
serverInstance.MaxWireDocumentSize,
4745
true, // isOrdered
4846
_args.ReaderSettings,
4947
deleteRequests,

MongoDB.Driver/Operations/BulkInsertOperation.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ public override BulkWriteResult Execute(MongoConnection connection)
6464
}
6565

6666
// protected methods
67-
protected override BatchSerializer CreateBatchSerializer()
67+
protected override BatchSerializer CreateBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
6868
{
69-
return new InsertBatchSerializer(_args);
69+
return new InsertBatchSerializer(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize, _args.CheckElementNames);
7070
}
7171

7272
protected override IEnumerable<WriteRequest> DecorateRequests(IEnumerable<WriteRequest> requests)
@@ -85,15 +85,15 @@ protected override IEnumerable<WriteRequest> DecorateRequests(IEnumerable<WriteR
8585
private class InsertBatchSerializer : BatchSerializer
8686
{
8787
// private fields
88-
private readonly BulkInsertOperationArgs _args;
8988
private IBsonSerializer _cachedSerializer;
9089
private Type _cachedSerializerType;
90+
private readonly bool _checkElementNames;
9191

9292
// constructors
93-
public InsertBatchSerializer(BulkInsertOperationArgs args)
94-
: base(args)
93+
public InsertBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize, bool checkElementNames)
94+
: base(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize)
9595
{
96-
_args = args;
96+
_checkElementNames = checkElementNames;
9797
}
9898

9999
// protected methods
@@ -127,8 +127,8 @@ protected override void SerializeRequest(BsonBinaryWriter bsonWriter, WriteReque
127127
var savedCheckElementNames = bsonWriter.CheckElementNames;
128128
try
129129
{
130-
bsonWriter.PushMaxDocumentSize(_args.MaxDocumentSize);
131-
bsonWriter.CheckElementNames = _args.CheckElementNames;
130+
bsonWriter.PushMaxDocumentSize(MaxDocumentSize);
131+
bsonWriter.CheckElementNames = _checkElementNames;
132132
serializer.Serialize(bsonWriter, insertRequest.NominalType, document, serializationOptions);
133133
}
134134
finally

MongoDB.Driver/Operations/BulkInsertOperationArgs.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ public BulkInsertOperationArgs(
3838
string databaseName,
3939
int maxBatchCount,
4040
int maxBatchLength,
41-
int maxDocumentSize,
42-
int maxWireDocumentSize,
4341
bool isOrdered,
4442
BsonBinaryReaderSettings readerSettings,
4543
IEnumerable<InsertRequest> requests,
@@ -50,8 +48,6 @@ public BulkInsertOperationArgs(
5048
databaseName,
5149
maxBatchCount,
5250
maxBatchLength,
53-
maxDocumentSize,
54-
maxWireDocumentSize,
5551
isOrdered,
5652
readerSettings,
5753
requests.Cast<WriteRequest>(),

MongoDB.Driver/Operations/BulkInsertOperationEmulator.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ protected override BulkWriteBatchResult EmulateSingleRequest(MongoConnection con
4545
_args.DatabaseName,
4646
1, // maxBatchCount
4747
serverInstance.MaxMessageLength, // maxBatchLength
48-
serverInstance.MaxDocumentSize,
49-
serverInstance.MaxWireDocumentSize,
5048
true, // isOrdered
5149
_args.ReaderSettings,
5250
insertRequests,

MongoDB.Driver/Operations/BulkMixedWriteOperation.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,6 @@ private BulkWriteResult ExecuteDeletes(MongoConnection connection, IEnumerable<D
143143
_databaseName,
144144
_maxBatchCount,
145145
_maxBatchLength,
146-
connection.ServerInstance.MaxDocumentSize,
147-
connection.ServerInstance.MaxWireDocumentSize,
148146
_isOrdered,
149147
_readerSettings,
150148
requests,
@@ -162,8 +160,6 @@ private BulkWriteResult ExecuteInserts(MongoConnection connection, IEnumerable<I
162160
_databaseName,
163161
_maxBatchCount,
164162
_maxBatchLength,
165-
_maxDocumentSize,
166-
_maxWireDocumentSize,
167163
_isOrdered,
168164
_readerSettings,
169165
requests,
@@ -180,8 +176,6 @@ private BulkWriteResult ExecuteUpdates(MongoConnection connection, IEnumerable<U
180176
_databaseName,
181177
_maxBatchCount,
182178
_maxBatchLength,
183-
_maxDocumentSize,
184-
_maxWireDocumentSize,
185179
_isOrdered,
186180
_readerSettings,
187181
requests,

MongoDB.Driver/Operations/BulkUnmixedWriteOperationBase.cs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public virtual BulkWriteResult Execute(MongoConnection connection)
7676
}
7777

7878
// protected methods
79-
protected abstract BatchSerializer CreateBatchSerializer();
79+
protected abstract BatchSerializer CreateBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize);
8080

8181
protected virtual IEnumerable<WriteRequest> DecorateRequests(IEnumerable<WriteRequest> requests)
8282
{
@@ -120,7 +120,12 @@ private CommandOperation<CommandResult> CreateWriteCommandOperation(IMongoComman
120120

121121
private BulkWriteBatchResult ExecuteBatch(MongoConnection connection, Batch<WriteRequest> batch, int originalIndex)
122122
{
123-
var batchSerializer = CreateBatchSerializer();
123+
var maxBatchCount = Math.Min(_args.MaxBatchCount, connection.ServerInstance.MaxBatchCount);
124+
var maxBatchLength = Math.Min(_args.MaxBatchLength, connection.ServerInstance.MaxDocumentSize);
125+
var maxDocumentSize = connection.ServerInstance.MaxDocumentSize;
126+
var maxWireDocumentSize = connection.ServerInstance.MaxWireDocumentSize;
127+
128+
var batchSerializer = CreateBatchSerializer(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize);
124129
var writeCommand = CreateWriteCommand(batchSerializer, batch);
125130
var writeCommandOperation = CreateWriteCommandOperation(writeCommand);
126131
var writeCommandResult = writeCommandOperation.Execute(connection);
@@ -139,17 +144,23 @@ private BulkWriteBatchResult ExecuteBatch(MongoConnection connection, Batch<Writ
139144
protected abstract class BatchSerializer : BsonBaseSerializer
140145
{
141146
// private fields
142-
private readonly BulkWriteOperationArgs _args;
143147
private int _batchCount;
144148
private int _batchLength;
145149
private BatchProgress<WriteRequest> _batchProgress;
146150
private int _batchStartPosition;
147151
private int _lastRequestPosition;
152+
private readonly int _maxBatchCount;
153+
private readonly int _maxBatchLength;
154+
private readonly int _maxDocumentSize;
155+
private readonly int _maxWireDocumentSize;
148156

149157
// constructors
150-
public BatchSerializer(BulkWriteOperationArgs args)
158+
public BatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
151159
{
152-
_args = args;
160+
_maxBatchCount = maxBatchCount;
161+
_maxBatchLength = maxBatchLength;
162+
_maxDocumentSize = maxDocumentSize;
163+
_maxWireDocumentSize = maxWireDocumentSize;
153164
}
154165

155166
// public properties
@@ -158,6 +169,27 @@ public BatchProgress<WriteRequest> BatchProgress
158169
get { return _batchProgress; }
159170
}
160171

172+
// protected properties
173+
protected int MaxBatchCount
174+
{
175+
get { return _maxBatchCount; }
176+
}
177+
178+
protected int MaxBatchLength
179+
{
180+
get { return _maxBatchLength; }
181+
}
182+
183+
protected int MaxDocumentSize
184+
{
185+
get { return _maxDocumentSize; }
186+
}
187+
188+
protected int MaxWireDocumentSize
189+
{
190+
get { return _maxWireDocumentSize; }
191+
}
192+
161193
// public methods
162194
public override void Serialize(BsonWriter bsonWriter, Type nominalType, object value, IBsonSerializationOptions options)
163195
{
@@ -174,20 +206,14 @@ public override void Serialize(BsonWriter bsonWriter, Type nominalType, object v
174206
continuationBatch.ClearPending(); // so pending objects can be garbage collected sooner
175207
}
176208

177-
var maxBatchLength = _args.MaxBatchLength;
178-
if (maxBatchLength > _args.MaxDocumentSize)
179-
{
180-
maxBatchLength = _args.MaxDocumentSize; // not MaxWireDocumentSize! leave room for overhead
181-
}
182-
183209
// always go one document too far so that we can set IsDone as early as possible
184210
var enumerator = batch.Enumerator;
185211
while (enumerator.MoveNext())
186212
{
187213
var request = enumerator.Current;
188214
AddRequest(bsonBinaryWriter, request);
189215

190-
if ((_batchCount > _args.MaxBatchCount || _batchLength > maxBatchLength) && _batchCount > 1)
216+
if ((_batchCount > _maxBatchCount || _batchLength > _maxBatchLength) && _batchCount > 1)
191217
{
192218
var serializedRequest = RemoveOverflow(bsonBinaryWriter.Buffer);
193219
var nextBatch = new ContinuationBatch<WriteRequest, IByteBuffer>(enumerator, request, serializedRequest);

MongoDB.Driver/Operations/BulkUpdateOperation.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,30 +58,26 @@ public override BulkWriteResult Execute(MongoConnection connection)
5858
}
5959

6060
// protected methods
61-
protected override BatchSerializer CreateBatchSerializer()
61+
protected override BatchSerializer CreateBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
6262
{
63-
return new UpdateBatchSerializer(_args);
63+
return new UpdateBatchSerializer(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize);
6464
}
6565

6666
// nested classes
6767
private class UpdateBatchSerializer : BatchSerializer
6868
{
69-
// private fields
70-
private readonly BulkUpdateOperationArgs _args;
71-
7269
// constructors
73-
public UpdateBatchSerializer(BulkUpdateOperationArgs args)
74-
: base(args)
70+
public UpdateBatchSerializer(int maxBatchCount, int maxBatchLength, int maxDocumentSize, int maxWireDocumentSize)
71+
: base(maxBatchCount, maxBatchLength, maxDocumentSize, maxWireDocumentSize)
7572
{
76-
_args = args;
7773
}
7874

7975
// protected methods
8076
protected override void SerializeRequest(BsonBinaryWriter bsonWriter, WriteRequest request)
8177
{
8278
var updateRequest = (UpdateRequest)request;
8379

84-
bsonWriter.PushMaxDocumentSize(_args.MaxWireDocumentSize);
80+
bsonWriter.PushMaxDocumentSize(MaxWireDocumentSize);
8581
bsonWriter.WriteStartDocument();
8682
bsonWriter.WriteName("q");
8783
BsonSerializer.Serialize(bsonWriter, updateRequest.Query ?? new QueryDocument());

0 commit comments

Comments
 (0)