Skip to content

Commit d64f5c8

Browse files
committed
Fixed race condition between threads attempting to get the result of an unacknowledged write after the fact and threads attempting to do write commands.
The race was cause by a lack of synchronization on DPPort in the write command path, and a failure to update DBPort.usageCount on that path. JAVA-1613
1 parent 4346f9a commit d64f5c8

File tree

2 files changed

+38
-42
lines changed

2 files changed

+38
-42
lines changed

src/main/com/mongodb/DBCollectionImpl.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -419,31 +419,39 @@ private BulkWriteResult updateWithCommandProtocol(final List<ModifyRequest> upda
419419
return writeWithCommandProtocol(port, UPDATE, message, writeConcern);
420420
}
421421

422-
private BulkWriteResult writeWithCommandProtocol(final DBPort port, final WriteRequest.Type type, BaseWriteCommandMessage message,
422+
private BulkWriteResult writeWithCommandProtocol(final DBPort port, final WriteRequest.Type type, final BaseWriteCommandMessage message,
423423
final WriteConcern writeConcern) {
424-
int batchNum = 0;
425-
int currentRangeStartIndex = 0;
426-
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(port.getAddress(), writeConcern);
427-
do {
428-
batchNum++;
429-
BaseWriteCommandMessage nextMessage = sendWriteCommandMessage(message, batchNum, port);
430-
int itemCount = nextMessage != null ? message.getItemCount() - nextMessage.getItemCount() : message.getItemCount();
431-
IndexMap indexMap = IndexMap.create(currentRangeStartIndex, itemCount);
432-
CommandResult commandResult = receiveWriteCommandMessage(port);
433-
if (willTrace() && nextMessage != null || batchNum > 1) {
434-
getLogger().fine(format("Received response for batch %d", batchNum));
435-
}
424+
return db.getConnector().doOperation(db, port, new DBPort.Operation<BulkWriteResult>() {
425+
@Override
426+
public BulkWriteResult execute() throws IOException {
427+
BaseWriteCommandMessage curMessage = message;
428+
int batchNum = 0;
429+
int currentRangeStartIndex = 0;
430+
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(port.getAddress(), writeConcern);
431+
do {
432+
batchNum++;
433+
BaseWriteCommandMessage nextMessage = sendWriteCommandMessage(curMessage, batchNum, port);
434+
int itemCount = nextMessage != null ? curMessage.getItemCount() - nextMessage.getItemCount()
435+
: curMessage.getItemCount();
436+
IndexMap indexMap = IndexMap.create(currentRangeStartIndex, itemCount);
437+
CommandResult commandResult = receiveWriteCommandMessage(port);
438+
if (willTrace() && nextMessage != null || batchNum > 1) {
439+
getLogger().fine(format("Received response for batch %d", batchNum));
440+
}
436441

437-
if (hasError(commandResult)) {
438-
bulkWriteBatchCombiner.addErrorResult(getBulkWriteException(type, commandResult), indexMap);
439-
} else {
440-
bulkWriteBatchCombiner.addResult(getBulkWriteResult(type, commandResult), indexMap);
442+
if (hasError(commandResult)) {
443+
bulkWriteBatchCombiner.addErrorResult(getBulkWriteException(type, commandResult), indexMap);
444+
} else {
445+
bulkWriteBatchCombiner.addResult(getBulkWriteResult(type, commandResult), indexMap);
446+
}
447+
currentRangeStartIndex += itemCount;
448+
curMessage = nextMessage;
449+
} while (curMessage != null && !bulkWriteBatchCombiner.shouldStopSendingMoreBatches());
450+
451+
return bulkWriteBatchCombiner.getResult();
441452
}
442-
currentRangeStartIndex += itemCount;
443-
message = nextMessage;
444-
} while (message != null && !bulkWriteBatchCombiner.shouldStopSendingMoreBatches());
453+
});
445454

446-
return bulkWriteBatchCombiner.getResult();
447455
}
448456

449457
private boolean useWriteCommands(final WriteConcern concern, final DBPort port) {
@@ -469,38 +477,26 @@ private MongoNamespace getNamespace() {
469477
}
470478

471479
private BaseWriteCommandMessage sendWriteCommandMessage(final BaseWriteCommandMessage message, final int batchNum,
472-
final DBPort port) {
480+
final DBPort port) throws IOException {
473481
final PoolOutputBuffer buffer = new PoolOutputBuffer();
474482
try {
475483
BaseWriteCommandMessage nextMessage = message.encode(buffer);
476484
if (nextMessage != null || batchNum > 1) {
477485
getLogger().fine(format("Sending batch %d", batchNum));
478486
}
479-
db.getConnector().doOperation(getDB(), port, new DBPort.Operation<Void>() {
480-
@Override
481-
public Void execute() throws IOException {
482-
buffer.pipe(port.getOutputStream());
483-
return null;
484-
}
485-
});
487+
buffer.pipe(port.getOutputStream());
486488
return nextMessage;
487489
} finally {
488490
buffer.reset();
489491
}
490492
}
491493

492-
private CommandResult receiveWriteCommandMessage(final DBPort port) {
493-
return db.getConnector().doOperation(getDB(), port, new DBPort.Operation<CommandResult>() {
494-
@Override
495-
public CommandResult execute() throws IOException {
496-
Response response = new Response(port.getAddress(), null, port.getInputStream(),
497-
DefaultDBDecoder.FACTORY.create());
498-
CommandResult writeCommandResult = new CommandResult(port.getAddress());
499-
writeCommandResult.putAll(response.get(0));
500-
writeCommandResult.throwOnError();
501-
return writeCommandResult;
502-
}
503-
});
494+
private CommandResult receiveWriteCommandMessage(final DBPort port) throws IOException {
495+
Response response = new Response(port.getAddress(), null, port.getInputStream(), DefaultDBDecoder.FACTORY.create());
496+
CommandResult writeCommandResult = new CommandResult(port.getAddress());
497+
writeCommandResult.putAll(response.get(0));
498+
writeCommandResult.throwOnError();
499+
return writeCommandResult;
504500
}
505501

506502

src/main/com/mongodb/DBTCPConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public WriteResult execute() throws IOException {
204204
<T> T doOperation(final DB db, final DBPort port, final DBPort.Operation<T> operation){
205205
try {
206206
port.checkAuth( db.getMongo() );
207-
return operation.execute();
207+
return port.doOperation(operation);
208208
}
209209
catch ( MongoException re ){
210210
throw re;

0 commit comments

Comments
 (0)