Skip to content

Commit 99c2e19

Browse files
committed
JAVA-2815: Fix read concern regression
In implementing transactions, a regression in the async driver was introduced in sending read concern outside of transactions. This commit fixes the regression and adds an integration test for both the sync and async drivers.
1 parent eaf4292 commit 99c2e19

File tree

8 files changed

+227
-31
lines changed

8 files changed

+227
-31
lines changed

driver-async/src/main/com/mongodb/async/client/OperationExecutorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private AsyncReadWriteBinding getReadWriteBinding(final ReadPreference readPrefe
122122
@Nullable final ClientSession session, final boolean ownsSession) {
123123
notNull("readPreference", readPreference);
124124
AsyncReadWriteBinding readWriteBinding = new AsyncClusterBinding(mongoClient.getCluster(),
125-
getReadPreferenceForBinding(readPreference, session));
125+
getReadPreferenceForBinding(readPreference, session), readConcern);
126126
if (session != null) {
127127
if (!session.hasActiveTransaction() && session.getOptions().getAutoStartTransaction()) {
128128
session.startTransaction();
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.ClusterFixture;
20+
import com.mongodb.ReadConcern;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.async.SingleResultCallback;
23+
import com.mongodb.connection.TestCommandListener;
24+
import com.mongodb.event.CommandEvent;
25+
import com.mongodb.event.CommandStartedEvent;
26+
import org.bson.BsonDocument;
27+
import org.bson.BsonString;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import java.util.Arrays;
33+
import java.util.List;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static com.mongodb.ClusterFixture.isStandalone;
38+
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
39+
import static com.mongodb.async.client.Fixture.getDefaultDatabaseName;
40+
import static com.mongodb.client.CommandMonitoringTestHelper.assertEventsEquality;
41+
import static org.junit.Assume.assumeTrue;
42+
43+
public class ReadConcernTest {
44+
private TestCommandListener commandListener;
45+
private MongoClient mongoClient;
46+
47+
@Before
48+
public void setUp() {
49+
assumeTrue(canRunTests());
50+
commandListener = new TestCommandListener();
51+
mongoClient = MongoClients.create(Fixture.getMongoClientBuilderFromConnectionString()
52+
.addCommandListener(commandListener)
53+
.build());
54+
}
55+
56+
@After
57+
public void tearDown() {
58+
if (mongoClient != null) {
59+
mongoClient.close();
60+
}
61+
}
62+
63+
@Test
64+
public void shouldIncludeReadConcernInCommand() throws InterruptedException {
65+
final CountDownLatch latch = new CountDownLatch(1);
66+
mongoClient.getDatabase(getDefaultDatabaseName()).getCollection("test")
67+
.withReadConcern(ReadConcern.LOCAL).count(new SingleResultCallback<Long>() {
68+
@Override
69+
public void onResult(final Long result, final Throwable t) {
70+
latch.countDown();
71+
}
72+
});
73+
74+
latch.await(ClusterFixture.TIMEOUT, TimeUnit.SECONDS);
75+
76+
List<CommandEvent> events = commandListener.getCommandStartedEvents();
77+
78+
BsonDocument commandDocument = new BsonDocument("count", new BsonString("test"))
79+
.append("readConcern", ReadConcern.LOCAL.asDocument())
80+
.append("query", new BsonDocument());
81+
if (serverVersionAtLeast(3, 6)) {
82+
commandDocument.put("$db", new BsonString(getDefaultDatabaseName()));
83+
}
84+
if (isStandalone() && serverVersionAtLeast(3, 6)) {
85+
commandDocument.put("$readPreference", ReadPreference.primaryPreferred().toDocument());
86+
}
87+
assertEventsEquality(Arrays.<CommandEvent>asList(new CommandStartedEvent(1, null, getDefaultDatabaseName(),
88+
"count", commandDocument)), events,
89+
commandListener.getSessions());
90+
91+
}
92+
93+
private boolean canRunTests() {
94+
return serverVersionAtLeast(3, 2);
95+
}
96+
}

driver-async/src/test/functional/com/mongodb/async/client/TransactionsTest.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.mongodb.connection.SocketSettings;
3232
import com.mongodb.connection.TestCommandListener;
3333
import com.mongodb.event.CommandEvent;
34-
import com.mongodb.event.CommandStartedEvent;
3534
import com.mongodb.lang.Nullable;
3635
import org.bson.BsonArray;
3736
import org.bson.BsonBoolean;
@@ -305,7 +304,7 @@ public void execute() {
305304
// TODO: null operation may cause test failures, since it's used to grab the read preference
306305
// TODO: though read-pref.json doesn't declare expectations, so maybe not
307306
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
308-
List<CommandEvent> events = getCommandStartedEvents();
307+
List<CommandEvent> events = commandListener.getCommandStartedEvents();
309308

310309
assertEventsEquality(expectedEvents, events, commandListener.getSessions());
311310
}
@@ -352,16 +351,6 @@ private ClientSession nonNullClientSession(@Nullable final ClientSession clientS
352351
return clientSession;
353352
}
354353

355-
private List<CommandEvent> getCommandStartedEvents() {
356-
List<CommandEvent> commandStartedEvents = new ArrayList<CommandEvent>();
357-
for (CommandEvent cur : commandListener.getEvents()) {
358-
if (cur instanceof CommandStartedEvent) {
359-
commandStartedEvents.add(cur);
360-
}
361-
}
362-
return commandStartedEvents;
363-
}
364-
365354
@Parameterized.Parameters(name = "{0}: {1}")
366355
public static Collection<Object[]> data() throws URISyntaxException, IOException {
367356
List<Object[]> data = new ArrayList<Object[]>();

driver-core/src/main/com/mongodb/binding/AsyncClusterBinding.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616

1717
package com.mongodb.binding;
1818

19+
import com.mongodb.ReadConcern;
1920
import com.mongodb.ReadPreference;
2021
import com.mongodb.async.SingleResultCallback;
2122
import com.mongodb.connection.AsyncConnection;
2223
import com.mongodb.connection.Cluster;
2324
import com.mongodb.connection.Server;
2425
import com.mongodb.connection.ServerDescription;
25-
import com.mongodb.session.SessionContext;
26-
import com.mongodb.internal.connection.NoOpSessionContext;
26+
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
2727
import com.mongodb.selector.ReadPreferenceServerSelector;
2828
import com.mongodb.selector.ServerSelector;
2929
import com.mongodb.selector.WritableServerSelector;
30+
import com.mongodb.session.SessionContext;
3031

3132
import static com.mongodb.assertions.Assertions.notNull;
3233

@@ -39,16 +40,32 @@
3940
public class AsyncClusterBinding extends AbstractReferenceCounted implements AsyncReadWriteBinding {
4041
private final Cluster cluster;
4142
private final ReadPreference readPreference;
43+
private final ReadConcern readConcern;
4244

4345
/**
4446
* Creates an instance.
4547
*
4648
* @param cluster a non-null Cluster which will be used to select a server to bind to
4749
* @param readPreference a non-null ReadPreference for read operations
50+
* @deprecated Prefer {@link #AsyncClusterBinding(Cluster, ReadPreference, ReadConcern)}
4851
*/
52+
@Deprecated
4953
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference) {
54+
this(cluster, readPreference, ReadConcern.DEFAULT);
55+
}
56+
57+
/**
58+
* Creates an instance.
59+
*
60+
* @param cluster a non-null Cluster which will be used to select a server to bind to
61+
* @param readPreference a non-null ReadPreference for read operations
62+
* @param readConcern a non-null read concern
63+
* @since 3.8
64+
*/
65+
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference, final ReadConcern readConcern) {
5066
this.cluster = notNull("cluster", cluster);
5167
this.readPreference = notNull("readPreference", readPreference);
68+
this.readConcern = (notNull("readConcern", readConcern));
5269
}
5370

5471
@Override
@@ -64,7 +81,7 @@ public ReadPreference getReadPreference() {
6481

6582
@Override
6683
public SessionContext getSessionContext() {
67-
return NoOpSessionContext.INSTANCE;
84+
return new ReadConcernAwareNoOpSessionContext(readConcern);
6885
}
6986

7087
@Override
@@ -106,7 +123,7 @@ public ServerDescription getServerDescription() {
106123

107124
@Override
108125
public SessionContext getSessionContext() {
109-
return NoOpSessionContext.INSTANCE;
126+
return new ReadConcernAwareNoOpSessionContext(readConcern);
110127
}
111128

112129
@Override

driver-core/src/test/functional/com/mongodb/ClusterFixture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public static AsyncSingleConnectionBinding getAsyncSingleConnectionBinding(final
254254
}
255255

256256
public static AsyncReadWriteBinding getAsyncBinding(final Cluster cluster) {
257-
return new AsyncClusterBinding(cluster, ReadPreference.primary());
257+
return new AsyncClusterBinding(cluster, ReadPreference.primary(), ReadConcern.DEFAULT);
258258
}
259259

260260
public static AsyncReadWriteBinding getAsyncBinding() {
@@ -267,7 +267,7 @@ public static AsyncReadWriteBinding getAsyncBinding(final ReadPreference readPre
267267

268268
public static AsyncReadWriteBinding getAsyncBinding(final Cluster cluster, final ReadPreference readPreference) {
269269
if (!asyncBindingMap.containsKey(readPreference)) {
270-
AsyncReadWriteBinding binding = new AsyncClusterBinding(cluster, readPreference);
270+
AsyncReadWriteBinding binding = new AsyncClusterBinding(cluster, readPreference, ReadConcern.DEFAULT);
271271
if (serverVersionAtLeast(3, 6)) {
272272
binding = new AsyncSessionBinding(binding);
273273
}

driver-core/src/test/functional/com/mongodb/connection/TestCommandListener.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ public List<CommandEvent> getEvents() {
7171
return events;
7272
}
7373

74+
public List<CommandEvent> getCommandStartedEvents() {
75+
List<CommandEvent> commandStartedEvents = new ArrayList<CommandEvent>();
76+
for (CommandEvent cur : getEvents()) {
77+
if (cur instanceof CommandStartedEvent) {
78+
commandStartedEvents.add(cur);
79+
}
80+
}
81+
return commandStartedEvents;
82+
}
83+
84+
7485
@Override
7586
public void commandStarted(final CommandStartedEvent event) {
7687
events.add(new CommandStartedEvent(event.getRequestId(), event.getConnectionDescription(), event.getDatabaseName(),
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client;
18+
19+
import com.mongodb.Block;
20+
import com.mongodb.ReadConcern;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.connection.SocketSettings;
23+
import com.mongodb.connection.TestCommandListener;
24+
import com.mongodb.event.CommandEvent;
25+
import com.mongodb.event.CommandStartedEvent;
26+
import org.bson.BsonDocument;
27+
import org.bson.BsonString;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import java.util.Arrays;
33+
import java.util.List;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static com.mongodb.ClusterFixture.isStandalone;
37+
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
38+
import static com.mongodb.client.CommandMonitoringTestHelper.assertEventsEquality;
39+
import static com.mongodb.client.Fixture.getDefaultDatabaseName;
40+
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
41+
import static org.junit.Assume.assumeTrue;
42+
43+
public class ReadConcernTest {
44+
private MongoClient mongoClient;
45+
private TestCommandListener commandListener;
46+
47+
@Before
48+
public void setUp() {
49+
assumeTrue(canRunTests());
50+
51+
commandListener = new TestCommandListener();
52+
mongoClient = MongoClients.create(getMongoClientSettingsBuilder()
53+
.addCommandListener(commandListener)
54+
.applyToSocketSettings(new Block<SocketSettings.Builder>() {
55+
@Override
56+
public void apply(final SocketSettings.Builder builder) {
57+
builder.readTimeout(5, TimeUnit.SECONDS);
58+
}
59+
})
60+
.build());
61+
}
62+
63+
@After
64+
public void cleanUp() {
65+
if (mongoClient != null) {
66+
mongoClient.close();
67+
}
68+
}
69+
70+
@Test
71+
public void shouldIncludeReadConcernInCommand() {
72+
mongoClient.getDatabase(getDefaultDatabaseName()).getCollection("test")
73+
.withReadConcern(ReadConcern.LOCAL).count();
74+
75+
List<CommandEvent> events = commandListener.getCommandStartedEvents();
76+
77+
BsonDocument commandDocument = new BsonDocument("count", new BsonString("test"))
78+
.append("readConcern", ReadConcern.LOCAL.asDocument())
79+
.append("query", new BsonDocument());
80+
if (serverVersionAtLeast(3, 6)) {
81+
commandDocument.put("$db", new BsonString(getDefaultDatabaseName()));
82+
}
83+
if (isStandalone() && serverVersionAtLeast(3, 6)) {
84+
commandDocument.put("$readPreference", ReadPreference.primaryPreferred().toDocument());
85+
}
86+
assertEventsEquality(Arrays.<CommandEvent>asList(new CommandStartedEvent(1, null, getDefaultDatabaseName(),
87+
"count", commandDocument)), events,
88+
commandListener.getSessions());
89+
}
90+
91+
private boolean canRunTests() {
92+
return serverVersionAtLeast(3, 2);
93+
}
94+
}

driver-sync/src/test/functional/com/mongodb/client/TransactionsTest.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.mongodb.connection.SocketSettings;
3232
import com.mongodb.connection.TestCommandListener;
3333
import com.mongodb.event.CommandEvent;
34-
import com.mongodb.event.CommandStartedEvent;
3534
import com.mongodb.lang.Nullable;
3635
import org.bson.BsonArray;
3736
import org.bson.BsonBoolean;
@@ -292,7 +291,7 @@ public void shouldPassAllOutcomes() {
292291
// TODO: null operation may cause test failures, since it's used to grab the read preference
293292
// TODO: though read-pref.json doesn't declare expectations, so maybe not
294293
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
295-
List<CommandEvent> events = getCommandStartedEvents();
294+
List<CommandEvent> events = commandListener.getCommandStartedEvents();
296295

297296
assertEventsEquality(expectedEvents, events, commandListener.getSessions());
298297
}
@@ -339,16 +338,6 @@ private ClientSession nonNullClientSession(@Nullable final ClientSession clientS
339338
return clientSession;
340339
}
341340

342-
private List<CommandEvent> getCommandStartedEvents() {
343-
List<CommandEvent> commandStartedEvents = new ArrayList<CommandEvent>();
344-
for (CommandEvent cur : commandListener.getEvents()) {
345-
if (cur instanceof CommandStartedEvent) {
346-
commandStartedEvents.add(cur);
347-
}
348-
}
349-
return commandStartedEvents;
350-
}
351-
352341
@Parameterized.Parameters(name = "{0}: {1}")
353342
public static Collection<Object[]> data() throws URISyntaxException, IOException {
354343
List<Object[]> data = new ArrayList<Object[]>();

0 commit comments

Comments
 (0)