Skip to content

Commit a7e0383

Browse files
committed
JAVA-2633: Remove batch splitting support for legacy writes since MixedBulkWriteOperation only ever sends a single write request
1 parent 332ffc7 commit a7e0383

25 files changed

+152
-664
lines changed

driver-core/src/main/com/mongodb/connection/AsyncConnection.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.ReadPreference;
21-
import com.mongodb.WriteConcern;
2221
import com.mongodb.WriteConcernResult;
2322
import com.mongodb.annotations.ThreadSafe;
2423
import com.mongodb.async.SingleResultCallback;
@@ -56,38 +55,32 @@ public interface AsyncConnection extends ReferenceCounted {
5655

5756
/**
5857
* Insert the documents using the insert wire protocol and apply the write concern asynchronously.
59-
*
6058
* @param namespace the namespace
6159
* @param ordered whether the writes are ordered
62-
* @param writeConcern the write concern
63-
* @param inserts the inserts
60+
* @param insertRequest the insert request
6461
* @param callback the callback to be passed the write result
6562
*/
66-
void insertAsync(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern,
67-
List<InsertRequest> inserts, SingleResultCallback<WriteConcernResult> callback);
63+
void insertAsync(MongoNamespace namespace, boolean ordered, InsertRequest insertRequest,
64+
SingleResultCallback<WriteConcernResult> callback);
6865

6966
/**
7067
* Update the documents using the update wire protocol and apply the write concern asynchronously.
71-
*
7268
* @param namespace the namespace
7369
* @param ordered whether the writes are ordered
74-
* @param writeConcern the write concern
75-
* @param updates the updates
70+
* @param updateRequest the update request
7671
* @param callback the callback to be passed the write result
7772
*/
78-
void updateAsync(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<UpdateRequest> updates,
73+
void updateAsync(MongoNamespace namespace, boolean ordered, UpdateRequest updateRequest,
7974
SingleResultCallback<WriteConcernResult> callback);
8075

8176
/**
8277
* Delete the documents using the delete wire protocol and apply the write concern asynchronously.
83-
*
8478
* @param namespace the namespace
8579
* @param ordered whether the writes are ordered
86-
* @param writeConcern the write concern
87-
* @param deletes the deletes
80+
* @param deleteRequest the delete request
8881
* @param callback the callback to be passed the write result
8982
*/
90-
void deleteAsync(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<DeleteRequest> deletes,
83+
void deleteAsync(MongoNamespace namespace, boolean ordered, DeleteRequest deleteRequest,
9184
SingleResultCallback<WriteConcernResult> callback);
9285

9386
/**

driver-core/src/main/com/mongodb/connection/CommandMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
120120
addDocumentWithPayload(bsonOutput);
121121
}
122122
}
123-
return new EncodingMetadata(null, commandStartPosition);
123+
return new EncodingMetadata(commandStartPosition);
124124
}
125125

126126
private FieldNameValidator getPayloadArrayFieldNameValidator() {

driver-core/src/main/com/mongodb/connection/CompressedMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
4848

4949
compressor.compress(wrappedMessageBuffers, bsonOutput);
5050

51-
return new EncodingMetadata(null, 0);
51+
return new EncodingMetadata(0);
5252
}
5353

5454
private static int getWrappedMessageSize(final List<ByteBuf> wrappedMessageBuffers) {

driver-core/src/main/com/mongodb/connection/Connection.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.ReadPreference;
21-
import com.mongodb.WriteConcern;
2221
import com.mongodb.WriteConcernResult;
2322
import com.mongodb.annotations.ThreadSafe;
2423
import com.mongodb.binding.ReferenceCounted;
@@ -59,35 +58,30 @@ public interface Connection extends ReferenceCounted {
5958
*
6059
* @param namespace the namespace
6160
* @param ordered whether the writes are ordered
62-
* @param writeConcern the write concern
63-
* @param inserts the inserts
61+
* @param insertRequest the insert request
6462
* @return the write concern result
6563
*/
66-
WriteConcernResult insert(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<InsertRequest> inserts);
64+
WriteConcernResult insert(MongoNamespace namespace, boolean ordered, InsertRequest insertRequest);
6765

6866
/**
6967
* Update the documents using the update wire protocol and apply the write concern.
7068
*
7169
* @param namespace the namespace
7270
* @param ordered whether the writes are ordered
73-
* @param writeConcern the write concern
74-
* @param updates the updates
71+
* @param updateRequest the update request
7572
* @return the write concern result
7673
*/
77-
WriteConcernResult update(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern,
78-
List<UpdateRequest> updates);
74+
WriteConcernResult update(MongoNamespace namespace, boolean ordered, UpdateRequest updateRequest);
7975

8076
/**
8177
* Delete the documents using the delete wire protocol and apply the write concern.
8278
*
8379
* @param namespace the namespace
8480
* @param ordered whether the writes are ordered
85-
* @param writeConcern the write concern
86-
* @param deletes the deletes
81+
* @param deleteRequest the delete request
8782
* @return the write concern result
8883
*/
89-
WriteConcernResult delete(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern,
90-
List<DeleteRequest> deletes);
84+
WriteConcernResult delete(MongoNamespace namespace, boolean ordered, DeleteRequest deleteRequest);
9185

9286
/**
9387
* Execute the command.

driver-core/src/main/com/mongodb/connection/DefaultServerConnection.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.ReadPreference;
21-
import com.mongodb.WriteConcern;
2221
import com.mongodb.WriteConcernResult;
2322
import com.mongodb.async.SingleResultCallback;
2423
import com.mongodb.bulk.DeleteRequest;
@@ -72,39 +71,36 @@ public ConnectionDescription getDescription() {
7271
}
7372

7473
@Override
75-
public WriteConcernResult insert(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
76-
final List<InsertRequest> inserts) {
77-
return executeProtocol(new InsertProtocol(namespace, ordered, writeConcern, inserts));
74+
public WriteConcernResult insert(final MongoNamespace namespace, final boolean ordered, final InsertRequest insertRequest) {
75+
return executeProtocol(new InsertProtocol(namespace, ordered, insertRequest));
7876
}
7977

8078
@Override
81-
public void insertAsync(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
82-
final List<InsertRequest> inserts, final SingleResultCallback<WriteConcernResult> callback) {
83-
executeProtocolAsync(new InsertProtocol(namespace, ordered, writeConcern, inserts), callback);
79+
public void insertAsync(final MongoNamespace namespace, final boolean ordered, final InsertRequest insertRequest,
80+
final SingleResultCallback<WriteConcernResult> callback) {
81+
executeProtocolAsync(new InsertProtocol(namespace, ordered, insertRequest), callback);
8482
}
8583

8684
@Override
87-
public WriteConcernResult update(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
88-
final List<UpdateRequest> updates) {
89-
return executeProtocol(new UpdateProtocol(namespace, ordered, writeConcern, updates));
85+
public WriteConcernResult update(final MongoNamespace namespace, final boolean ordered, final UpdateRequest updateRequest) {
86+
return executeProtocol(new UpdateProtocol(namespace, ordered, updateRequest));
9087
}
9188

9289
@Override
93-
public void updateAsync(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
94-
final List<UpdateRequest> updates, final SingleResultCallback<WriteConcernResult> callback) {
95-
executeProtocolAsync(new UpdateProtocol(namespace, ordered, writeConcern, updates), callback);
90+
public void updateAsync(final MongoNamespace namespace, final boolean ordered, final UpdateRequest updateRequest,
91+
final SingleResultCallback<WriteConcernResult> callback) {
92+
executeProtocolAsync(new UpdateProtocol(namespace, ordered, updateRequest), callback);
9693
}
9794

9895
@Override
99-
public WriteConcernResult delete(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
100-
final List<DeleteRequest> deletes) {
101-
return executeProtocol(new DeleteProtocol(namespace, ordered, writeConcern, deletes));
96+
public WriteConcernResult delete(final MongoNamespace namespace, final boolean ordered, final DeleteRequest deleteRequest) {
97+
return executeProtocol(new DeleteProtocol(namespace, ordered, deleteRequest));
10298
}
10399

104100
@Override
105-
public void deleteAsync(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
106-
final List<DeleteRequest> deletes, final SingleResultCallback<WriteConcernResult> callback) {
107-
executeProtocolAsync(new DeleteProtocol(namespace, ordered, writeConcern, deletes), callback);
101+
public void deleteAsync(final MongoNamespace namespace, final boolean ordered, final DeleteRequest deleteRequest,
102+
final SingleResultCallback<WriteConcernResult> callback) {
103+
executeProtocolAsync(new DeleteProtocol(namespace, ordered, deleteRequest), callback);
108104
}
109105

110106
@Override

driver-core/src/main/com/mongodb/connection/DeleteMessage.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,21 @@
2020
import com.mongodb.internal.validator.NoOpFieldNameValidator;
2121
import org.bson.io.BsonOutput;
2222

23-
import java.util.List;
24-
2523
/**
2624
* An OP_DELETE message.
2725
*
2826
* @mongodb.driver.manual ../meta-driver/latest/legacy/mongodb-wire-protocol/#op-delete OP_DELETE
2927
*/
3028
class DeleteMessage extends LegacyMessage {
31-
private final List<DeleteRequest> deleteRequests;
29+
private final DeleteRequest deleteRequest;
3230

33-
DeleteMessage(final String collectionName, final List<DeleteRequest> deletes, final MessageSettings settings) {
31+
DeleteMessage(final String collectionName, final DeleteRequest deleteRequest, final MessageSettings settings) {
3432
super(collectionName, OpCode.OP_DELETE, settings);
35-
this.deleteRequests = deletes;
33+
this.deleteRequest = deleteRequest;
3634
}
3735

3836
@Override
3937
protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOutput) {
40-
DeleteRequest deleteRequest = deleteRequests.get(0);
4138
bsonOutput.writeInt32(0); // reserved
4239
bsonOutput.writeCString(getCollectionName());
4340

@@ -50,14 +47,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
5047
int firstDocumentStartPosition = bsonOutput.getPosition();
5148

5249
addDocument(deleteRequest.getFilter(), bsonOutput, new NoOpFieldNameValidator());
53-
if (deleteRequests.size() == 1) {
54-
return new EncodingMetadata(null, firstDocumentStartPosition);
55-
} else {
56-
return new EncodingMetadata(new DeleteMessage(getCollectionName(), deleteRequests.subList(1, deleteRequests.size()),
57-
getSettings()),
58-
firstDocumentStartPosition);
59-
}
50+
return new EncodingMetadata(firstDocumentStartPosition);
6051
}
61-
6252
}
6353

driver-core/src/main/com/mongodb/connection/DeleteProtocol.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.mongodb.connection;
1818

1919
import com.mongodb.MongoNamespace;
20-
import com.mongodb.WriteConcern;
2120
import com.mongodb.WriteConcernResult;
2221
import com.mongodb.async.SingleResultCallback;
2322
import com.mongodb.bulk.DeleteRequest;
@@ -27,8 +26,6 @@
2726
import org.bson.BsonDocument;
2827
import org.bson.BsonInt32;
2928

30-
import java.util.List;
31-
3229
import static com.mongodb.connection.ByteBufBsonDocument.createOne;
3330
import static java.lang.String.format;
3431
import static java.util.Collections.singletonList;
@@ -41,12 +38,11 @@
4138
class DeleteProtocol extends WriteProtocol {
4239
private static final Logger LOGGER = Loggers.getLogger("protocol.delete");
4340

44-
private final List<DeleteRequest> deletes;
41+
private final DeleteRequest deleteRequest;
4542

46-
DeleteProtocol(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern,
47-
final List<DeleteRequest> deletes) {
48-
super(namespace, ordered, writeConcern);
49-
this.deletes = deletes;
43+
DeleteProtocol(final MongoNamespace namespace, final boolean ordered, final DeleteRequest deleteRequest) {
44+
super(namespace, ordered);
45+
this.deleteRequest = deleteRequest;
5046
}
5147

5248
@Override
@@ -86,13 +82,13 @@ public void onResult(final WriteConcernResult result, final Throwable t) {
8682
@Override
8783
protected BsonDocument getAsWriteCommand(final ByteBufferBsonOutput bsonOutput, final int firstDocumentPosition) {
8884
BsonDocument deleteDocument = new BsonDocument("q", createOne(bsonOutput, firstDocumentPosition))
89-
.append("limit", deletes.get(0).isMulti() ? new BsonInt32(0) : new BsonInt32(1));
85+
.append("limit", deleteRequest.isMulti() ? new BsonInt32(0) : new BsonInt32(1));
9086
return getBaseCommandDocument("delete").append("deletes", new BsonArray(singletonList(deleteDocument)));
9187
}
9288

9389
@Override
9490
protected RequestMessage createRequestMessage(final MessageSettings settings) {
95-
return new DeleteMessage(getNamespace().getFullName(), deletes, settings);
91+
return new DeleteMessage(getNamespace().getFullName(), deleteRequest, settings);
9692
}
9793

9894
@Override

driver-core/src/main/com/mongodb/connection/GetMoreMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public long getCursorId() {
4545
@Override
4646
protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOutput) {
4747
writeGetMore(bsonOutput);
48-
return new EncodingMetadata(null, bsonOutput.getPosition());
48+
return new EncodingMetadata(bsonOutput.getPosition());
4949
}
5050

5151
private void writeGetMore(final BsonOutput buffer) {

driver-core/src/main/com/mongodb/connection/InsertMessage.java

Lines changed: 6 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,70 +16,34 @@
1616

1717
package com.mongodb.connection;
1818

19-
import com.mongodb.WriteConcern;
2019
import com.mongodb.bulk.InsertRequest;
2120
import com.mongodb.internal.validator.CollectibleDocumentFieldNameValidator;
22-
import com.mongodb.internal.validator.NoOpFieldNameValidator;
23-
import org.bson.BsonDocument;
24-
import org.bson.FieldNameValidator;
2521
import org.bson.io.BsonOutput;
2622

27-
import java.util.List;
28-
2923
/**
3024
* An insert message.
3125
*
3226
* @mongodb.driver.manual ../meta-driver/latest/legacy/mongodb-wire-protocol/#op-insert OP_INSERT
3327
*/
3428
class InsertMessage extends LegacyMessage {
3529

36-
private final boolean ordered;
37-
private final WriteConcern writeConcern;
38-
private final List<InsertRequest> insertRequestList;
30+
private final InsertRequest insertRequest;
3931

40-
InsertMessage(final String collectionName, final boolean ordered, final WriteConcern writeConcern,
41-
final List<InsertRequest> insertRequestList, final MessageSettings settings) {
32+
InsertMessage(final String collectionName, final InsertRequest insertRequest, final MessageSettings settings) {
4233
super(collectionName, OpCode.OP_INSERT, settings);
43-
this.ordered = ordered;
44-
this.writeConcern = writeConcern;
45-
this.insertRequestList = insertRequestList;
46-
}
47-
48-
public List<InsertRequest> getInsertRequestList() {
49-
return insertRequestList;
34+
this.insertRequest = insertRequest;
5035
}
5136

5237
@Override
5338
protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput outputStream) {
5439
writeInsertPrologue(outputStream);
5540
int firstDocumentPosition = outputStream.getPosition();
56-
for (int i = 0; i < insertRequestList.size(); i++) {
57-
BsonDocument document = insertRequestList.get(i).getDocument();
58-
int pos = outputStream.getPosition();
59-
addCollectibleDocument(document, outputStream, createValidator());
60-
if (outputStream.getPosition() > getSettings().getMaxMessageSize()) {
61-
outputStream.truncateToPosition(pos);
62-
return new EncodingMetadata(new InsertMessage(getCollectionName(), ordered, writeConcern,
63-
insertRequestList.subList(i, insertRequestList.size()), getSettings()), firstDocumentPosition);
64-
}
65-
}
66-
return new EncodingMetadata(null, firstDocumentPosition);
67-
}
68-
69-
private FieldNameValidator createValidator() {
70-
if (getCollectionName().endsWith(".system.indexes")) {
71-
return new NoOpFieldNameValidator();
72-
} else {
73-
return new CollectibleDocumentFieldNameValidator();
74-
}
41+
addCollectibleDocument(insertRequest.getDocument(), outputStream, new CollectibleDocumentFieldNameValidator());
42+
return new EncodingMetadata(firstDocumentPosition);
7543
}
7644

7745
private void writeInsertPrologue(final BsonOutput outputStream) {
78-
int flags = 0;
79-
if (!ordered) {
80-
flags |= 1;
81-
}
82-
outputStream.writeInt32(flags);
46+
outputStream.writeInt32(0); // flags
8347
outputStream.writeCString(getCollectionName());
8448
}
8549
}

0 commit comments

Comments
 (0)