Skip to content

Commit 9eaf604

Browse files
committed
CSHARP-4646: Support namespace queries in ChangeStreamDocument pipelines.
1 parent 4671451 commit 9eaf604

File tree

4 files changed

+134
-2
lines changed

4 files changed

+134
-2
lines changed

src/MongoDB.Driver.Core/ChangeStreamDocumentCollectionNamespaceSerializer.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
namespace MongoDB.Driver
2222
{
23-
internal class ChangeStreamDocumentCollectionNamespaceSerializer : SealedClassSerializerBase<CollectionNamespace>
23+
internal class ChangeStreamDocumentCollectionNamespaceSerializer : SealedClassSerializerBase<CollectionNamespace>, IBsonDocumentSerializer
2424
{
2525
#region static
2626
// private static fields
@@ -69,6 +69,18 @@ public override CollectionNamespace Deserialize(BsonDeserializationContext conte
6969
return null;
7070
}
7171

72+
public bool TryGetMemberSerializationInfo(string memberName, out BsonSerializationInfo serializationInfo)
73+
{
74+
if (memberName == "CollectionName")
75+
{
76+
serializationInfo = new BsonSerializationInfo("coll", StringSerializer.Instance, typeof(string));
77+
return true;
78+
}
79+
80+
serializationInfo = null;
81+
return false;
82+
}
83+
7284
protected override void SerializeValue(BsonSerializationContext context, BsonSerializationArgs args, CollectionNamespace value)
7385
{
7486
var writer = context.Writer;

src/MongoDB.Driver.Core/ChangeStreamDocumentDatabaseNamespaceSerializer.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
namespace MongoDB.Driver
2222
{
23-
internal class ChangeStreamDocumentDatabaseNamespaceSerializer : SealedClassSerializerBase<DatabaseNamespace>
23+
internal class ChangeStreamDocumentDatabaseNamespaceSerializer : SealedClassSerializerBase<DatabaseNamespace>, IBsonDocumentSerializer
2424
{
2525
#region static
2626
// private static fields
@@ -63,6 +63,18 @@ public override DatabaseNamespace Deserialize(BsonDeserializationContext context
6363
return null;
6464
}
6565

66+
public bool TryGetMemberSerializationInfo(string memberName, out BsonSerializationInfo serializationInfo)
67+
{
68+
if (memberName == "DatabaseName")
69+
{
70+
serializationInfo = new BsonSerializationInfo("db", StringSerializer.Instance, typeof(string));
71+
return true;
72+
}
73+
74+
serializationInfo = null;
75+
return false;
76+
}
77+
6678
protected override void SerializeValue(BsonSerializationContext context, BsonSerializationArgs args, DatabaseNamespace value)
6779
{
6880
var writer = context.Writer;

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class Feature
4747
private static readonly Feature __arrayFilters = new Feature("ArrayFilters", WireVersion.Server36);
4848
private static readonly Feature __bypassDocumentValidation = new Feature("BypassDocumentValidation", WireVersion.Server32);
4949
private static readonly Feature __changeStreamStage = new Feature("ChangeStreamStage", WireVersion.Server36);
50+
private static readonly Feature __changeStreamAllChangesForCluster = new Feature("ChangeStreamAllChangesForCluster", WireVersion.Server40);
51+
private static readonly Feature __changeStreamForDatabase = new Feature("ChangeStreamForDatabase", WireVersion.Server40);
5052
private static readonly Feature __changeStreamPostBatchResumeToken = new Feature("ChangeStreamPostBatchResumeToken", WireVersion.Server40);
5153
private static readonly Feature __changeStreamPrePostImages = new Feature("ChangeStreamPrePostImages", WireVersion.Server60);
5254
private static readonly Feature __clientSideEncryption = new Feature("ClientSideEncryption", WireVersion.Server42);
@@ -264,6 +266,16 @@ public class Feature
264266
[Obsolete("This property will be removed in a later release.")]
265267
public static Feature ChangeStreamStage => __changeStreamStage;
266268

269+
/// <summary>
270+
/// Gets the change stream all changes for cluster feature.
271+
/// </summary>
272+
public static Feature ChangeStreamAllChangesForCluster => __changeStreamAllChangesForCluster;
273+
274+
/// <summary>
275+
/// Gets the change stream for database feature.
276+
/// </summary>
277+
public static Feature ChangeStreamForDatabase => __changeStreamForDatabase;
278+
267279
/// <summary>
268280
/// Gets the change stream post batch resume token feature.
269281
/// </summary>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Collections.Generic;
17+
using MongoDB.Bson;
18+
using MongoDB.Bson.Serialization;
19+
using MongoDB.Bson.Serialization.Serializers;
20+
using MongoDB.Driver.Core.Clusters;
21+
using MongoDB.Driver.Core.Misc;
22+
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
23+
using MongoDB.Driver.Linq;
24+
using MongoDB.Driver.Tests.Linq.Linq3ImplementationTests;
25+
using Xunit;
26+
27+
namespace MongoDB.Driver.Tests.Jira.CSharp624
28+
{
29+
public class CSharp4646Tests : Linq3IntegrationTest
30+
{
31+
[Fact]
32+
public void Watch_client_filtering_on_database_name()
33+
{
34+
RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded).Supports(Feature.ChangeStreamAllChangesForCluster);
35+
var client = DriverTestConfiguration.Client;
36+
37+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
38+
.Match(x => x.DatabaseNamespace.DatabaseName.StartsWith("MyPrefix"));
39+
40+
var stages = RenderPipeline(pipeline);
41+
AssertStages(stages, "{ $match : { 'ns.db' : /^MyPrefix/s } }");
42+
43+
using var changeStream = client.Watch(pipeline);
44+
}
45+
46+
[Fact]
47+
public void Watch_client_filtering_on_collection_name()
48+
{
49+
RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded).Supports(Feature.ChangeStreamAllChangesForCluster);
50+
var client = DriverTestConfiguration.Client;
51+
52+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
53+
.Match(x => x.CollectionNamespace.CollectionName.StartsWith("MyPrefix"));
54+
55+
var stages = RenderPipeline(pipeline);
56+
AssertStages(stages, "{ $match : { 'ns.coll' : /^MyPrefix/s } }");
57+
58+
using var changeStream = client.Watch(pipeline);
59+
}
60+
61+
[Fact]
62+
public void Watch_database_filtering_on_collection_name()
63+
{
64+
RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded).Supports(Feature.ChangeStreamForDatabase);
65+
var client = DriverTestConfiguration.Client;
66+
var database = client.GetDatabase("test");
67+
68+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
69+
.Match(x => x.CollectionNamespace.CollectionName.StartsWith("MyPrefix"));
70+
71+
var stages = RenderPipeline(pipeline);
72+
AssertStages(stages, "{ $match : { 'ns.coll' : /^MyPrefix/s } }");
73+
74+
// some older versions of the server require the database to exist before you can watch it
75+
CreateDatabase(database);
76+
77+
using var changeStream = database.Watch(pipeline);
78+
}
79+
80+
private void CreateDatabase(IMongoDatabase database)
81+
{
82+
// the easiest way to create a database is to create a collection by inserting a document
83+
var collection = database.GetCollection<BsonDocument>(DriverTestConfiguration.CollectionNamespace.CollectionName);
84+
collection.InsertOne(new BsonDocument("_id", 1));
85+
}
86+
87+
private IList<BsonDocument> RenderPipeline(PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> pipeline)
88+
{
89+
var serializerRegistry = BsonSerializer.SerializerRegistry;
90+
var documentSerializer = BsonDocumentSerializer.Instance;
91+
var changeStreamDocumentSerializer = new ChangeStreamDocumentSerializer<BsonDocument>(documentSerializer);
92+
var renderedPipeline = pipeline.Render(changeStreamDocumentSerializer, serializerRegistry, LinqProvider.V3);
93+
return renderedPipeline.Documents;
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)