Skip to content

Commit 0b95a2f

Browse files
committed
Use the AsyncReadBinding's readPreference with the ReadConnectionSource
JAVA-1954 JAVA-1944
1 parent f96a436 commit 0b95a2f

File tree

2 files changed

+135
-2
lines changed

2 files changed

+135
-2
lines changed

driver-core/src/main/com/mongodb/operation/CommandOperationHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ static <D, T> void executeWrappedCommandProtocolAsync(final String database, fin
212212
final Function<D, T> transformer,
213213
final SingleResultCallback<T> callback) {
214214
binding.getReadConnectionSource(new CommandProtocolExecutingCallback<D, T>(database, command, new NoOpFieldNameValidator(),
215-
decoder, primary(), transformer,
215+
decoder, binding.getReadPreference(), transformer,
216216
errorHandlingCallback(callback)));
217217
}
218218

driver-core/src/test/unit/com/mongodb/operation/CommandOperationHelperSpecification.groovy

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,31 @@
1515
*/
1616

1717
package com.mongodb.operation
18-
18+
import com.mongodb.Function
1919
import com.mongodb.MongoCommandException
20+
import com.mongodb.ReadPreference
2021
import com.mongodb.ServerAddress
22+
import com.mongodb.async.SingleResultCallback
23+
import com.mongodb.binding.AsyncConnectionSource
24+
import com.mongodb.binding.AsyncReadBinding
25+
import com.mongodb.binding.AsyncWriteBinding
26+
import com.mongodb.binding.ConnectionSource
27+
import com.mongodb.binding.ReadBinding
28+
import com.mongodb.binding.WriteBinding
29+
import com.mongodb.connection.AsyncConnection
30+
import com.mongodb.connection.ClusterId
31+
import com.mongodb.connection.Connection
32+
import com.mongodb.connection.ConnectionDescription
33+
import com.mongodb.connection.ServerId
2134
import org.bson.BsonBoolean
2235
import org.bson.BsonDocument
2336
import org.bson.BsonInt32
2437
import org.bson.BsonString
38+
import org.bson.codecs.Decoder
2539
import spock.lang.Specification
2640

41+
import static com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol
42+
import static com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocolAsync
2743
import static com.mongodb.operation.CommandOperationHelper.isNamespaceError
2844
import static com.mongodb.operation.CommandOperationHelper.rethrowIfNotNamespaceError
2945

@@ -89,4 +105,121 @@ class CommandOperationHelperSpecification extends Specification {
89105
.append('code', new BsonInt32(26)),
90106
new ServerAddress()), 'some value') == 'some value'
91107
}
108+
109+
def 'should set slaveOK to false when using WriteBinding'() {
110+
given:
111+
def dbName = "db"
112+
def command = new BsonDocument()
113+
def decoder = Stub(Decoder)
114+
def writeBinding = Mock(WriteBinding)
115+
def function = Mock(Function)
116+
def connectionSource = Mock(ConnectionSource)
117+
def connection = Mock(Connection)
118+
def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId("cluster"), new ServerAddress("localhost")))
119+
120+
when:
121+
executeWrappedCommandProtocol(dbName, command, decoder, writeBinding, function)
122+
123+
then:
124+
1 * writeBinding.getWriteConnectionSource() >> connectionSource
125+
126+
then:
127+
1 * connectionSource.getConnection() >> connection
128+
1 * connection.getDescription() >> connectionDescription
129+
130+
then:
131+
1 * connection.command(dbName, command, false, _, decoder)
132+
133+
then:
134+
1 * connection.release()
135+
1 * function.apply(_)
136+
1 * connectionSource.release()
137+
}
138+
139+
def 'should use the ReadBindings readPreference to set slaveOK'() {
140+
given:
141+
def dbName = "db"
142+
def command = new BsonDocument()
143+
def decoder = Stub(Decoder)
144+
def readBinding = Mock(ReadBinding)
145+
def readPreference = Mock(ReadPreference)
146+
def function = Mock(Function)
147+
def connectionSource = Mock(ConnectionSource)
148+
def connection = Mock(Connection)
149+
def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId("cluster"), new ServerAddress("localhost")))
150+
151+
when:
152+
executeWrappedCommandProtocol(dbName, command, decoder, readBinding, function)
153+
154+
then:
155+
1 * readBinding.getReadConnectionSource() >> connectionSource
156+
1 * readBinding.getReadPreference() >> readPreference
157+
158+
then:
159+
1 * connectionSource.getConnection() >> connection
160+
1 * connection.getDescription() >> connectionDescription
161+
162+
then:
163+
1 * readPreference.slaveOk >> true
164+
1 * connection.command(dbName, command, true, _, decoder)
165+
166+
then:
167+
1 * connection.release()
168+
1 * function.apply(_)
169+
1 * connectionSource.release()
170+
}
171+
172+
def 'should set slaveOK to false when using AsyncWriteBinding'() {
173+
given:
174+
def dbName = "db"
175+
def command = new BsonDocument()
176+
def decoder = Stub(Decoder)
177+
def asyncWriteBinding = Mock(AsyncWriteBinding)
178+
def function = Mock(Function)
179+
def callback = Stub(SingleResultCallback)
180+
def connectionSource = Mock(AsyncConnectionSource)
181+
def connection = Mock(AsyncConnection)
182+
def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId("cluster"), new ServerAddress("localhost")))
183+
184+
when:
185+
executeWrappedCommandProtocolAsync(dbName, command, decoder, asyncWriteBinding, function, callback)
186+
187+
then:
188+
1 * asyncWriteBinding.getWriteConnectionSource(_) >> { it[0].onResult(connectionSource, null) }
189+
1 * connectionSource.getConnection(_) >> { it[0].onResult(connection, null)}
190+
1 * connection.getDescription() >> connectionDescription
191+
1 * connection.commandAsync(dbName, command, false, _, decoder, _) >> { it[5].onResult(1, null)}
192+
1 * connection.release()
193+
1 * connectionSource.release()
194+
}
195+
196+
def 'should use the AsyncReadBindings readPreference to set slaveOK'() {
197+
given:
198+
def dbName = "db"
199+
def command = new BsonDocument()
200+
def decoder = Stub(Decoder)
201+
def asyncReadBinding = Mock(AsyncReadBinding)
202+
def readPreference = Mock(ReadPreference)
203+
def function = Mock(Function)
204+
def callback = Stub(SingleResultCallback)
205+
def connectionSource = Mock(AsyncConnectionSource)
206+
def connection = Mock(AsyncConnection)
207+
def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId("cluster"), new ServerAddress("localhost")))
208+
209+
when:
210+
executeWrappedCommandProtocolAsync(dbName, command, decoder, asyncReadBinding, function, callback)
211+
212+
then:
213+
1 * asyncReadBinding.getReadPreference() >> readPreference
214+
215+
then:
216+
1 * asyncReadBinding.getReadConnectionSource(_) >> { it[0].onResult(connectionSource, null) }
217+
1 * connectionSource.getConnection(_) >> { it[0].onResult(connection, null)}
218+
1 * connection.getDescription() >> connectionDescription
219+
1 * readPreference.slaveOk >> true
220+
1 * connection.commandAsync(dbName, command, true, _, decoder, _) >> { it[5].onResult(1, null)}
221+
1 * connection.release()
222+
1 * connectionSource.release()
223+
}
224+
92225
}

0 commit comments

Comments
 (0)