18
18
using System . Threading ;
19
19
using System . Threading . Tasks ;
20
20
using FluentAssertions ;
21
+ using MongoDB . Bson ;
22
+ using MongoDB . Bson . IO ;
23
+ using MongoDB . Bson . Serialization . Serializers ;
21
24
using MongoDB . Bson . TestHelpers . XunitExtensions ;
22
25
using MongoDB . Driver . Core . Bindings ;
23
26
using MongoDB . Driver . Core . Clusters ;
27
+ using MongoDB . Driver . Core . Clusters . ServerSelectors ;
24
28
using MongoDB . Driver . Core . Configuration ;
25
29
using MongoDB . Driver . Core . ConnectionPools ;
26
30
using MongoDB . Driver . Core . Connections ;
27
31
using MongoDB . Driver . Core . Events ;
32
+ using MongoDB . Driver . Core . TestHelpers . XunitExtensions ;
33
+ using MongoDB . Driver . Core . WireProtocol ;
34
+ using MongoDB . Driver . Core . WireProtocol . Messages . Encoders ;
28
35
using Moq ;
29
36
using Xunit ;
30
37
@@ -246,4 +253,93 @@ public void A_description_changed_event_with_a_heartbeat_exception_should_clear_
246
253
_mockConnectionPool. Verify ( p => p . Clear ( ) , Times . Once ) ;
247
254
}
248
255
}
256
+
257
+ public class ServerChannelTests
258
+ {
259
+ [ SkippableTheory ]
260
+ [ InlineData ( 1 , 2 , 2 ) ]
261
+ [ InlineData ( 2 , 1 , 2 ) ]
262
+ public void Command_should_send_the_greater_of_the_session_and_cluster_cluster_times( long sessionTimestamp , long clusterTimestamp , long expectedTimestamp )
263
+ {
264
+ RequireServer. Check ( ) . VersionGreaterThanOrEqualTo ( "3.6" ) . ClusterTypes ( ClusterType . ReplicaSet , ClusterType . Sharded ) ;
265
+ var sessionClusterTime = new BsonDocument( "clusterTime" , new BsonTimestamp ( sessionTimestamp ) ) ;
266
+ var clusterClusterTime = new BsonDocument( "clusterTime" , new BsonTimestamp ( clusterTimestamp ) ) ;
267
+ var expectedClusterTime = new BsonDocument( "clusterTime" , new BsonTimestamp ( expectedTimestamp ) ) ;
268
+
269
+ var eventCapturer = new EventCapturer( ) . Capture < CommandStartedEvent > ( e => e . CommandName == "ping" ) ;
270
+ using ( var cluster = CoreTestConfiguration. CreateCluster ( b => b . Subscribe ( eventCapturer ) ) )
271
+ using ( var session = cluster. StartSession ( ) )
272
+ {
273
+ var cancellationToken = CancellationToken . None ;
274
+ var server = ( Server ) cluster . SelectServer ( WritableServerSelector . Instance , cancellationToken ) ;
275
+ using ( var channel = server . GetChannel ( cancellationToken ) )
276
+ {
277
+ session . AdvanceClusterTime ( sessionClusterTime ) ;
278
+ server . ClusterClock . AdvanceClusterTime ( clusterClusterTime ) ;
279
+
280
+ var command = BsonDocument . Parse ( "{ ping : 1 }" ) ;
281
+ try
282
+ {
283
+ channel . Command < BsonDocument > (
284
+ session ,
285
+ ReadPreference . Primary ,
286
+ DatabaseNamespace . Admin ,
287
+ command ,
288
+ NoOpElementNameValidator . Instance ,
289
+ null , // additionalOptions
290
+ ( ) => CommandResponseHandling . Return ,
291
+ BsonDocumentSerializer . Instance ,
292
+ new MessageEncoderSettings ( ) ,
293
+ cancellationToken ) ;
294
+ }
295
+ catch ( MongoCommandException ex )
296
+ {
297
+ // we're expecting the command to fail because the $clusterTime we sent is not properly signed
298
+ // the point of this test is just to assert that the driver sent the higher of the session and cluster clusterTimes
299
+ ex . Message . Should ( ) . Contain ( "Missing expected field \" signature\" " ) ;
300
+ }
301
+ }
302
+ }
303
+
304
+ var commandStartedEvent = eventCapturer . Next ( ) . Should ( ) . BeOfType < CommandStartedEvent > ( ) . Subject ;
305
+ var actualCommand = commandStartedEvent . Command ;
306
+ var actualClusterTime = actualCommand [ "$clusterTime" ] . AsBsonDocument ;
307
+ actualClusterTime . Should ( ) . Be ( expectedClusterTime ) ;
308
+ }
309
+
310
+ [ SkippableFact ]
311
+ public void Command_should_update_the_session_and_cluster_cluster_times ( )
312
+ {
313
+ RequireServer . Check ( ) . VersionGreaterThanOrEqualTo ( "3.6" ) . ClusterTypes ( ClusterType . ReplicaSet , ClusterType . Sharded ) ;
314
+
315
+ var eventCapturer = new EventCapturer ( ) . Capture < CommandSucceededEvent > ( e => e . CommandName == "ping" ) ;
316
+ using ( var cluster = CoreTestConfiguration . CreateCluster ( b => b . Subscribe ( eventCapturer ) ) )
317
+ using ( var session = cluster . StartSession ( ) )
318
+ {
319
+ var cancellationToken = CancellationToken . None ;
320
+ var server = ( Server ) cluster . SelectServer ( WritableServerSelector . Instance , cancellationToken ) ;
321
+ using ( var channel = server . GetChannel ( cancellationToken ) )
322
+ {
323
+ var command = BsonDocument . Parse ( "{ ping : 1 }" ) ;
324
+ channel . Command < BsonDocument > (
325
+ session ,
326
+ ReadPreference . Primary ,
327
+ DatabaseNamespace . Admin ,
328
+ command ,
329
+ NoOpElementNameValidator . Instance ,
330
+ null , // additionalOptions
331
+ ( ) => CommandResponseHandling . Return ,
332
+ BsonDocumentSerializer . Instance ,
333
+ new MessageEncoderSettings ( ) ,
334
+ cancellationToken ) ;
335
+ }
336
+
337
+ var commandSucceededEvent = eventCapturer . Next ( ) . Should ( ) . BeOfType < CommandSucceededEvent > ( ) . Subject ;
338
+ var actualReply = commandSucceededEvent . Reply ;
339
+ var actualClusterTime = actualReply [ "$clusterTime" ] . AsBsonDocument ;
340
+ session . ClusterTime . Should ( ) . Be ( actualClusterTime ) ;
341
+ server . ClusterClock . ClusterTime . Should ( ) . Be ( actualClusterTime ) ;
342
+ }
343
+ }
344
+ }
249
345
}
0 commit comments