Commit 9884550 (parent 6b4061e)

Fixed unordered bulk write operation to keep sending batches to the server even after an error in the current batch.

JAVA-1541

2 files changed, +42 −1 lines changed
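For context, a minimal sketch of the driver-level behaviour this commit addresses, assuming the legacy 2.x Java driver and an existing DBCollection named collection (a fragment for illustration only, not part of the commit):

BulkWriteOperation bulk = collection.initializeUnorderedBulkOperation();
for (int i = 0; i < 2000; i++) {
    bulk.insert(new BasicDBObject("_id", i));
}
try {
    bulk.execute(WriteConcern.ACKNOWLEDGED);
} catch (BulkWriteException e) {
    // Pre-existing _ids fail with duplicate-key errors, but because the
    // operation is unordered the driver should keep sending the remaining
    // batches, so the other documents are still inserted.
    System.out.println("write errors: " + e.getWriteErrors().size());
    System.out.println("inserted: " + e.getWriteResult().getInsertedCount());
}

Before this change the driver stopped sending further batches once a batch failed, even for unordered operations; with the fix the remaining batches are still sent to the server.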

src/main/com/mongodb/DBCollectionImpl.java

Lines changed: 3 additions & 1 deletion

@@ -128,13 +128,15 @@ public List<Cursor> parallelScan(final ParallelScanOptions options) {
 
     @Override
     BulkWriteResult executeBulkWriteOperation(final boolean ordered, final List<WriteRequest> writeRequests,
-                                              final WriteConcern writeConcern, DBEncoder encoder) {
+                                              WriteConcern writeConcern, DBEncoder encoder) {
         isTrue("no operations", !writeRequests.isEmpty());
 
         if (writeConcern == null) {
             throw new IllegalArgumentException("Write concern can not be null");
         }
 
+        writeConcern = writeConcern.continueOnError(!ordered);
+
         if (encoder == null) {
             encoder = DefaultDBEncoder.FACTORY.create();
         }
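
The one-line fix derives the legacy continueOnError option from the ordered flag. A minimal sketch of the assumed semantics (not taken from the patch; WriteConcern.continueOnError returns a copy of the write concern with the flag set):

// Unordered bulk writes -> continue on error: later documents and batches are
// still attempted after a failure. Ordered bulk writes -> stop at the first error.
boolean ordered = false;
WriteConcern effective = WriteConcern.ACKNOWLEDGED.continueOnError(!ordered);
assert effective.getContinueOnError();   // true for unordered

ordered = true;
effective = WriteConcern.ACKNOWLEDGED.continueOnError(!ordered);
assert !effective.getContinueOnError();  // false for ordered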

src/test/com/mongodb/BulkWriteOperationSpecification.groovy

Lines changed: 39 additions & 0 deletions

@@ -781,6 +781,7 @@ class BulkWriteOperationSpecification extends FunctionalSpecification {
         ordered << [true, false]
     }
 
+
     // just need to check one case here, since the others are checked above
     def 'should throw IllegalStateException when already executed with write concern'() {
         given:
@@ -826,6 +827,44 @@
         ordered << [true, false]
     }
 
+    def 'should continue to execute batches after a failure if writes are unordered'() {
+        given:
+        collection.insert(new BasicDBObject('_id', 0))
+        collection.insert(new BasicDBObject('_id', 1000))
+
+        when:
+        def operation = initializeBulkOperation(false)
+        for (int i = 0; i < 2000; i++) {
+            operation.insert(new BasicDBObject('_id', i))
+        }
+        operation.execute(WriteConcern.ACKNOWLEDGED)
+
+        then:
+        def ex = thrown(BulkWriteException)
+        ex.writeErrors.size() == 2
+        ex.getWriteResult().getInsertedCount() == 1998
+        collection.count() == 2000
+    }
+
+    def 'should stop executing batches after a failure if writes are ordered'() {
+        given:
+        collection.insert(new BasicDBObject('_id', 500))
+        collection.insert(new BasicDBObject('_id', 1500))
+
+        when:
+        def operation = initializeBulkOperation(true)
+        for (int i = 0; i < 2000; i++) {
+            operation.insert(new BasicDBObject('_id', i))
+        }
+        operation.execute(WriteConcern.ACKNOWLEDGED)
+
+        then:
+        def ex = thrown(BulkWriteException)
+        ex.writeErrors.size() == 1
+        ex.getWriteResult().getInsertedCount() == 500
+        collection.count() == 502
+    }
+
     private static void addWritesToOperation(BulkWriteOperation operation) {
         operation.find(new BasicDBObject('_id', 1)).updateOne(new BasicDBObject('$set', new BasicDBObject('x', 2)))
         operation.find(new BasicDBObject('_id', 2)).updateOne(new BasicDBObject('$set', new BasicDBObject('x', 3)))
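
The expected numbers in the new unordered test follow from how the inserts are split into batches; a sketch of the arithmetic, assuming the driver sends the 2000 documents as two batches of 1000 (the server's maximum write batch size at the time):

// Hypothetical walk-through of the unordered test's expectations.
int preExisting = 2;                               // _id 0 and _id 1000 from the given: block
int attempted = 2000;                              // _id 0 .. 1999
int duplicateKeyErrors = 2;                        // one duplicate in each batch of 1000
int inserted = attempted - duplicateKeyErrors;     // 1998 == getInsertedCount()
int finalCount = preExisting + inserted;           // 2000 == collection.count()

The ordered test stops at the first error instead: the duplicate at _id 500 ends execution after 500 successful inserts, leaving 502 documents (500 inserted plus the 2 pre-existing ones).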
