Skip to content

Commit b295d2a

Browse files
committed
JAVA-2227: When using aggregate helper methods that take an AggregationOptions parameter, ensure that slaveOk bit is set for non-primary read preferences
1 parent 28d1dcc commit b295d2a

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

src/main/com/mongodb/DBCollectionImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public Cursor aggregate(final List<DBObject> pipeline, final AggregationOptions
9797

9898
DBObject last = pipeline.get(pipeline.size() - 1);
9999
String outCollection = (String) last.get("$out");
100-
ReadPreference appliedReadPreference = outCollection != null ? ReadPreference.primary() : readPreference;
100+
final ReadPreference appliedReadPreference = outCollection != null ? ReadPreference.primary() : readPreference;
101101

102102
final DBPort port = db.getConnector().getPort(appliedReadPreference);
103103
try {
@@ -106,7 +106,8 @@ public Cursor aggregate(final List<DBObject> pipeline, final AggregationOptions
106106
CommandResult res = db.getConnector().doOperation(db, port, new DBPort.Operation<CommandResult>() {
107107
@Override
108108
public CommandResult execute() throws IOException {
109-
return port.runCommand(db, command, db.getMongo().getMaxBsonObjectSize() + QUERY_DOCUMENT_HEADROOM);
109+
return port.runCommand(db, command, appliedReadPreference,
110+
db.getMongo().getMaxBsonObjectSize() + QUERY_DOCUMENT_HEADROOM);
110111
}
111112
});
112113

src/main/com/mongodb/DBPort.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,14 @@ synchronized CommandResult runCommand( DB db , DBObject cmd ) throws IOException
211211
}
212212

213213
synchronized CommandResult runCommand(final DB db, final DBObject cmd, final int maxBsonObjectSize) throws IOException {
214+
return runCommand(db, cmd, null, maxBsonObjectSize);
215+
}
216+
217+
synchronized CommandResult runCommand(final DB db, final DBObject cmd, final ReadPreference readPreference,
218+
final int maxBsonObjectSize) throws IOException {
214219
isTrue("open", !closed);
215-
OutMessage msg = OutMessage.query( db.getCollection("$cmd") , 0 , 0 , -1 , cmd , null, maxBsonObjectSize);
220+
OutMessage msg = OutMessage.query( db.getCollection("$cmd") , 0 , 0 , -1 , cmd, null, readPreference,
221+
DefaultDBEncoder.FACTORY.create(), maxBsonObjectSize);
216222
try {
217223
return convertToCommandResult(cmd, call(msg, db.getCollection("$cmd"), null));
218224
} finally {

src/test/com/mongodb/AggregationTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.mongodb.AggregationOptions.OutputMode;
2020
import com.mongodb.util.TestCase;
2121
import org.junit.Before;
22-
import org.junit.Ignore;
2322
import org.junit.Test;
2423

2524
import java.net.UnknownHostException;
@@ -188,7 +187,7 @@ public void testDollarOutOnSecondary() throws UnknownHostException {
188187
checkServerVersion(2.6);
189188
assumeTrue(isReplicaSet(cleanupMongo));
190189

191-
ServerAddress primary = new ServerAddress("localhost");
190+
ServerAddress primary = new ServerAddress(getPrimaryAsString(cleanupMongo));
192191
MongoClient rsClient = new MongoClient(getMongoClientURI());
193192
DB rsDatabase = rsClient.getDB(database.getName());
194193
DBCollection aggCollection = rsDatabase.getCollection(collection.getName());
@@ -205,25 +204,27 @@ public void testDollarOutOnSecondary() throws UnknownHostException {
205204
}
206205

207206
@Test
208-
@Ignore
209207
public void testAggregateOnSecondary() throws UnknownHostException {
210208
checkServerVersion(2.6);
211209
assumeTrue(isReplicaSet(cleanupMongo));
212210

213-
ServerAddress primary = new ServerAddress("localhost");
214-
ServerAddress secondary = new ServerAddress("localhost", 27018);
211+
ServerAddress primary = new ServerAddress(getPrimaryAsString(cleanupMongo));
212+
ServerAddress secondary = new ServerAddress(getASecondaryAsString(cleanupMongo));
215213
MongoClient rsClient = new MongoClient(asList(primary, secondary));
216214
DB rsDatabase = rsClient.getDB(database.getName());
217215
rsDatabase.dropDatabase();
218216
DBCollection aggCollection = rsDatabase.getCollection(collection.getName());
219217
aggCollection.drop();
220218

221-
final List<DBObject> pipeline = new ArrayList<DBObject>(prepareData());
219+
List<DBObject> pipeline = new ArrayList<DBObject>(prepareData());
222220
AggregationOptions options = AggregationOptions.builder()
223221
.outputMode(OutputMode.INLINE)
224222
.build();
225-
Cursor cursor = verify(pipeline, options, ReadPreference.secondary(), aggCollection);
223+
Cursor cursor = aggCollection.aggregate(pipeline, options, ReadPreference.secondary());
226224
assertNotEquals(primary, cursor.getServerAddress());
225+
226+
AggregationOutput aggregationOutput = aggCollection.aggregate(pipeline, ReadPreference.secondary());
227+
assertNotEquals(primary, aggregationOutput.getServerUsed());
227228
}
228229

229230
@Test

0 commit comments

Comments
 (0)