16
16
17
17
package com.mongodb.connection
18
18
19
+ import category.Slow
20
+ import com.mongodb.ClusterFixture
19
21
import com.mongodb.MongoClientException
20
22
import com.mongodb.MongoException
21
23
import com.mongodb.MongoInternalException
24
+ import com.mongodb.MongoInterruptedException
22
25
import com.mongodb.MongoTimeoutException
23
26
import com.mongodb.MongoWaitQueueFullException
24
27
import com.mongodb.ReadPreference
25
28
import com.mongodb.ServerAddress
26
29
import com.mongodb.selector.ReadPreferenceServerSelector
27
30
import com.mongodb.selector.ServerAddressSelector
28
31
import com.mongodb.selector.WritableServerSelector
32
+ import org.junit.experimental.categories.Category
29
33
import spock.lang.Specification
30
34
31
35
import java.util.concurrent.CountDownLatch
@@ -83,7 +87,7 @@ class BaseClusterSpecification extends Specification {
83
87
def cluster = new MultiServerCluster (new ClusterId (),
84
88
builder(). mode(MULTIPLE )
85
89
.hosts([firstServer, secondServer])
86
- .serverSelectionTimeout(1 , MILLISECONDS )
90
+ .serverSelectionTimeout(serverSelectionTimeoutMS , MILLISECONDS )
87
91
.build(),
88
92
factory)
89
93
@@ -98,7 +102,8 @@ class BaseClusterSpecification extends Specification {
98
102
99
103
then :
100
104
def e = thrown(MongoTimeoutException )
101
- e. getMessage(). startsWith(' Timed out after 1 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN' )
105
+ e. getMessage(). startsWith(" Timed out after ${ serverSelectionTimeoutMS} ms while waiting to connect. " +
106
+ ' Client view of cluster state is {type=UNKNOWN' )
102
107
e. getMessage(). contains(' {address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
103
108
' exception={com.mongodb.MongoInternalException: oops}}' );
104
109
e. getMessage(). contains(' {address=localhost:27018, type=UNKNOWN, state=CONNECTING}' );
@@ -108,17 +113,105 @@ class BaseClusterSpecification extends Specification {
108
113
109
114
then :
110
115
e = thrown(MongoTimeoutException )
111
- e. getMessage(). startsWith(' Timed out after 1 ms while waiting for a server that matches WritableServerSelector. ' +
112
- ' Client view of cluster state is {type=UNKNOWN' )
116
+ e. getMessage(). startsWith(" Timed out after ${ serverSelectionTimeoutMS } ms while waiting for a server " +
117
+ ' that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN' )
113
118
e. getMessage(). contains(' {address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
114
119
' exception={com.mongodb.MongoInternalException: oops}}' );
115
120
e. getMessage(). contains(' {address=localhost:27018, type=UNKNOWN, state=CONNECTING}' );
121
+
122
+ where :
123
+ serverSelectionTimeoutMS << [1 , 0 ]
116
124
}
117
125
118
- def ' should select server asynchronously ' () {
126
+ def ' should select server' () {
119
127
given :
120
128
def cluster = new MultiServerCluster (new ClusterId (),
121
129
builder(). mode(MULTIPLE )
130
+ .hosts([firstServer, secondServer, thirdServer])
131
+ .serverSelectionTimeout(serverSelectionTimeoutMS, SECONDS )
132
+ .build(),
133
+ factory)
134
+ factory. sendNotification(firstServer, REPLICA_SET_SECONDARY , allServers)
135
+ factory. sendNotification(secondServer, REPLICA_SET_SECONDARY , allServers)
136
+ factory. sendNotification(thirdServer, REPLICA_SET_PRIMARY , allServers)
137
+
138
+ expect :
139
+ cluster. selectServer(new ReadPreferenceServerSelector (ReadPreference . primary())). description. address == thirdServer
140
+
141
+ cleanup :
142
+ cluster?. close()
143
+
144
+ where :
145
+ serverSelectionTimeoutMS << [30 , 0 , -1 ]
146
+ }
147
+
148
+ @Category (Slow )
149
+ def ' should wait indefinitely for a server until interrupted' () {
150
+ given :
151
+ def cluster = new MultiServerCluster (new ClusterId (),
152
+ builder(). mode(MULTIPLE )
153
+ .hosts([firstServer, secondServer, thirdServer])
154
+ .serverSelectionTimeout(-1 , SECONDS )
155
+ .build(),
156
+ factory)
157
+
158
+ when :
159
+ def latch = new CountDownLatch (1 )
160
+ def thread = new Thread ({
161
+ try {
162
+ cluster. selectServer(new ReadPreferenceServerSelector (ReadPreference . primary()))
163
+ } catch (MongoInterruptedException e) {
164
+ latch. countDown()
165
+ }
166
+ })
167
+ thread. start()
168
+ sleep(1000 )
169
+ thread. interrupt()
170
+ latch. await(ClusterFixture . TIMEOUT , SECONDS )
171
+
172
+ then :
173
+ true
174
+
175
+ cleanup :
176
+ cluster?. close()
177
+ }
178
+
179
+ @Category (Slow )
180
+ def ' should wait indefinitely for a cluster description until interrupted' () {
181
+ given :
182
+ def cluster = new MultiServerCluster (new ClusterId (),
183
+ builder(). mode(MULTIPLE )
184
+ .hosts([firstServer, secondServer, thirdServer])
185
+ .serverSelectionTimeout(-1 , SECONDS )
186
+ .build(),
187
+ factory)
188
+
189
+ when :
190
+ def latch = new CountDownLatch (1 )
191
+ def thread = new Thread ({
192
+ try {
193
+ cluster. getDescription()
194
+ } catch (MongoInterruptedException e) {
195
+ latch. countDown()
196
+ }
197
+ })
198
+ thread. start()
199
+ sleep(1000 )
200
+ thread. interrupt()
201
+ latch. await(ClusterFixture . TIMEOUT , SECONDS )
202
+
203
+ then :
204
+ true
205
+
206
+ cleanup :
207
+ cluster?. close()
208
+ }
209
+
210
+ def ' should select server asynchronously when server is already available' () {
211
+ given :
212
+ def cluster = new MultiServerCluster (new ClusterId (),
213
+ builder(). mode(MULTIPLE )
214
+ .serverSelectionTimeout(serverSelectionTimeoutMS, MILLISECONDS )
122
215
.hosts([firstServer, secondServer, thirdServer])
123
216
.build(),
124
217
factory)
@@ -130,6 +223,22 @@ class BaseClusterSpecification extends Specification {
130
223
then :
131
224
server. description. address == firstServer
132
225
226
+ cleanup :
227
+ cluster?. close()
228
+
229
+ where :
230
+ serverSelectionTimeoutMS << [30 , 0 , -1 ]
231
+ }
232
+
233
+ def ' should select server asynchronously when server is not yet available' () {
234
+ given :
235
+ def cluster = new MultiServerCluster (new ClusterId (),
236
+ builder(). mode(MULTIPLE )
237
+ .serverSelectionTimeout(serverSelectionTimeoutMS, MILLISECONDS )
238
+ .hosts([firstServer, secondServer, thirdServer])
239
+ .build(),
240
+ factory)
241
+
133
242
when :
134
243
def secondServerLatch = selectServerAsync(cluster, secondServer)
135
244
def thirdServerLatch = selectServerAsync(cluster, thirdServer)
@@ -144,6 +253,9 @@ class BaseClusterSpecification extends Specification {
144
253
145
254
cleanup :
146
255
cluster?. close()
256
+
257
+ where :
258
+ serverSelectionTimeoutMS << [30 , -1 ]
147
259
}
148
260
149
261
def ' when selecting server asynchronously should send MongoClientException to callback if cluster is closed before success' () {
@@ -171,7 +283,7 @@ class BaseClusterSpecification extends Specification {
171
283
def cluster = new MultiServerCluster (new ClusterId (),
172
284
builder(). mode(MULTIPLE )
173
285
.hosts([firstServer, secondServer, thirdServer])
174
- .serverSelectionTimeout(100 , MILLISECONDS )
286
+ .serverSelectionTimeout(serverSelectionTimeoutMS , MILLISECONDS )
175
287
.build(),
176
288
factory)
177
289
@@ -183,6 +295,10 @@ class BaseClusterSpecification extends Specification {
183
295
184
296
cleanup :
185
297
cluster?. close()
298
+
299
+
300
+ where :
301
+ serverSelectionTimeoutMS << [100 , 0 ]
186
302
}
187
303
188
304
def ' when selecting server asynchronously should send MongoWaitQueueFullException to callback if there are too many waiters' () {
0 commit comments