Skip to content

Commit befb3cf

Browse files
committed
Fixed race condition between threads attempting to get the result of an unacknowledged write after the fact and threads attempting to do write commands.
The race was cause by a lack of synchronization on DPPort in the write command path, and a failure to update DBPort.usageCount on that path. JAVA-1613
1 parent c8a7c84 commit befb3cf

File tree

2 files changed

+38
-42
lines changed

2 files changed

+38
-42
lines changed

src/main/com/mongodb/DBCollectionImpl.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -463,31 +463,39 @@ private BulkWriteResult updateWithCommandProtocol(final List<ModifyRequest> upda
463463
return writeWithCommandProtocol(port, UPDATE, message, writeConcern);
464464
}
465465

466-
private BulkWriteResult writeWithCommandProtocol(final DBPort port, final WriteRequest.Type type, BaseWriteCommandMessage message,
466+
private BulkWriteResult writeWithCommandProtocol(final DBPort port, final WriteRequest.Type type, final BaseWriteCommandMessage message,
467467
final WriteConcern writeConcern) {
468-
int batchNum = 0;
469-
int currentRangeStartIndex = 0;
470-
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(port.getAddress(), writeConcern);
471-
do {
472-
batchNum++;
473-
BaseWriteCommandMessage nextMessage = sendWriteCommandMessage(message, batchNum, port);
474-
int itemCount = nextMessage != null ? message.getItemCount() - nextMessage.getItemCount() : message.getItemCount();
475-
IndexMap indexMap = IndexMap.create(currentRangeStartIndex, itemCount);
476-
CommandResult commandResult = receiveWriteCommandMessage(port);
477-
if (willTrace() && nextMessage != null || batchNum > 1) {
478-
getLogger().fine(format("Received response for batch %d", batchNum));
479-
}
468+
return db.getConnector().doOperation(db, port, new DBPort.Operation<BulkWriteResult>() {
469+
@Override
470+
public BulkWriteResult execute() throws IOException {
471+
BaseWriteCommandMessage curMessage = message;
472+
int batchNum = 0;
473+
int currentRangeStartIndex = 0;
474+
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(port.getAddress(), writeConcern);
475+
do {
476+
batchNum++;
477+
BaseWriteCommandMessage nextMessage = sendWriteCommandMessage(curMessage, batchNum, port);
478+
int itemCount = nextMessage != null ? curMessage.getItemCount() - nextMessage.getItemCount()
479+
: curMessage.getItemCount();
480+
IndexMap indexMap = IndexMap.create(currentRangeStartIndex, itemCount);
481+
CommandResult commandResult = receiveWriteCommandMessage(port);
482+
if (willTrace() && nextMessage != null || batchNum > 1) {
483+
getLogger().fine(format("Received response for batch %d", batchNum));
484+
}
480485

481-
if (hasError(commandResult)) {
482-
bulkWriteBatchCombiner.addErrorResult(getBulkWriteException(type, commandResult), indexMap);
483-
} else {
484-
bulkWriteBatchCombiner.addResult(getBulkWriteResult(type, commandResult), indexMap);
486+
if (hasError(commandResult)) {
487+
bulkWriteBatchCombiner.addErrorResult(getBulkWriteException(type, commandResult), indexMap);
488+
} else {
489+
bulkWriteBatchCombiner.addResult(getBulkWriteResult(type, commandResult), indexMap);
490+
}
491+
currentRangeStartIndex += itemCount;
492+
curMessage = nextMessage;
493+
} while (curMessage != null && !bulkWriteBatchCombiner.shouldStopSendingMoreBatches());
494+
495+
return bulkWriteBatchCombiner.getResult();
485496
}
486-
currentRangeStartIndex += itemCount;
487-
message = nextMessage;
488-
} while (message != null && !bulkWriteBatchCombiner.shouldStopSendingMoreBatches());
497+
});
489498

490-
return bulkWriteBatchCombiner.getResult();
491499
}
492500

493501
private boolean useWriteCommands(final WriteConcern concern, final DBPort port) {
@@ -513,38 +521,26 @@ private MongoNamespace getNamespace() {
513521
}
514522

515523
private BaseWriteCommandMessage sendWriteCommandMessage(final BaseWriteCommandMessage message, final int batchNum,
516-
final DBPort port) {
524+
final DBPort port) throws IOException {
517525
final PoolOutputBuffer buffer = new PoolOutputBuffer();
518526
try {
519527
BaseWriteCommandMessage nextMessage = message.encode(buffer);
520528
if (nextMessage != null || batchNum > 1) {
521529
getLogger().fine(format("Sending batch %d", batchNum));
522530
}
523-
db.getConnector().doOperation(getDB(), port, new DBPort.Operation<Void>() {
524-
@Override
525-
public Void execute() throws IOException {
526-
buffer.pipe(port.getOutputStream());
527-
return null;
528-
}
529-
});
531+
buffer.pipe(port.getOutputStream());
530532
return nextMessage;
531533
} finally {
532534
buffer.reset();
533535
}
534536
}
535537

536-
private CommandResult receiveWriteCommandMessage(final DBPort port) {
537-
return db.getConnector().doOperation(getDB(), port, new DBPort.Operation<CommandResult>() {
538-
@Override
539-
public CommandResult execute() throws IOException {
540-
Response response = new Response(port.getAddress(), null, port.getInputStream(),
541-
DefaultDBDecoder.FACTORY.create());
542-
CommandResult writeCommandResult = new CommandResult(port.getAddress());
543-
writeCommandResult.putAll(response.get(0));
544-
writeCommandResult.throwOnError();
545-
return writeCommandResult;
546-
}
547-
});
538+
private CommandResult receiveWriteCommandMessage(final DBPort port) throws IOException {
539+
Response response = new Response(port.getAddress(), null, port.getInputStream(), DefaultDBDecoder.FACTORY.create());
540+
CommandResult writeCommandResult = new CommandResult(port.getAddress());
541+
writeCommandResult.putAll(response.get(0));
542+
writeCommandResult.throwOnError();
543+
return writeCommandResult;
548544
}
549545

550546

src/main/com/mongodb/DBTCPConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public WriteResult execute() throws IOException {
204204
<T> T doOperation(final DB db, final DBPort port, final DBPort.Operation<T> operation){
205205
try {
206206
port.checkAuth( db.getMongo() );
207-
return operation.execute();
207+
return port.doOperation(operation);
208208
}
209209
catch ( MongoException re ){
210210
throw re;

0 commit comments

Comments
 (0)