Skip to content

Commit 317c431

Browse files
committed
JAVA-2716: Use the message start position, rather that the command start position,
as the basis for determining whether the max message size has been exceeded.
1 parent 3e41a5e commit 317c431

File tree

6 files changed

+147
-27
lines changed

6 files changed

+147
-27
lines changed

driver-core/src/main/com/mongodb/connection/BsonWriterHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,17 @@ static void writeElements(final BsonWriter writer, final List<BsonElement> bsonE
4545
}
4646

4747
static void writePayloadArray(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
48-
final int commandStartPosition, final SplittablePayload payload) {
48+
final int messageStartPosition, final SplittablePayload payload) {
4949
writer.writeStartArray(payload.getPayloadName());
50-
writePayload(writer, bsonOutput, getDocumentMessageSettings(settings), commandStartPosition, payload);
50+
writePayload(writer, bsonOutput, getDocumentMessageSettings(settings), messageStartPosition, payload);
5151
writer.writeEndArray();
5252
}
5353

5454
static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
55-
final int commandStartPosition, final SplittablePayload payload) {
55+
final int messageStartPosition, final SplittablePayload payload) {
5656
MessageSettings payloadSettings = getPayloadMessageSettings(payload.getPayloadType(), settings);
5757
for (int i = 0; i < payload.getPayload().size(); i++) {
58-
if (writeDocument(writer, bsonOutput, payloadSettings, payload.getPayload().get(i), commandStartPosition, i + 1)) {
58+
if (writeDocument(writer, bsonOutput, payloadSettings, payload.getPayload().get(i), messageStartPosition, i + 1)) {
5959
payload.setPosition(i + 1);
6060
} else {
6161
break;
@@ -69,10 +69,10 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f
6969
}
7070

7171
private static boolean writeDocument(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
72-
final BsonDocument document, final int startPosition, final int batchItemCount) {
72+
final BsonDocument document, final int messageStartPosition, final int batchItemCount) {
7373
int currentPosition = bsonOutput.getPosition();
7474
getCodec(document).encode(writer, document, ENCODER_CONTEXT);
75-
int messageSize = bsonOutput.getPosition() - startPosition;
75+
int messageSize = bsonOutput.getPosition() - messageStartPosition;
7676
int documentSize = bsonOutput.getPosition() - currentPosition;
7777
if (exceedsLimits(settings, messageSize, documentSize, batchItemCount)) {
7878
bsonOutput.truncateToPosition(currentPosition);

driver-core/src/main/com/mongodb/connection/CommandMessage.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ ReadPreference getReadPreference() {
111111

112112
@Override
113113
protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOutput, final SessionContext sessionContext) {
114+
int messageStartPosition = bsonOutput.getPosition() - MESSAGE_PROLOGUE_LENGTH;
114115
int commandStartPosition;
115116
if (useOpMsg()) {
116117
int flagPosition = bsonOutput.getPosition();
@@ -126,7 +127,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
126127
bsonOutput.writeInt32(0); // size
127128
bsonOutput.writeCString(payload.getPayloadName());
128129
writePayload(new BsonBinaryWriter(bsonOutput, payloadFieldNameValidator), bsonOutput, getSettings(),
129-
commandStartPosition, payload);
130+
messageStartPosition, payload);
130131

131132
int payloadLength = bsonOutput.getPosition() - payloadPosition;
132133
bsonOutput.writeInt32(payloadPosition, payloadLength);
@@ -145,7 +146,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
145146
if (payload == null) {
146147
addDocument(getCommandToEncode(), bsonOutput, commandFieldNameValidator, null);
147148
} else {
148-
addDocumentWithPayload(bsonOutput);
149+
addDocumentWithPayload(bsonOutput, messageStartPosition);
149150
}
150151
}
151152
return new EncodingMetadata(commandStartPosition);
@@ -157,11 +158,11 @@ private FieldNameValidator getPayloadArrayFieldNameValidator() {
157158
return new MappedFieldNameValidator(commandFieldNameValidator, rootMap);
158159
}
159160

160-
private void addDocumentWithPayload(final BsonOutput bsonOutput) {
161+
private void addDocumentWithPayload(final BsonOutput bsonOutput, final int messageStartPosition) {
161162
BsonBinaryWriter bsonBinaryWriter = new BsonBinaryWriter(bsonOutput, getPayloadArrayFieldNameValidator());
162163
BsonWriter bsonWriter = payload == null
163164
? bsonBinaryWriter
164-
: new SplittablePayloadBsonWriter(bsonBinaryWriter, bsonOutput, getSettings(), payload);
165+
: new SplittablePayloadBsonWriter(bsonBinaryWriter, bsonOutput, messageStartPosition, getSettings(), payload);
165166
BsonDocument commandToEncode = getCommandToEncode();
166167
getCodec(commandToEncode).encode(bsonWriter, commandToEncode, EncoderContext.builder().build());
167168
}

driver-core/src/main/com/mongodb/connection/RequestMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ abstract class RequestMessage {
4444

4545
static final AtomicInteger REQUEST_ID = new AtomicInteger(1);
4646

47+
static final int MESSAGE_PROLOGUE_LENGTH = 16;
48+
4749
// Allow an extra 16K to the maximum allowed size of a query or command document, so that, for example,
4850
// a 16M document can be upserted via findAndModify
4951
private static final int DOCUMENT_HEADROOM = 16 * 1024;

driver-core/src/main/com/mongodb/connection/SplittablePayloadBsonWriter.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,29 +28,27 @@ class SplittablePayloadBsonWriter extends LevelCountingBsonWriter {
2828
private final BsonOutput bsonOutput;
2929
private final SplittablePayload payload;
3030
private final MessageSettings settings;
31-
private int commandStartPosition;
31+
private final int messageStartPosition;
3232

33-
SplittablePayloadBsonWriter(final BsonBinaryWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
34-
final SplittablePayload payload) {
33+
SplittablePayloadBsonWriter(final BsonBinaryWriter writer, final BsonOutput bsonOutput, final int messageStartPosition,
34+
final MessageSettings settings, final SplittablePayload payload) {
3535
super(writer);
3636
this.writer = writer;
3737
this.bsonOutput = bsonOutput;
38+
this.messageStartPosition = messageStartPosition;
3839
this.settings = settings;
3940
this.payload = payload;
4041
}
4142

4243
@Override
4344
public void writeStartDocument() {
4445
super.writeStartDocument();
45-
if (getCurrentLevel() == 0) {
46-
commandStartPosition = bsonOutput.getPosition();
47-
}
4846
}
4947

5048
@Override
5149
public void writeEndDocument() {
5250
if (getCurrentLevel() == 0 && payload.getPayload().size() > 0) {
53-
writePayloadArray(writer, bsonOutput, settings, commandStartPosition, payload);
51+
writePayloadArray(writer, bsonOutput, settings, messageStartPosition, payload);
5452
}
5553
super.writeEndDocument();
5654
}

driver-core/src/test/functional/com/mongodb/operation/MixedBulkWriteOperationSpecification.groovy

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,47 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat
490490
async << [true, false]
491491
}
492492

493+
@Category(Slow)
494+
def 'when documents together are just below the max message size, the documents are still inserted'() {
495+
given:
496+
def bsonBinary = new BsonBinary(new byte[16 * 1000 * 1000 - (getCollectionName().length() + 33)])
497+
def operation = new MixedBulkWriteOperation(getNamespace(),
498+
[
499+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary)),
500+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary)),
501+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary))
502+
],
503+
true, ACKNOWLEDGED, false)
504+
505+
when:
506+
BulkWriteResult result = execute(operation, true)
507+
508+
then:
509+
result == BulkWriteResult.acknowledged(INSERT, 3, [])
510+
getCollectionHelper().count() == 3
511+
}
512+
513+
@Category(Slow)
514+
def 'when documents together are just above the max message size, the documents are still inserted'() {
515+
given:
516+
def bsonBinary = new BsonBinary(new byte[16 * 1000 * 1000 - (getCollectionName().length() + 32)])
517+
def operation = new MixedBulkWriteOperation(getNamespace(),
518+
[
519+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary)),
520+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary)),
521+
new InsertRequest(new BsonDocument('_id', new BsonObjectId()).append('b', bsonBinary))
522+
],
523+
true, ACKNOWLEDGED, false)
524+
525+
when:
526+
BulkWriteResult result = execute(operation, true)
527+
528+
then:
529+
result == BulkWriteResult.acknowledged(INSERT, 3, [])
530+
getCollectionHelper().count() == 3
531+
}
532+
533+
493534
def 'should handle multi-length runs of ordered insert, update, replace, and remove'() {
494535
given:
495536
getCollectionHelper().insertDocuments(getTestInserts())

driver-core/src/test/unit/com/mongodb/connection/CommandMessageSpecification.groovy

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,92 @@ class CommandMessageSpecification extends Specification {
152152
]
153153
}
154154

