Skip to content

Commit 42ad0f1

Browse files
committed
CSHARP-2023: Change Stream work in progress.
1 parent 0eae925 commit 42ad0f1

19 files changed

+979
-7
lines changed

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2010-2016 MongoDB Inc.
1+
/* Copyright 2010-2017 MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -98,6 +98,11 @@ public override IAggregateFluent<TNewResult> BucketAuto<TValue, TNewResult>(
9898
return WithPipeline(_pipeline.BucketAuto(groupBy, buckets, output, options));
9999
}
100100

101+
public override IAggregateFluent<ChangeStreamOutput<TResult>> ChangeStream(ChangeStreamOptions options = null)
102+
{
103+
return WithPipeline(_pipeline.ChangeStream(options));
104+
}
105+
101106
public override IAggregateFluent<AggregateCountResult> Count()
102107
{
103108
return WithPipeline(_pipeline.Count());

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2010-2016 MongoDB Inc.
1+
/* Copyright 2010-2017 MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -85,6 +85,12 @@ public virtual IAggregateFluent<TNewResult> BucketAuto<TValue, TNewResult>(
8585
throw new NotImplementedException();
8686
}
8787

88+
/// <inheritdoc />
89+
public virtual IAggregateFluent<ChangeStreamOutput<TResult>> ChangeStream(ChangeStreamOptions options = null)
90+
{
91+
throw new NotImplementedException();
92+
}
93+
8894
/// <inheritdoc />
8995
public virtual IAggregateFluent<AggregateCountResult> Count()
9096
{

src/MongoDB.Driver/ChangeStream.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Bson;
17+
using MongoDB.Driver.Core.Misc;
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Threading;
21+
using System.Threading.Tasks;
22+
23+
namespace MongoDB.Driver
24+
{
25+
/// <summary>
26+
/// Represents a change stream.
27+
/// </summary>
28+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
29+
/// <seealso cref="MongoDB.Driver.IAsyncCursor{TOutput}" />
30+
public sealed class ChangeStream<TOutput> : IAsyncCursor<TOutput>
31+
{
32+
// private fields
33+
private IAsyncCursor<TOutput> _cursor;
34+
private readonly ChangeStreamOptions _options;
35+
private readonly IReadOnlyList<BsonDocument> _pipeline;
36+
private readonly ReadPreference _readPreference;
37+
private BsonDocument _resumeToken;
38+
39+
// public properties
40+
/// <inheritdoc/>
41+
public IEnumerable<TOutput> Current => null;
42+
43+
// constructors
44+
internal ChangeStream(
45+
IAsyncCursor<TOutput> cursor,
46+
IReadOnlyList<BsonDocument> pipeline,
47+
ChangeStreamOptions options,
48+
ReadPreference readPreference)
49+
{
50+
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
51+
_pipeline = pipeline;
52+
_options = options;
53+
_readPreference = readPreference;
54+
}
55+
56+
// public methods
57+
/// <inheritdoc/>
58+
public void Dispose()
59+
{
60+
}
61+
62+
/// <inheritdoc/>
63+
public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken))
64+
{
65+
throw new NotImplementedException();
66+
}
67+
68+
/// <inheritdoc/>
69+
public Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
70+
{
71+
throw new NotImplementedException();
72+
}
73+
}
74+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
namespace MongoDB.Driver
17+
{
18+
/// <summary>
19+
/// Change stream FullDocument option.
20+
/// </summary>
21+
public enum ChangeStreamFullDocumentOption
22+
{
23+
/// <summary>
24+
/// Do not return the full document.
25+
/// </summary>
26+
None = 0,
27+
/// <summary>
28+
/// Lookup the full document and return it.
29+
/// </summary>
30+
Lookup
31+
}
32+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
namespace MongoDB.Driver
17+
{
18+
/// <summary>
19+
/// The operation type of a change stream result.
20+
/// </summary>
21+
public enum ChangeStreamOperationType
22+
{
23+
/// <summary>
24+
/// An insert operation type.
25+
/// </summary>
26+
Insert,
27+
/// <summary>
28+
/// An update operation type.
29+
/// </summary>
30+
Update,
31+
/// <summary>
32+
/// A replace operation type.
33+
/// </summary>
34+
Replace,
35+
/// <summary>
36+
/// A delete operation type.
37+
/// </summary>
38+
Delete,
39+
/// <summary>
40+
/// An invalidate operation type.
41+
/// </summary>
42+
Invalidate
43+
}
44+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Bson.Serialization;
17+
using MongoDB.Bson.Serialization.Serializers;
18+
using System;
19+
20+
namespace MongoDB.Driver
21+
{
22+
/// <summary>
23+
/// A serializer for ChangeStreamOperationType values.
24+
/// </summary>
25+
public class ChangeStreamOperationTypeSerializer : StructSerializerBase<ChangeStreamOperationType>
26+
{
27+
/// <inheritdoc />
28+
public override ChangeStreamOperationType Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args)
29+
{
30+
var reader = context.Reader;
31+
32+
var stringValue = reader.ReadString();
33+
switch (stringValue)
34+
{
35+
case "delete": return ChangeStreamOperationType.Delete;
36+
case "insert": return ChangeStreamOperationType.Insert;
37+
case "invalidate": return ChangeStreamOperationType.Invalidate;
38+
case "replace": return ChangeStreamOperationType.Replace;
39+
case "update": return ChangeStreamOperationType.Update;
40+
default: throw new FormatException($"Invalid ChangeStreamOperationType: \"{stringValue}\".");
41+
}
42+
}
43+
44+
/// <inheritdoc />
45+
public override void Serialize(BsonSerializationContext context, BsonSerializationArgs args, ChangeStreamOperationType value)
46+
{
47+
var writer = context.Writer;
48+
49+
switch (value)
50+
{
51+
case ChangeStreamOperationType.Delete: writer.WriteString("delete"); break;
52+
case ChangeStreamOperationType.Insert: writer.WriteString("insert"); break;
53+
case ChangeStreamOperationType.Invalidate: writer.WriteString("invalidate"); break;
54+
case ChangeStreamOperationType.Replace: writer.WriteString("replace"); break;
55+
case ChangeStreamOperationType.Update: writer.WriteString("update"); break;
56+
default: throw new FormatException($"Invalid ChangeStreamOperationType: {value}.");
57+
}
58+
}
59+
}
60+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Bson;
17+
using MongoDB.Driver.Core.Misc;
18+
using System;
19+
20+
namespace MongoDB.Driver
21+
{
22+
/// <summary>
23+
/// Options for an aggregate operation.
24+
/// </summary>
25+
public class ChangeStreamOptions
26+
{
27+
// private fields
28+
private int? _batchSize;
29+
private Collation _collation;
30+
private ChangeStreamFullDocumentOption? _fullDocument;
31+
private TimeSpan? _maxAwaitTime;
32+
private BsonDocument _resumeAfter;
33+
34+
// public properties
35+
/// <summary>
36+
/// Gets or sets the size of the batch.
37+
/// </summary>
38+
/// <value>
39+
/// The size of the batch.
40+
/// </value>
41+
public int? BatchSize
42+
{
43+
get { return _batchSize; }
44+
set { _batchSize = Ensure.IsNullOrGreaterThanZero(value, nameof(value)); }
45+
}
46+
47+
/// <summary>
48+
/// Gets or sets the collation.
49+
/// </summary>
50+
/// <value>
51+
/// The collation.
52+
/// </value>
53+
public Collation Collation
54+
{
55+
get { return _collation; }
56+
set { _collation = value; }
57+
}
58+
59+
/// <summary>
60+
/// Gets or sets the full document.
61+
/// </summary>
62+
/// <value>
63+
/// The full document.
64+
/// </value>
65+
public ChangeStreamFullDocumentOption? FullDocument
66+
{
67+
get { return _fullDocument; }
68+
set { _fullDocument = value; }
69+
}
70+
71+
/// <summary>
72+
/// Gets or sets the maximum await time.
73+
/// </summary>
74+
/// <value>
75+
/// The maximum await time.
76+
/// </value>
77+
public TimeSpan? MaxAwaitTime
78+
{
79+
get { return _maxAwaitTime; }
80+
set { _maxAwaitTime = Ensure.IsNullOrGreaterThanZero(value, nameof(value)); }
81+
}
82+
83+
/// <summary>
84+
/// Gets or sets the resume after.
85+
/// </summary>
86+
/// <value>
87+
/// The resume after.
88+
/// </value>
89+
public BsonDocument ResumeAfter
90+
{
91+
get { return _resumeAfter; }
92+
set { _resumeAfter = value; }
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)