Skip to content

Commit 6f49b5c

Browse files
authored
Update server selection logic for $out/$merge aggregation pipelines (#818)
JAVA-4380
1 parent 5f2aa6c commit 6f49b5c

File tree

2 files changed

+123
-7
lines changed

2 files changed

+123
-7
lines changed

driver-core/src/main/com/mongodb/internal/selector/ReadPreferenceWithFallbackServerSelector.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.ReadPreference;
2020
import com.mongodb.connection.ClusterDescription;
21+
import com.mongodb.connection.ServerConnectionState;
2122
import com.mongodb.connection.ServerDescription;
2223
import com.mongodb.selector.ServerSelector;
2324

@@ -40,22 +41,22 @@ public ReadPreferenceWithFallbackServerSelector(final ReadPreference preferredRe
4041

4142
@Override
4243
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
43-
if (maxWireVersion(clusterDescription) >= minWireVersion) {
44-
appliedReadPreference = preferredReadPreference;
45-
return new ReadPreferenceServerSelector(preferredReadPreference).select(clusterDescription);
46-
} else {
44+
if (clusterContainsOlderServers(clusterDescription)) {
4745
appliedReadPreference = fallbackReadPreference;
4846
return new ReadPreferenceServerSelector(fallbackReadPreference).select(clusterDescription);
47+
} else {
48+
appliedReadPreference = preferredReadPreference;
49+
return new ReadPreferenceServerSelector(preferredReadPreference).select(clusterDescription);
4950
}
5051
}
5152

5253
public ReadPreference getAppliedReadPreference() {
5354
return appliedReadPreference;
5455
}
5556

56-
private int maxWireVersion(final ClusterDescription clusterDescription) {
57+
private boolean clusterContainsOlderServers(final ClusterDescription clusterDescription) {
5758
return clusterDescription.getServerDescriptions().stream()
58-
.map(ServerDescription::getMaxWireVersion)
59-
.max(Integer::compareTo).orElse(0);
59+
.filter(serverDescription -> serverDescription.getState() == ServerConnectionState.CONNECTED)
60+
.anyMatch(serverDescription -> serverDescription.getMaxWireVersion() < minWireVersion);
6061
}
6162
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.selector;
18+
19+
import com.mongodb.ReadPreference;
20+
import com.mongodb.ServerAddress;
21+
import com.mongodb.connection.ClusterConnectionMode;
22+
import com.mongodb.connection.ClusterDescription;
23+
import com.mongodb.connection.ClusterType;
24+
import com.mongodb.connection.ServerDescription;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.List;
28+
29+
import static com.mongodb.connection.ServerConnectionState.CONNECTED;
30+
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
31+
import static com.mongodb.connection.ServerDescription.builder;
32+
import static com.mongodb.connection.ServerType.REPLICA_SET_PRIMARY;
33+
import static com.mongodb.connection.ServerType.REPLICA_SET_SECONDARY;
34+
import static com.mongodb.internal.operation.ServerVersionHelper.FIVE_DOT_ZERO_WIRE_VERSION;
35+
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_FOUR_WIRE_VERSION;
36+
import static java.util.Arrays.asList;
37+
import static java.util.Collections.emptyList;
38+
import static java.util.stream.Collectors.toList;
39+
import static org.junit.jupiter.api.Assertions.assertEquals;
40+
41+
public class ReadPreferenceWithFallbackServerSelectorTest {
42+
43+
@Test
44+
public void shouldSelectCorrectServersWhenAtLeastOneServerIsOlderThanMinimum() {
45+
ReadPreferenceWithFallbackServerSelector selector =
46+
new ReadPreferenceWithFallbackServerSelector(
47+
ReadPreference.secondary(), FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary());
48+
49+
ClusterDescription clusterDescription = new ClusterDescription(
50+
ClusterConnectionMode.MULTIPLE,
51+
ClusterType.REPLICA_SET,
52+
asList(
53+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_PRIMARY).address(new ServerAddress("localhost:27017"))
54+
.maxWireVersion(FOUR_DOT_FOUR_WIRE_VERSION).build(),
55+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_SECONDARY).address(new ServerAddress("localhost:27018"))
56+
.maxWireVersion(FIVE_DOT_ZERO_WIRE_VERSION).build()));
57+
assertEquals(clusterDescription.getServerDescriptions().stream()
58+
.filter(serverDescription -> serverDescription.getType() == REPLICA_SET_PRIMARY).collect(toList()),
59+
selector.select(clusterDescription));
60+
assertEquals(ReadPreference.primary(), selector.getAppliedReadPreference());
61+
}
62+
63+
@Test
64+
public void shouldSelectCorrectServersWhenAllServersAreAtLeastMinimum() {
65+
ReadPreferenceWithFallbackServerSelector selector =
66+
new ReadPreferenceWithFallbackServerSelector(
67+
ReadPreference.secondary(), FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary());
68+
69+
ClusterDescription clusterDescription = new ClusterDescription(
70+
ClusterConnectionMode.MULTIPLE,
71+
ClusterType.REPLICA_SET,
72+
asList(
73+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_PRIMARY).address(new ServerAddress("localhost:27017"))
74+
.maxWireVersion(FIVE_DOT_ZERO_WIRE_VERSION).build(),
75+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_SECONDARY).address(new ServerAddress("localhost:27018"))
76+
.maxWireVersion(FIVE_DOT_ZERO_WIRE_VERSION).build()));
77+
assertEquals(clusterDescription.getServerDescriptions().stream()
78+
.filter(serverDescription -> serverDescription.getType() == REPLICA_SET_SECONDARY).collect(toList()),
79+
selector.select(clusterDescription));
80+
assertEquals(ReadPreference.secondary(), selector.getAppliedReadPreference());
81+
}
82+
83+
@Test
84+
public void shouldSelectCorrectServersWhenNoServersHaveBeenDiscovered() {
85+
ReadPreferenceWithFallbackServerSelector selector =
86+
new ReadPreferenceWithFallbackServerSelector(
87+
ReadPreference.secondary(), FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary());
88+
89+
ClusterDescription clusterDescription = new ClusterDescription(
90+
ClusterConnectionMode.MULTIPLE,
91+
ClusterType.REPLICA_SET,
92+
asList(builder().ok(false).state(CONNECTING).address(new ServerAddress("localhost:27017")).build(),
93+
builder().ok(false).state(CONNECTING).address(new ServerAddress("localhost:27018")).build()));
94+
assertEquals(emptyList(), selector.select(clusterDescription));
95+
assertEquals(ReadPreference.secondary(), selector.getAppliedReadPreference());
96+
97+
// when there is one connecting server, and a primary and secondary with maxWireVersion >= minWireVersion, apply read preference
98+
clusterDescription = new ClusterDescription(
99+
ClusterConnectionMode.MULTIPLE,
100+
ClusterType.REPLICA_SET,
101+
asList(
102+
builder().ok(false).state(CONNECTING).address(new ServerAddress("localhost:27017")).build(),
103+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_PRIMARY).address(new ServerAddress("localhost:27018"))
104+
.maxWireVersion(FIVE_DOT_ZERO_WIRE_VERSION)
105+
.build(),
106+
builder().ok(true).state(CONNECTED).type(REPLICA_SET_SECONDARY).address(new ServerAddress("localhost:27019"))
107+
.maxWireVersion(FIVE_DOT_ZERO_WIRE_VERSION)
108+
.build()));
109+
List<ServerDescription> serverDescriptionList = selector.select(clusterDescription);
110+
assertEquals(clusterDescription.getServerDescriptions().stream()
111+
.filter(serverDescription -> serverDescription.getType() == REPLICA_SET_SECONDARY).collect(toList()),
112+
serverDescriptionList);
113+
assertEquals(ReadPreference.secondary(), selector.getAppliedReadPreference());
114+
}
115+
}

0 commit comments

Comments
 (0)