Skip to content

Commit cb5e586

Browse files
committed
JAVA-2737: Always set slaveOk for direct connections to replica set members
* for OP_QUERY (for MongoDB versions < 3.6), that means settings the slaveOk wire protocol bit. * for OP_MSG (for MongoDB versions >= 3.6), that means including a $readPreference field with mode of "primaryPreferred".
1 parent 1b5b59f commit cb5e586

File tree

8 files changed

+195
-50
lines changed

8 files changed

+195
-50
lines changed

driver-core/src/main/com/mongodb/connection/CommandMessage.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@
3838
import java.util.Map;
3939

4040
import static com.mongodb.ReadPreference.primary;
41+
import static com.mongodb.ReadPreference.primaryPreferred;
4142
import static com.mongodb.assertions.Assertions.isTrue;
4243
import static com.mongodb.connection.BsonWriterHelper.writePayload;
44+
import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE;
45+
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
46+
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
4347

4448
/**
4549
* A command message that uses OP_MSG or OP_QUERY to send the command.
@@ -52,15 +56,18 @@ final class CommandMessage extends RequestMessage {
5256
private final SplittablePayload payload;
5357
private final FieldNameValidator payloadFieldNameValidator;
5458
private final boolean responseExpected;
59+
private final ClusterConnectionMode clusterConnectionMode;
5560

5661
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
5762
final ReadPreference readPreference, final MessageSettings settings) {
58-
this(namespace, command, commandFieldNameValidator, readPreference, settings, true, null, null);
63+
this(namespace, command, commandFieldNameValidator, readPreference, settings, true, null, null,
64+
MULTIPLE);
5965
}
6066

6167
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
6268
final ReadPreference readPreference, final MessageSettings settings, final boolean responseExpected,
63-
final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator) {
69+
final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator,
70+
final ClusterConnectionMode clusterConnectionMode) {
6471
super(namespace.getFullName(), getOpCode(settings), settings);
6572
this.namespace = namespace;
6673
this.command = command;
@@ -69,6 +76,7 @@ final class CommandMessage extends RequestMessage {
6976
this.responseExpected = responseExpected;
7077
this.payload = payload;
7178
this.payloadFieldNameValidator = payloadFieldNameValidator;
79+
this.clusterConnectionMode = clusterConnectionMode;
7280
}
7381

7482
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
@@ -102,7 +110,7 @@ boolean containsPayload() {
102110

103111
boolean isResponseExpected() {
104112
isTrue("The message must be encoded before determining if a response is expected", getEncodingMetadata() != null);
105-
return calculateIsResponseExpected();
113+
return !useOpMsg() || requireOpMsgResponse();
106114
}
107115

108116
ReadPreference getReadPreference() {
@@ -134,9 +142,9 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
134142
}
135143

136144
// Write the flag bits
137-
bsonOutput.writeInt32(flagPosition, getFlagBits());
145+
bsonOutput.writeInt32(flagPosition, getOpMsgFlagBits());
138146
} else {
139-
bsonOutput.writeInt32(0);
147+
bsonOutput.writeInt32(getOpQueryFlagBits());
140148
bsonOutput.writeCString(namespace.getFullName());
141149
bsonOutput.writeInt32(0);
142150
bsonOutput.writeInt32(-1);
@@ -167,20 +175,44 @@ private void addDocumentWithPayload(final BsonOutput bsonOutput, final int messa
167175
getCodec(commandToEncode).encode(bsonWriter, commandToEncode, EncoderContext.builder().build());
168176
}
169177

170-
private int getFlagBits() {
171-
if (calculateIsResponseExpected()) {
178+
private int getOpMsgFlagBits() {
179+
return getOpMsgResponseExpectedFlagBit();
180+
}
181+
182+
private int getOpMsgResponseExpectedFlagBit() {
183+
if (requireOpMsgResponse()) {
172184
return 0;
173185
} else {
174186
return 1 << 1;
175187
}
176188
}
177189

178-
private boolean calculateIsResponseExpected() {
179-
// If there is another message in the payload require that the response is acknowledged
180-
if (!responseExpected && useOpMsg() && payload != null && payload.hasAnotherSplit()) {
190+
private boolean requireOpMsgResponse() {
191+
if (responseExpected) {
181192
return true;
193+
} else {
194+
return payload != null && payload.hasAnotherSplit();
182195
}
183-
return responseExpected;
196+
}
197+
198+
private int getOpQueryFlagBits() {
199+
return getOpQuerySlaveOkFlagBit();
200+
}
201+
202+
private int getOpQuerySlaveOkFlagBit() {
203+
if (isSlaveOk()) {
204+
return 1 << 2;
205+
} else {
206+
return 0;
207+
}
208+
}
209+
210+
private boolean isSlaveOk() {
211+
return readPreference.isSlaveOk() || isDirectConnectionToNonShardRouter();
212+
}
213+
214+
private boolean isDirectConnectionToNonShardRouter() {
215+
return clusterConnectionMode == SINGLE && getSettings().getServerType() != SHARD_ROUTER;
184216
}
185217

186218
private boolean useOpMsg() {
@@ -206,10 +238,13 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
206238
}
207239
if (!isDefaultReadPreference(getReadPreference())) {
208240
extraElements.add(new BsonElement("$readPreference", getReadPreference().toDocument()));
241+
} else if (isDirectConnectionToNonShardRouter()) {
242+
extraElements.add(new BsonElement("$readPreference", primaryPreferred().toDocument()));
209243
}
210244
return extraElements;
211245
}
212246

247+
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
213248
private boolean isDefaultReadPreference(final ReadPreference readPreference) {
214249
return readPreference.equals(primary());
215250
}

driver-core/src/main/com/mongodb/connection/CommandProtocolImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,19 @@ class CommandProtocolImpl<T> implements CommandProtocol<T> {
4141
private final FieldNameValidator payloadFieldNameValidator;
4242
private final Decoder<T> commandResultDecoder;
4343
private final boolean responseExpected;
44+
private final ClusterConnectionMode clusterConnectionMode;
4445
private SessionContext sessionContext;
4546

4647
CommandProtocolImpl(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
4748
final ReadPreference readPreference, final Decoder<T> commandResultDecoder) {
48-
this(database, command, commandFieldNameValidator, readPreference, commandResultDecoder, true, null, null);
49+
this(database, command, commandFieldNameValidator, readPreference, commandResultDecoder, true, null, null,
50+
ClusterConnectionMode.MULTIPLE);
4951
}
5052

5153
CommandProtocolImpl(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
5254
final ReadPreference readPreference, final Decoder<T> commandResultDecoder, final boolean responseExpected,
53-
final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator) {
55+
final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator,
56+
final ClusterConnectionMode clusterConnectionMode) {
5457
notNull("database", database);
5558
this.namespace = new MongoNamespace(notNull("database", database), MongoNamespace.COMMAND_COLLECTION_NAME);
5659
this.command = notNull("command", command);
@@ -60,6 +63,7 @@ class CommandProtocolImpl<T> implements CommandProtocol<T> {
6063
this.responseExpected = responseExpected;
6164
this.payload = payload;
6265
this.payloadFieldNameValidator = payloadFieldNameValidator;
66+
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
6367

6468
isTrueArgument("payloadFieldNameValidator cannot be null if there is a payload.",
6569
payload == null || payloadFieldNameValidator != null);
@@ -112,7 +116,8 @@ public CommandProtocolImpl<T> sessionContext(final SessionContext sessionContext
112116

113117
private CommandMessage getCommandMessage(final InternalConnection connection) {
114118
return new CommandMessage(namespace, command, commandFieldNameValidator, readPreference,
115-
getMessageSettings(connection.getDescription()), responseExpected, payload, payloadFieldNameValidator);
119+
getMessageSettings(connection.getDescription()), responseExpected, payload,
120+
payloadFieldNameValidator, clusterConnectionMode);
116121
}
117122

118123
private String getCommandName() {

driver-core/src/main/com/mongodb/connection/DefaultServerConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public <T> T command(final String database, final BsonDocument command, final Fi
124124
final boolean responseExpected, final SplittablePayload payload,
125125
final FieldNameValidator payloadFieldNameValidator) {
126126
return executeProtocol(new CommandProtocolImpl<T>(database, command, commandFieldNameValidator, readPreference,
127-
commandResultDecoder, responseExpected, payload, payloadFieldNameValidator), sessionContext);
127+
commandResultDecoder, responseExpected, payload, payloadFieldNameValidator, clusterConnectionMode), sessionContext);
128128
}
129129

130130
@Override
@@ -149,7 +149,8 @@ public <T> void commandAsync(final String database, final BsonDocument command,
149149
final SessionContext sessionContext, final boolean responseExpected, final SplittablePayload payload,
150150
final FieldNameValidator payloadFieldNameValidator, final SingleResultCallback<T> callback) {
151151
executeProtocolAsync(new CommandProtocolImpl<T>(database, command, commandFieldNameValidator, readPreference,
152-
commandResultDecoder, responseExpected, payload, payloadFieldNameValidator), sessionContext, callback);
152+
commandResultDecoder, responseExpected, payload, payloadFieldNameValidator, clusterConnectionMode),
153+
sessionContext, callback);
153154
}
154155

155156
@Override

driver-core/src/main/com/mongodb/connection/MessageSettings.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ final class MessageSettings {
3434
private final int maxMessageSize;
3535
private final int maxBatchCount;
3636
private final ServerVersion serverVersion;
37+
private final ServerType serverType;
3738

3839
/**
3940
* Gets the builder
@@ -53,6 +54,7 @@ public static final class Builder {
5354
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
5455
private int maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
5556
private ServerVersion serverVersion;
57+
private ServerType serverType;
5658

5759
/**
5860
* Build it.
@@ -100,6 +102,11 @@ public Builder serverVersion(final ServerVersion serverVersion) {
100102
this.serverVersion = serverVersion;
101103
return this;
102104
}
105+
106+
public Builder serverType(final ServerType serverType) {
107+
this.serverType = serverType;
108+
return this;
109+
}
103110
}
104111

105112
/**
@@ -133,10 +140,15 @@ public ServerVersion getServerVersion() {
133140
return serverVersion;
134141
}
135142

143+
public ServerType getServerType() {
144+
return serverType;
145+
}
146+
136147
private MessageSettings(final Builder builder) {
137148
this.maxDocumentSize = builder.maxDocumentSize;
138149
this.maxMessageSize = builder.maxMessageSize;
139150
this.maxBatchCount = builder.maxBatchCount;
140151
this.serverVersion = builder.serverVersion;
152+
this.serverType = builder.serverType;
141153
}
142154
}

driver-core/src/main/com/mongodb/connection/ProtocolHelper.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,12 @@ static MongoException getQueryFailureException(final BsonDocument errorDocument,
182182

183183
static MessageSettings getMessageSettings(final ConnectionDescription connectionDescription) {
184184
return MessageSettings.builder()
185-
.maxDocumentSize(connectionDescription.getMaxDocumentSize())
186-
.maxMessageSize(connectionDescription.getMaxMessageSize())
187-
.maxBatchCount(connectionDescription.getMaxBatchCount())
188-
.serverVersion(connectionDescription.getServerVersion())
189-
.build();
185+
.maxDocumentSize(connectionDescription.getMaxDocumentSize())
186+
.maxMessageSize(connectionDescription.getMaxMessageSize())
187+
.maxBatchCount(connectionDescription.getMaxBatchCount())
188+
.serverVersion(connectionDescription.getServerVersion())
189+
.serverType(connectionDescription.getServerType())
190+
.build();
190191
}
191192

192193
static void encodeMessage(final RequestMessage message, final BsonOutput bsonOutput) {

driver-core/src/main/com/mongodb/connection/ReplyHeader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ final class ReplyHeader {
5050
private final long cursorId;
5151
private final int startingFrom;
5252
private final int numberReturned;
53+
private final int opMsgFlagBits;
5354

5455
ReplyHeader(final ByteBuf header, final MessageHeader messageHeader) {
5556
this(messageHeader.getMessageLength(), messageHeader.getOpCode(), messageHeader, header);
@@ -70,8 +71,8 @@ private ReplyHeader(final int messageLength, final int opCode, final MessageHead
7071
startingFrom = 0;
7172
numberReturned = 1;
7273

73-
header.getInt(); // ignore flag bits
74-
header.get(); // ignore payload type
74+
opMsgFlagBits = header.getInt();
75+
header.get(); // ignore payload type
7576
} else if (opCode == OP_REPLY.getValue()) {
7677
if (messageLength < TOTAL_REPLY_HEADER_LENGTH) {
7778
throw new MongoInternalException(format("The reply message length %d is less than the mimimum message length %d",
@@ -82,6 +83,7 @@ private ReplyHeader(final int messageLength, final int opCode, final MessageHead
8283
cursorId = header.getLong();
8384
startingFrom = header.getInt();
8485
numberReturned = header.getInt();
86+
opMsgFlagBits = 0;
8587

8688
if (numberReturned < 0) {
8789
throw new MongoInternalException(format("The reply message number of returned documents, %d, is less than 0",
@@ -187,4 +189,9 @@ public boolean isCursorNotFound() {
187189
public boolean isQueryFailure() {
188190
return (responseFlags & QUERY_FAILURE_RESPONSE_FLAG) == QUERY_FAILURE_RESPONSE_FLAG;
189191
}
192+
193+
// for unit testing
194+
int getOpMsgFlagBits() {
195+
return opMsgFlagBits;
196+
}
190197
}

0 commit comments

Comments
 (0)