17
17
package com .mongodb .async .client ;
18
18
19
19
20
+ import com .mongodb .MongoBulkWriteException ;
20
21
import com .mongodb .MongoException ;
21
22
import com .mongodb .ReadConcern ;
22
23
import com .mongodb .ReadConcernLevel ;
23
24
import com .mongodb .ReadPreference ;
24
25
import com .mongodb .WriteConcern ;
25
26
import com .mongodb .async .FutureResultCallback ;
27
+ import com .mongodb .bulk .BulkWriteError ;
26
28
import com .mongodb .bulk .BulkWriteResult ;
27
29
import com .mongodb .bulk .BulkWriteUpsert ;
28
30
import com .mongodb .client .model .BulkWriteOptions ;
61
63
import java .lang .reflect .InvocationTargetException ;
62
64
import java .lang .reflect .Method ;
63
65
import java .util .ArrayList ;
66
+ import java .util .Collections ;
64
67
import java .util .List ;
65
68
66
69
import static java .lang .String .format ;
@@ -145,7 +148,8 @@ BsonDocument toResult(final UpdateResult updateResult) {
145
148
return toResult (resultDoc );
146
149
}
147
150
148
- BsonDocument toResult (final BulkWriteResult bulkWriteResult , final List <WriteModel <BsonDocument >> writeModels ) {
151
+ BsonDocument toResult (final BulkWriteResult bulkWriteResult , final List <? extends WriteModel <BsonDocument >> writeModels ,
152
+ final List <BulkWriteError > writeErrors ) {
149
153
150
154
BsonDocument resultDoc = new BsonDocument ();
151
155
if (bulkWriteResult .wasAcknowledged ()) {
@@ -155,8 +159,7 @@ BsonDocument toResult(final BulkWriteResult bulkWriteResult, final List<WriteMod
155
159
BsonDocument insertedIds = new BsonDocument ();
156
160
for (int i = 0 ; i < writeModels .size (); i ++) {
157
161
WriteModel <BsonDocument > cur = writeModels .get (i );
158
- // TODO: Any need to suport InsertManyModel, and if so, how to represent it?
159
- if (cur instanceof InsertOneModel ) {
162
+ if (cur instanceof InsertOneModel && writeSuccessful (i , writeErrors )) {
160
163
InsertOneModel <BsonDocument > insertOneModel = (InsertOneModel <BsonDocument >) cur ;
161
164
insertedIds .put (Integer .toString (i ), insertOneModel .getDocument ().get ("_id" ));
162
165
}
@@ -179,6 +182,15 @@ BsonDocument toResult(final BulkWriteResult bulkWriteResult, final List<WriteMod
179
182
return toResult (resultDoc );
180
183
}
181
184
185
+ private boolean writeSuccessful (final int index , final List <BulkWriteError > writeErrors ) {
186
+ for (BulkWriteError cur : writeErrors ) {
187
+ if (cur .getIndex () == index ) {
188
+ return false ;
189
+ }
190
+ }
191
+ return true ;
192
+ }
193
+
182
194
BsonDocument toResult (@ Nullable final BsonValue results ) {
183
195
return new BsonDocument ("result" , results != null ? results : BsonNull .VALUE );
184
196
}
@@ -485,27 +497,40 @@ BsonDocument getInsertManyResult(final BsonDocument collectionOptions, final Bso
485
497
documents .add (document .asDocument ());
486
498
}
487
499
FutureResultCallback <Void > futureResultCallback = new FutureResultCallback <Void >();
488
- if (clientSession == null ) {
489
- getCollection (collectionOptions )
490
- .insertMany (documents ,
491
- new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
492
- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
493
- futureResultCallback );
494
- } else {
495
- getCollection (collectionOptions )
496
- .insertMany (clientSession ,
497
- documents ,
498
- new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
499
- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
500
- futureResultCallback );
501
- }
502
- futureResult (futureResultCallback );
503
500
504
- BsonDocument insertedIds = new BsonDocument ();
505
- for (int i = 0 ; i < documents .size (); i ++) {
506
- insertedIds .put (Integer .toString (i ), documents .get (i ).get ("_id" ));
501
+ try {
502
+ if (clientSession == null ) {
503
+ getCollection (collectionOptions )
504
+ .insertMany (documents ,
505
+ new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
506
+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
507
+ futureResultCallback );
508
+ } else {
509
+ getCollection (collectionOptions )
510
+ .insertMany (clientSession ,
511
+ documents ,
512
+ new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
513
+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
514
+ futureResultCallback );
515
+ }
516
+ futureResult (futureResultCallback );
517
+
518
+ BsonDocument insertedIds = new BsonDocument ();
519
+ for (int i = 0 ; i < documents .size (); i ++) {
520
+ insertedIds .put (Integer .toString (i ), documents .get (i ).get ("_id" ));
521
+ }
522
+ return toResult (new BsonDocument ("insertedIds" , insertedIds ));
523
+ } catch (MongoBulkWriteException e ) {
524
+ // Test results are expecting this to look just like bulkWrite error, so translate to InsertOneModel so the result
525
+ // translation code can be reused.
526
+ List <InsertOneModel <BsonDocument >> writeModels = new ArrayList <InsertOneModel <BsonDocument >>();
527
+ for (BsonValue document : arguments .getArray ("documents" )) {
528
+ writeModels .add (new InsertOneModel <BsonDocument >(document .asDocument ()));
529
+ }
530
+ BsonDocument result = toResult (e .getWriteResult (), writeModels , e .getWriteErrors ());
531
+ result .put ("error" , BsonBoolean .TRUE );
532
+ return result ;
507
533
}
508
- return toResult (new BsonDocument ("insertedIds" , insertedIds ));
509
534
}
510
535
511
536
BsonDocument getReplaceOneResult (final BsonDocument collectionOptions , final BsonDocument arguments ,
@@ -603,33 +628,41 @@ BsonDocument getBulkWriteResult(final BsonDocument collectionOptions, final Bson
603
628
requestArguments .getDocument ("update" ),
604
629
getUpdateOptions (requestArguments )));
605
630
} else if (name .equals ("deleteOne" )) {
606
- writeModels .add (new DeleteOneModel <BsonDocument >(requestArguments .getDocument ("filter" )));
631
+ writeModels .add (new DeleteOneModel <BsonDocument >(requestArguments .getDocument ("filter" ),
632
+ getDeleteOptions (requestArguments )));
607
633
} else if (name .equals ("deleteMany" )) {
608
- writeModels .add (new DeleteManyModel <BsonDocument >(requestArguments .getDocument ("filter" )));
634
+ writeModels .add (new DeleteManyModel <BsonDocument >(requestArguments .getDocument ("filter" ),
635
+ getDeleteOptions (requestArguments )));
609
636
} else if (name .equals ("replaceOne" )) {
610
637
writeModels .add (new ReplaceOneModel <BsonDocument >(requestArguments .getDocument ("filter" ),
611
- requestArguments .getDocument ("replacement" )));
638
+ requestArguments .getDocument ("replacement" ), getReplaceOptions ( requestArguments ) ));
612
639
} else {
613
640
throw new UnsupportedOperationException (format ("Unsupported write request type: %s" , name ));
614
641
}
615
642
}
616
643
617
- FutureResultCallback <BulkWriteResult > futureResultCallback = new FutureResultCallback <BulkWriteResult >();
618
- if (clientSession == null ) {
619
- getCollection (collectionOptions ).withWriteConcern (writeConcern )
620
- .bulkWrite (writeModels ,
621
- new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
622
- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
623
- futureResultCallback );
624
- } else {
625
- getCollection (collectionOptions ).withWriteConcern (writeConcern )
626
- .bulkWrite (clientSession ,
627
- writeModels ,
628
- new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
629
- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
630
- futureResultCallback );
644
+ try {
645
+ FutureResultCallback <BulkWriteResult > futureResultCallback = new FutureResultCallback <BulkWriteResult >();
646
+ if (clientSession == null ) {
647
+ getCollection (collectionOptions ).withWriteConcern (writeConcern )
648
+ .bulkWrite (writeModels ,
649
+ new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
650
+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
651
+ futureResultCallback );
652
+ } else {
653
+ getCollection (collectionOptions ).withWriteConcern (writeConcern )
654
+ .bulkWrite (clientSession ,
655
+ writeModels ,
656
+ new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
657
+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
658
+ futureResultCallback );
659
+ }
660
+ return toResult (futureResult (futureResultCallback ), writeModels , Collections .<BulkWriteError >emptyList ());
661
+ } catch (MongoBulkWriteException e ) {
662
+ BsonDocument result = toResult (e .getWriteResult (), writeModels , e .getWriteErrors ());
663
+ result .put ("error" , BsonBoolean .TRUE );
664
+ return result ;
631
665
}
632
- return toResult (futureResult (futureResultCallback ), writeModels );
633
666
}
634
667
635
668
Collation getCollation (final BsonDocument bsonCollation ) {
@@ -675,6 +708,28 @@ private UpdateOptions getUpdateOptions(final BsonDocument requestArguments) {
675
708
if (requestArguments .containsKey ("arrayFilters" )) {
676
709
options .arrayFilters (getArrayFilters (requestArguments .getArray ("arrayFilters" )));
677
710
}
711
+ if (requestArguments .containsKey ("collation" )) {
712
+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
713
+ }
714
+ return options ;
715
+ }
716
+
717
+ private DeleteOptions getDeleteOptions (final BsonDocument requestArguments ) {
718
+ DeleteOptions options = new DeleteOptions ();
719
+ if (requestArguments .containsKey ("collation" )) {
720
+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
721
+ }
722
+ return options ;
723
+ }
724
+
725
+ private ReplaceOptions getReplaceOptions (final BsonDocument requestArguments ) {
726
+ ReplaceOptions options = new ReplaceOptions ();
727
+ if (requestArguments .containsKey ("upsert" )) {
728
+ options .upsert (true );
729
+ }
730
+ if (requestArguments .containsKey ("collation" )) {
731
+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
732
+ }
678
733
return options ;
679
734
}
680
735
0 commit comments