155-
def 'should respect the message settings limits'() {
155+
def 'should respect the max message size'() {
156156
given:
157+
def maxMessageSize = 1024
158+
def messageSettings = MessageSettings.builder().maxMessageSize(maxMessageSize).serverVersion(new ServerVersion(3, 6)).build()
159+
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[922])),
160+
new BsonDocument('b', new BsonBinary(new byte[450])),
161+
new BsonDocument('c', new BsonBinary(new byte[459])),
162+
new BsonDocument('b', new BsonBinary(new byte[450])),
163+
new BsonDocument('c', new BsonBinary(new byte[460]))])
164+
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
165+
false, payload, fieldNameValidator)
166+
def output = new BasicOutputBuffer()
167+
def sessionContext = Stub(SessionContext)
168+
169+
when:
170+
message.encode(output, sessionContext)
171+
def byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
172+
def messageHeader = new MessageHeader(byteBuf, maxMessageSize)
173+
174+
then:
175+
messageHeader.opCode == OpCode.OP_MSG.value
176+
messageHeader.requestId < RequestMessage.currentGlobalId
177+
messageHeader.responseTo == 0
178+
messageHeader.messageLength == 1024
179+
byteBuf.getInt() == 0
180+
payload.getPosition() == 1
181+
payload.hasAnotherSplit()
182+
183+
when:
184+
payload = payload.getNextSplit()
185+
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
186+
false, payload, fieldNameValidator)
187+
output.truncateToPosition(0)
188+
message.encode(output, sessionContext)
189+
byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
190+
messageHeader = new MessageHeader(byteBuf, maxMessageSize)
191+
192+
then:
193+
messageHeader.opCode == OpCode.OP_MSG.value
194+
messageHeader.requestId < RequestMessage.currentGlobalId
195+
messageHeader.responseTo == 0
196+
messageHeader.messageLength == 1024
197+
byteBuf.getInt() == 0
198+
payload.getPosition() == 2
199+
payload.hasAnotherSplit()
200+
201+
when:
202+
payload = payload.getNextSplit()
203+
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
204+
false, payload, fieldNameValidator)
205+
output.truncateToPosition(0)
206+
message.encode(output, sessionContext)
207+
byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
208+
messageHeader = new MessageHeader(byteBuf, maxMessageSize)
209+
210+
then:
211+
messageHeader.opCode == OpCode.OP_MSG.value
212+
messageHeader.requestId < RequestMessage.currentGlobalId
213+
messageHeader.responseTo == 0
214+
messageHeader.messageLength == 552
215+
byteBuf.getInt() == 0
216+
payload.getPosition() == 1
217+
payload.hasAnotherSplit()
218+
219+
when:
220+
payload = payload.getNextSplit()
221+
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
222+
false, payload, fieldNameValidator)
223+
output.truncateToPosition(0)
224+
message.encode(output, sessionContext)
225+
byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
226+
messageHeader = new MessageHeader(byteBuf, maxMessageSize)
227+
228+
then:
229+
messageHeader.opCode == OpCode.OP_MSG.value
230+
messageHeader.requestId < RequestMessage.currentGlobalId
231+
messageHeader.responseTo == 0
232+
messageHeader.messageLength == 562
233+
byteBuf.getInt() == 1 << 1
234+
payload.getPosition() == 1
235+
!payload.hasAnotherSplit()
236+
}
237+
238+
def 'should respect the max batch count'() {
239+
given:
240+
def messageSettings = MessageSettings.builder().maxBatchCount(2).serverVersion(new ServerVersion(3, 6)).build()
157241
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900])),
158242
new BsonDocument('b', new BsonBinary(new byte[450])),
159243
new BsonDocument('c', new BsonBinary(new byte[450]))])
@@ -171,12 +255,12 @@ class CommandMessageSpecification extends Specification {
171255
messageHeader.opCode == OpCode.OP_MSG.value
172256
messageHeader.requestId < RequestMessage.currentGlobalId
173257
messageHeader.responseTo == 0
258+
messageHeader.messageLength == 1465
174259
byteBuf.getInt() == 0
175-
payload.getPosition() == pos1
260+
payload.getPosition() == 2
176261
payload.hasAnotherSplit()
177262

178263
when:
179-
def initialRequestId = messageHeader.requestId
180264
payload = payload.getNextSplit()
181265
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
182266
false, payload, fieldNameValidator)
@@ -188,16 +272,10 @@ class CommandMessageSpecification extends Specification {
188272
then:
189273
messageHeader.opCode == OpCode.OP_MSG.value
190274
messageHeader.requestId < RequestMessage.currentGlobalId
191-
messageHeader.requestId > initialRequestId
192275
messageHeader.responseTo == 0
193276
byteBuf.getInt() == 1 << 1
194-
payload.getPosition() == pos2
277+
payload.getPosition() == 1
195278
!payload.hasAnotherSplit()
196-
197-
where:
198-
pos1 | pos2 | messageSettings
199-
1 | 2 | MessageSettings.builder().maxMessageSize(1024).serverVersion(new ServerVersion(3, 6)).build()
200-
2 | 1 | MessageSettings.builder().maxBatchCount(2).serverVersion(new ServerVersion(3, 6)).build()
201279
}
202280

203281
def 'should throw if payload document bigger than max document size'() {

0 commit comments

Comments
 (0)