44
44
import static java .util .Arrays .asList ;
45
45
import static org .bson .util .Assertions .isTrue ;
46
46
47
+ @ SuppressWarnings ("deprecation" )
47
48
class DBCollectionImpl extends DBCollection {
48
49
private final DBApiLayer db ;
49
50
private final String namespace ;
@@ -166,7 +167,6 @@ protected WriteResult insert(List<DBObject> list, boolean shouldApply , WriteCon
166
167
if (encoder == null )
167
168
encoder = DefaultDBEncoder .FACTORY .create ();
168
169
169
-
170
170
if ( willTrace () ) {
171
171
for (DBObject o : list ) {
172
172
trace ("save: " + namespace + " " + JSON .serialize (o ));
@@ -377,6 +377,10 @@ public void createIndex(final DBObject keys, final DBObject options, DBEncoder e
377
377
private BulkWriteResult insertWithCommandProtocol (final List <DBObject > list , final WriteConcern writeConcern ,
378
378
final DBEncoder encoder ,
379
379
final DBPort port ) {
380
+ for (DBObject o : list ) {
381
+ _checkObject (o , false , false );
382
+ }
383
+
380
384
BaseWriteCommandMessage message = new InsertCommandMessage (getNamespace (), writeConcern , list ,
381
385
DefaultDBEncoder .FACTORY .create (), encoder ,
382
386
getMessageSettings (port .getAddress ()));
@@ -483,6 +487,10 @@ public CommandResult execute() throws IOException {
483
487
484
488
private WriteResult insertWithWriteProtocol (final List <DBObject > list , final WriteConcern concern , final DBEncoder encoder ,
485
489
final DBPort port ) {
490
+ for (DBObject o : list ) {
491
+ _checkObject (o , false , false );
492
+ }
493
+
486
494
WriteResult last = null ;
487
495
488
496
int cur = 0 ;
@@ -534,15 +542,13 @@ private Logger getLogger() {
534
542
535
543
private class OrderedRunGenerator implements Iterable <Run > {
536
544
private final List <WriteRequest > writeRequests ;
537
- private final DBPort port ;
538
545
private final WriteConcern writeConcern ;
539
546
private final DBEncoder encoder ;
540
547
private final int maxBatchWriteSize ;
541
548
542
549
public OrderedRunGenerator (final List <WriteRequest > writeRequests , final WriteConcern writeConcern , final DBEncoder encoder ,
543
550
final DBPort port ) {
544
551
this .writeRequests = writeRequests ;
545
- this .port = port ;
546
552
this .writeConcern = writeConcern .continueOnError (false );
547
553
this .encoder = encoder ;
548
554
this .maxBatchWriteSize = db .getConnector ().getServerDescription (port .getAddress ()).getMaxWriteBatchSize ();
@@ -590,15 +596,13 @@ public void remove() {
590
596
591
597
private class UnorderedRunGenerator implements Iterable <Run > {
592
598
private final List <WriteRequest > writeRequests ;
593
- private final DBPort port ;
594
599
private final WriteConcern writeConcern ;
595
600
private final DBEncoder encoder ;
596
601
private final int maxBatchWriteSize ;
597
602
598
603
public UnorderedRunGenerator (final List <WriteRequest > writeRequests , final WriteConcern writeConcern ,
599
604
final DBEncoder encoder , final DBPort port ) {
600
605
this .writeRequests = writeRequests ;
601
- this .port = port ;
602
606
this .writeConcern = writeConcern .continueOnError (true );
603
607
this .encoder = encoder ;
604
608
this .maxBatchWriteSize = db .getConnector ().getServerDescription (port .getAddress ()).getMaxWriteBatchSize ();
@@ -707,6 +711,14 @@ private List<ModifyRequest> getWriteRequestsAsModifyRequests() {
707
711
}
708
712
709
713
BulkWriteResult executeUpdates (final List <ModifyRequest > updateRequests , final DBPort port ) {
714
+ for (ModifyRequest request : updateRequests ) {
715
+ for (String key : request .getUpdateDocument ().keySet ()) {
716
+ if (!key .startsWith ("$" )) {
717
+ throw new IllegalArgumentException ("Update document keys must start with $: " + key );
718
+ }
719
+ }
720
+ }
721
+
710
722
return new RunExecutor (port ) {
711
723
@ Override
712
724
BulkWriteResult executeWriteCommandProtocol () {
@@ -728,6 +740,10 @@ WriteRequest.Type getType() {
728
740
}
729
741
730
742
BulkWriteResult executeReplaces (final List <ModifyRequest > replaceRequests , final DBPort port ) {
743
+ for (ModifyRequest request : replaceRequests ) {
744
+ _checkObject (request .getUpdateDocument (), false , false );
745
+ }
746
+
731
747
return new RunExecutor (port ) {
732
748
@ Override
733
749
BulkWriteResult executeWriteCommandProtocol () {
0 commit comments