Skip to content

Commit 8a20b80

Browse files
committed
JAVA-2041: Respect ReadConcern in DBCollection and DBCursor
1 parent dca4c5c commit 8a20b80

File tree

6 files changed

+452
-14
lines changed

6 files changed

+452
-14
lines changed

driver/src/main/com/mongodb/DB.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class DB {
7272
private final Codec<DBObject> commandCodec;
7373
private volatile ReadPreference readPreference;
7474
private volatile WriteConcern writeConcern;
75+
private volatile ReadConcern readConcern;
7576

7677
DB(final Mongo mongo, final String name, final OperationExecutor executor) {
7778
if (!isValidName(name)) {
@@ -146,6 +147,28 @@ public WriteConcern getWriteConcern() {
146147
return writeConcern != null ? writeConcern : mongo.getWriteConcern();
147148
}
148149

150+
/**
151+
* Sets the read concern for this database.
152+
*
153+
* @param readConcern the read concern to use for this collection
154+
* @since 3.2
155+
* @mongodb.server.release 3.2
156+
*/
157+
void setReadConcern(final ReadConcern readConcern) {
158+
this.readConcern = readConcern;
159+
}
160+
161+
/**
162+
* Get the read concern for this database.
163+
*
164+
* @return the {@link com.mongodb.ReadConcern}
165+
* @since 3.2
166+
* @mongodb.server.release 3.2
167+
*/
168+
ReadConcern getReadConcern() {
169+
return readConcern != null ? readConcern : mongo.getReadConcern();
170+
}
171+
149172
/**
150173
* <p>Gets a collection with a given name. If the collection does not exist, a new collection is created.</p>
151174
*

driver/src/main/com/mongodb/DBCollection.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public class DBCollection {
121121
private final Bytes.OptionHolder optionHolder;
122122
private volatile ReadPreference readPreference;
123123
private volatile WriteConcern writeConcern;
124+
private volatile ReadConcern readConcern;
124125
private List<DBObject> hintFields;
125126
private DBEncoderFactory encoderFactory;
126127
private DBDecoderFactory decoderFactory;
@@ -743,7 +744,7 @@ public DBObject findOne(final DBObject query, final DBObject projection, final R
743744
*/
744745
public DBObject findOne(final DBObject query, final DBObject projection, final DBObject sort,
745746
final ReadPreference readPreference) {
746-
return findOne(query, projection, sort, readPreference, 0, MILLISECONDS);
747+
return findOne(query, projection, sort, readPreference, getReadConcern(), 0, MILLISECONDS);
747748
}
748749

749750
/**
@@ -753,16 +754,19 @@ public DBObject findOne(final DBObject query, final DBObject projection, final D
753754
* @param projection specifies which projection MongoDB will return from the documents in the result set.
754755
* @param sort A document whose fields specify the attributes on which to sort the result set.
755756
* @param readPreference {@code ReadPreference} to be used for this operation
757+
* @param readConcern {@code ReadConcern} to be used for this operation
756758
* @param maxTime the maximum time that the server will allow this operation to execute before killing it
757759
* @param maxTimeUnit the unit that maxTime is specified in
758760
* @return A document that satisfies the query specified as the argument to this method.
759761
* @mongodb.driver.manual tutorial/query-documents/ Querying
760762
* @since 2.12.0
761763
*/
762764
DBObject findOne(final DBObject query, final DBObject projection, final DBObject sort,
763-
final ReadPreference readPreference, final long maxTime, final TimeUnit maxTimeUnit) {
765+
final ReadPreference readPreference, final ReadConcern readConcern,
766+
final long maxTime, final TimeUnit maxTimeUnit) {
764767
FindOperation<DBObject> operation = new FindOperation<DBObject>(getNamespace(),
765768
objectCodec)
769+
.readConcern(readConcern)
766770
.projection(wrapAllowNull(projection))
767771
.sort(wrapAllowNull(sort))
768772
.limit(-1)
@@ -924,16 +928,18 @@ public long getCount(final DBObject query, final DBObject projection, final long
924928
*/
925929
public long getCount(final DBObject query, final DBObject projection, final long limit, final long skip,
926930
final ReadPreference readPreference) {
927-
return getCount(query, projection, limit, skip, readPreference, 0, MILLISECONDS);
931+
return getCount(query, limit, skip, readPreference, getReadConcern(), 0, MILLISECONDS);
928932
}
929933

930-
long getCount(final DBObject query, final DBObject projection, final long limit, final long skip,
931-
final ReadPreference readPreference, final long maxTime, final TimeUnit maxTimeUnit) {
932-
return getCount(query, projection, limit, skip, readPreference, maxTime, maxTimeUnit, null);
934+
long getCount(final DBObject query, final long limit, final long skip,
935+
final ReadPreference readPreference, final ReadConcern readConcern,
936+
final long maxTime, final TimeUnit maxTimeUnit) {
937+
return getCount(query, limit, skip, readPreference, readConcern, maxTime, maxTimeUnit, null);
933938
}
934939

935-
long getCount(final DBObject query, final DBObject projection, final long limit, final long skip,
936-
final ReadPreference readPreference, final long maxTime, final TimeUnit maxTimeUnit,
940+
long getCount(final DBObject query, final long limit, final long skip,
941+
final ReadPreference readPreference, final ReadConcern readConcern,
942+
final long maxTime, final TimeUnit maxTimeUnit,
937943
final BsonValue hint) {
938944

939945
if (limit > Integer.MAX_VALUE) {
@@ -945,6 +951,7 @@ long getCount(final DBObject query, final DBObject projection, final long limit,
945951
}
946952

947953
CountOperation operation = new CountOperation(getNamespace())
954+
.readConcern(readConcern)
948955
.hint(hint)
949956
.skip(skip)
950957
.limit(limit)
@@ -1105,7 +1112,9 @@ public List distinct(final String fieldName, final DBObject query) {
11051112
@SuppressWarnings("unchecked")
11061113
public List distinct(final String fieldName, final DBObject query, final ReadPreference readPreference) {
11071114
return new OperationIterable<BsonValue>(new DistinctOperation<BsonValue>(getNamespace(), fieldName,
1108-
new BsonValueCodec()).filter(wrap(query)),
1115+
new BsonValueCodec())
1116+
.readConcern(getReadConcern())
1117+
.filter(wrap(query)),
11091118
readPreference, executor).map(new Function<BsonValue, Object>() {
11101119
@Override
11111120
public Object apply(final BsonValue bsonValue) {
@@ -1186,7 +1195,7 @@ public MapReduceOutput mapReduce(final MapReduceCommand command) {
11861195
new BsonJavaScript(command.getMap()),
11871196
new BsonJavaScript(command.getReduce()),
11881197
getDefaultDBObjectCodec());
1189-
1198+
operation.readConcern(getReadConcern());
11901199
operation.filter(wrapAllowNull(command.getQuery()));
11911200
operation.limit(command.getLimit());
11921201
operation.maxTime(command.getMaxTime(MILLISECONDS), MILLISECONDS);
@@ -1360,6 +1369,7 @@ private Cursor aggregate(final List<? extends DBObject> pipeline, final Aggregat
13601369
}
13611370
} else {
13621371
AggregateOperation<DBObject> operation = new AggregateOperation<DBObject>(getNamespace(), stages, objectCodec)
1372+
.readConcern(getReadConcern())
13631373
.maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS)
13641374
.allowDiskUse(options.getAllowDiskUse())
13651375
.batchSize(options.getBatchSize())
@@ -1416,6 +1426,7 @@ public List<Cursor> parallelScan(final ParallelScanOptions options) {
14161426
ParallelCollectionScanOperation<DBObject> operation = new ParallelCollectionScanOperation<DBObject>(getNamespace(),
14171427
options.getNumCursors(),
14181428
objectCodec)
1429+
.readConcern(getReadConcern())
14191430
.batchSize(options.getBatchSize());
14201431
List<BatchCursor<DBObject>> mongoCursors = executor.execute(operation,
14211432
options.getReadPreference() != null ? options.getReadPreference()
@@ -1871,6 +1882,32 @@ public void setReadPreference(final ReadPreference preference) {
18711882
this.readPreference = preference;
18721883
}
18731884

1885+
/**
1886+
* Sets the read concern for this collection.
1887+
*
1888+
* @param readConcern the read concern to use for this collection
1889+
* @since 3.2
1890+
* @mongodb.server.release 3.2
1891+
*/
1892+
void setReadConcern(final ReadConcern readConcern) {
1893+
this.readConcern = readConcern;
1894+
}
1895+
1896+
/**
1897+
* Get the read concern for this collection.
1898+
*
1899+
* @return the {@link com.mongodb.ReadConcern}
1900+
* @since 3.2
1901+
* @mongodb.server.release 3.2
1902+
*/
1903+
ReadConcern getReadConcern() {
1904+
if (readConcern != null) {
1905+
return readConcern;
1906+
}
1907+
return database.getReadConcern();
1908+
}
1909+
1910+
18741911
/**
18751912
* Makes this query ok to run on a slave node
18761913
*

driver/src/main/com/mongodb/DBCursor.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class DBCursor implements Cursor, Iterable<DBObject> {
6868
private final FindOptions findOptions;
6969
private int options;
7070
private ReadPreference readPreference;
71+
private ReadConcern readConcern;
7172
private Decoder<DBObject> resultDecoder;
7273
private DBDecoderFactory decoderFactory;
7374
private IteratorOrArray iteratorOrArray;
@@ -489,6 +490,7 @@ public DBObject explain() {
489490

490491
private FindOperation<DBObject> getQueryOperation(final FindOptions options, final Decoder<DBObject> decoder) {
491492
FindOperation<DBObject> operation = new FindOperation<DBObject>(collection.getNamespace(), decoder)
493+
.readConcern(getReadConcern())
492494
.filter(collection.wrapAllowNull(filter))
493495
.batchSize(options.getBatchSize())
494496
.skip(options.getSkip())
@@ -665,7 +667,7 @@ public List<DBObject> toArray(final int max) {
665667
* @see DBCursor#size
666668
*/
667669
public int count() {
668-
return (int) collection.getCount(getQuery(), getKeysWanted(), 0, 0, getReadPreferenceForCursor(),
670+
return (int) collection.getCount(getQuery(), 0, 0, getReadPreferenceForCursor(), getReadConcern(),
669671
findOptions.getMaxTime(MILLISECONDS), MILLISECONDS,
670672
collection.wrap(modifiers).get("$hint"));
671673
}
@@ -678,7 +680,7 @@ public int count() {
678680
*/
679681
public DBObject one() {
680682
return collection.findOne(getQuery(), getKeysWanted(), sort,
681-
getReadPreferenceForCursor(), findOptions.getMaxTime(MILLISECONDS), MILLISECONDS);
683+
getReadPreferenceForCursor(), getReadConcern(), findOptions.getMaxTime(MILLISECONDS), MILLISECONDS);
682684
}
683685

684686
/**
@@ -720,8 +722,8 @@ public int itcount() {
720722
* @see #count()
721723
*/
722724
public int size() {
723-
return (int) collection.getCount(getQuery(), getKeysWanted(), findOptions.getLimit(),
724-
findOptions.getSkip(), getReadPreference(),
725+
return (int) collection.getCount(getQuery(), findOptions.getLimit(),
726+
findOptions.getSkip(), getReadPreference(), getReadConcern(),
725727
findOptions.getMaxTime(MILLISECONDS), MILLISECONDS);
726728
}
727729

@@ -781,6 +783,33 @@ public ReadPreference getReadPreference() {
781783
return readPreference;
782784
}
783785

786+
787+
/**
788+
* Sets the read concern for this collection.
789+
*
790+
* @param readConcern the read concern to use for this collection
791+
* @since 3.2
792+
* @mongodb.server.release 3.2
793+
*/
794+
DBCursor setReadConcern(final ReadConcern readConcern) {
795+
this.readConcern = readConcern;
796+
return this;
797+
}
798+
799+
/**
800+
* Get the read concern for this collection.
801+
*
802+
* @return the {@link com.mongodb.ReadConcern}
803+
* @since 3.2
804+
* @mongodb.server.release 3.2
805+
*/
806+
ReadConcern getReadConcern() {
807+
if (readConcern != null) {
808+
return readConcern;
809+
}
810+
return collection.getReadConcern();
811+
}
812+
784813
/**
785814
* Sets the factory that will be used create a {@code DBDecoder} that will be used to decode BSON documents into DBObject instances.
786815
*

0 commit comments

Comments
 (0)