Skip to content

Commit f809794

Browse files
CSHARP-2092: Cursor iteration should complete (abnormally) when another thread closes the cursor.
1 parent 794058d commit f809794

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class Feature
7676
private static readonly Feature __scramSha256Authentication = new Feature("ScramSha256Authentication", new SemanticVersion(4, 0, 0, ""));
7777
private static readonly Feature __serverExtractsUsernameFromX509Certificate = new Feature("ServerExtractsUsernameFromX509Certificate", new SemanticVersion(3, 3, 12));
7878
private static readonly Feature __shardedTransactions = new Feature("ShardedTransactions", new SemanticVersion(4, 1, 6));
79+
private static readonly Feature __tailableCursor = new Feature("TailableCursor", new SemanticVersion(3, 2, 0));
7980
private static readonly Feature __transactions = new Feature("Transactions", new SemanticVersion(4, 0, 0));
8081
private static readonly Feature __userManagementCommands = new Feature("UserManagementCommands", new SemanticVersion(2, 6, 0));
8182
private static readonly Feature __views = new Feature("Views", new SemanticVersion(3, 3, 11));
@@ -347,6 +348,11 @@ public class Feature
347348
/// </summary>
348349
public static Feature ShardedTransactions => __shardedTransactions;
349350

351+
/// <summary>
352+
/// Gets the tailable cursor feature.
353+
/// </summary>
354+
public static Feature TailableCursor => __tailableCursor;
355+
350356
/// <summary>
351357
/// Gets the transactions feature.
352358
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ private async Task KillCursorsAsync(CancellationToken cancellationToken)
555555
public bool MoveNext(CancellationToken cancellationToken)
556556
{
557557
ThrowIfDisposed();
558+
cancellationToken.ThrowIfCancellationRequested();
558559

559560
bool hasMore;
560561
if (TryMoveNext(out hasMore))
@@ -571,6 +572,7 @@ public bool MoveNext(CancellationToken cancellationToken)
571572
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken)
572573
{
573574
ThrowIfDisposed();
575+
cancellationToken.ThrowIfCancellationRequested();
574576

575577
bool hasMore;
576578
if (TryMoveNext(out hasMore))

tests/MongoDB.Driver.Tests/AsyncCursorTests.cs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
* limitations under the License.
1414
*/
1515

16+
using System;
1617
using System.Collections.Generic;
18+
using System.Threading;
1719
using FluentAssertions;
1820
using FluentAssertions.Common;
1921
using MongoDB.Bson;
@@ -78,17 +80,69 @@ public void KillCursor_should_actually_work()
7880
cursor = collection.FindSync("{}");
7981
cursor.MoveNext();
8082

81-
var cursorId = ((AsyncCursor<BsonDocument>) cursor)._cursorId();
83+
var cursorId = ((AsyncCursor<BsonDocument>)cursor)._cursorId();
8284
cursorId.Should().NotBe(0);
8385
cursor.Dispose();
8486

8587
var desiredResult = BsonDocument.Parse($"{{ \"cursorsKilled\" : [{cursorId}], \"cursorsNotFound\" : [], " +
8688
$"\"cursorsAlive\" : [], \"cursorsUnknown\" : [], \"ok\" : 1.0 }}");
87-
var result = ((CommandSucceededEvent) eventCapturer.Events[0]).Reply;
89+
var result = ((CommandSucceededEvent)eventCapturer.Events[0]).Reply;
8890
result.IsSameOrEqualTo(desiredResult);
8991
}
9092
}
9193

94+
[SkippableFact]
95+
public void Tailable_cursor_should_be_able_to_be_cancelled_from_a_different_thread_with_expected_result()
96+
{
97+
RequireServer.Check().Supports(Feature.TailableCursor);
98+
99+
string testCollectionName = "test";
100+
string testDatabaseName = "test";
101+
102+
var client = DriverTestConfiguration.Client;
103+
var database = client.GetDatabase(testDatabaseName);
104+
var collection = database.GetCollection<BsonDocument>(testCollectionName);
105+
106+
DropCollection(client, testDatabaseName, testCollectionName);
107+
var createCollectionOptions = new CreateCollectionOptions()
108+
{
109+
Capped = true,
110+
MaxSize = 1000
111+
};
112+
database.CreateCollection(testCollectionName, createCollectionOptions);
113+
collection.InsertOne(new BsonDocument()); // tailable cursors don't work on an empty collection
114+
115+
using (var cancellationTokenSource = new CancellationTokenSource())
116+
{
117+
var findOptions = new FindOptions<BsonDocument>()
118+
{
119+
BatchSize = 1,
120+
CursorType = CursorType.TailableAwait
121+
};
122+
var cursor = collection.FindSync(FilterDefinition<BsonDocument>.Empty, findOptions);
123+
var enumerator = cursor.ToEnumerable(cancellationTokenSource.Token).GetEnumerator();
124+
125+
var semaphore = new SemaphoreSlim(0);
126+
var thread = new Thread(() =>
127+
{
128+
semaphore.Wait();
129+
cancellationTokenSource.Cancel();
130+
});
131+
thread.Start();
132+
133+
var exception = Record.Exception((Action)(() =>
134+
{
135+
while (true)
136+
{
137+
_ = enumerator.MoveNext();
138+
semaphore.Release(1);
139+
}
140+
}));
141+
142+
exception.Should().BeAssignableTo<OperationCanceledException>();
143+
}
144+
}
145+
92146
//private methods
93147
private IMongoClient CreateClient()
94148
{
@@ -105,6 +159,6 @@ private void DropCollection(IMongoClient client, string databaseName, string col
105159
public static class AsyncCursorReflector
106160
{
107161
public static long _cursorId(this AsyncCursor<BsonDocument> obj) =>
108-
(long) Reflector.GetFieldValue(obj, nameof(_cursorId));
162+
(long)Reflector.GetFieldValue(obj, nameof(_cursorId));
109163
}
110164
}

0 commit comments

Comments
 (0)