Skip to content

Commit 1a4742b

Browse files
CSHARP-1951: Use killCursors command.
1 parent b473de0 commit 1a4742b

File tree

4 files changed

+576
-27
lines changed

4 files changed

+576
-27
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class Feature
5353
private static readonly Feature __findCommand = new Feature("FindCommand", new SemanticVersion(3, 2, 0));
5454
private static readonly Feature __geoNearCommand = new Feature("GeoNearCommand", new SemanticVersion(1, 0, 0), new SemanticVersion(4, 1, 0, ""));
5555
private static readonly Feature __groupCommand = new Feature("GroupCommand", new SemanticVersion(1, 0, 0), new SemanticVersion(4, 0, 0, "rc1"));
56+
private static readonly Feature __killCursorsCommand = new Feature("KillCursorsCommand", new SemanticVersion(3, 2, 0));
5657
private static readonly Feature __listCollectionsCommand = new Feature("ListCollectionsCommand", new SemanticVersion(3, 0, 0));
5758
private static readonly Feature __listDatabasesFilter = new Feature("ListDatabasesFilter", new SemanticVersion(3, 4, 2));
5859
private static readonly Feature __listDatabasesNameOnlyOption = new Feature("ListDatabasesNameOnlyOption", new SemanticVersion(3, 4, 3));
@@ -221,6 +222,11 @@ public class Feature
221222
/// </summary>
222223
public static Feature GroupCommand => __groupCommand;
223224

225+
/// <summary>
226+
/// Get the killCursors command feature.
227+
/// </summary>
228+
public static Feature KillCursorsCommand => __killCursorsCommand;
229+
224230
/// <summary>
225231
/// Gets the index options defaults feature.
226232
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/AsyncCursor.cs

Lines changed: 196 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using MongoDB.Bson.Serialization;
2424
using MongoDB.Bson.Serialization.Serializers;
2525
using MongoDB.Driver.Core.Bindings;
26+
using MongoDB.Driver.Core.Connections;
2627
using MongoDB.Driver.Core.Events;
2728
using MongoDB.Driver.Core.Misc;
2829
using MongoDB.Driver.Core.WireProtocol;
@@ -48,6 +49,7 @@ public class AsyncCursor<TDocument> : IAsyncCursor<TDocument>
4849
private readonly int? _batchSize;
4950
private readonly CollectionNamespace _collectionNamespace;
5051
private IChannelSource _channelSource;
52+
private bool _closed;
5153
private int _count;
5254
private IReadOnlyList<TDocument> _currentBatch;
5355
private long _cursorId;
@@ -133,6 +135,39 @@ private int CalculateGetMoreProtocolNumberToReturn()
133135
return numberToReturn;
134136
}
135137

138+
/// <summary>
139+
/// Closes the cursor.
140+
/// </summary>
141+
/// <param name="cancellationToken">The cancellation token.</param>
142+
public void Close(CancellationToken cancellationToken = default(CancellationToken))
143+
{
144+
try
145+
{
146+
CloseIfNotAlreadyClosed(cancellationToken);
147+
}
148+
finally
149+
{
150+
Dispose();
151+
}
152+
}
153+
154+
/// <summary>
155+
/// Closes the cursor.
156+
/// </summary>
157+
/// <param name="cancellationToken">The cancellation token.</param>
158+
/// <returns>A task.</returns>
159+
public async Task CloseAsync(CancellationToken cancellationToken = default(CancellationToken))
160+
{
161+
try
162+
{
163+
await CloseIfNotAlreadyClosedAsync(cancellationToken).ConfigureAwait(false);
164+
}
165+
finally
166+
{
167+
Dispose();
168+
}
169+
}
170+
136171
private CursorBatch<TDocument> CreateCursorBatch(BsonDocument result)
137172
{
138173
var cursorDocument = result["cursor"].AsBsonDocument;
@@ -159,6 +194,17 @@ private BsonDocument CreateGetMoreCommand()
159194
return command;
160195
}
161196

197+
private BsonDocument CreateKillCursorsCommand()
198+
{
199+
var command = new BsonDocument
200+
{
201+
{ "killCursors", _collectionNamespace.CollectionName },
202+
{ "cursors", new BsonArray { _cursorId } }
203+
};
204+
205+
return command;
206+
}
207+
162208
private CursorBatch<TDocument> ExecuteGetMoreCommand(IChannelHandle channel, CancellationToken cancellationToken)
163209
{
164210
var command = CreateGetMoreCommand();
@@ -227,6 +273,47 @@ private Task<CursorBatch<TDocument>> ExecuteGetMoreProtocolAsync(IChannelHandle
227273
cancellationToken);
228274
}
229275

276+
private void ExecuteKillCursorsCommand(IChannelHandle channel, CancellationToken cancellationToken)
277+
{
278+
var command = CreateKillCursorsCommand();
279+
var result = channel.Command(
280+
_channelSource.Session,
281+
null, // readPreference
282+
_collectionNamespace.DatabaseNamespace,
283+
command,
284+
null, // commandPayloads
285+
NoOpElementNameValidator.Instance,
286+
null, // additionalOptions
287+
null, // postWriteAction
288+
CommandResponseHandling.Return,
289+
BsonDocumentSerializer.Instance,
290+
_messageEncoderSettings,
291+
cancellationToken);
292+
293+
ThrowIfKillCursorsCommandFailed(result, channel.ConnectionDescription.ConnectionId);
294+
}
295+
296+
private async Task ExecuteKillCursorsCommandAsync(IChannelHandle channel, CancellationToken cancellationToken)
297+
{
298+
var command = CreateKillCursorsCommand();
299+
var result = await channel.CommandAsync(
300+
_channelSource.Session,
301+
null, // readPreference
302+
_collectionNamespace.DatabaseNamespace,
303+
command,
304+
null, // commandPayloads
305+
NoOpElementNameValidator.Instance,
306+
null, // additionalOptions
307+
null, // postWriteAction
308+
CommandResponseHandling.Return,
309+
BsonDocumentSerializer.Instance,
310+
_messageEncoderSettings,
311+
cancellationToken)
312+
.ConfigureAwait(false);
313+
314+
ThrowIfKillCursorsCommandFailed(result, channel.ConnectionDescription.ConnectionId);
315+
}
316+
230317
private void ExecuteKillCursorsProtocol(IChannelHandle channel, CancellationToken cancellationToken)
231318
{
232319
channel.KillCursors(
@@ -235,6 +322,14 @@ private void ExecuteKillCursorsProtocol(IChannelHandle channel, CancellationToke
235322
cancellationToken);
236323
}
237324

325+
private Task ExecuteKillCursorsProtocolAsync(IChannelHandle channel, CancellationToken cancellationToken)
326+
{
327+
return channel.KillCursorsAsync(
328+
new[] { _cursorId },
329+
_messageEncoderSettings,
330+
cancellationToken);
331+
}
332+
238333
/// <inheritdoc/>
239334
public void Dispose()
240335
{
@@ -252,27 +347,66 @@ protected virtual void Dispose(bool disposing)
252347
{
253348
if (!_disposed)
254349
{
255-
try
350+
CloseIfNotAlreadyClosedFromDispose();
351+
352+
if (_channelSource != null)
256353
{
257-
if (_cursorId != 0)
258-
{
259-
using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
260-
{
261-
KillCursor(_cursorId, source.Token);
262-
}
263-
}
354+
_channelSource.Dispose();
264355
}
265-
catch
356+
_disposed = true;
357+
}
358+
}
359+
}
360+
361+
private void CloseIfNotAlreadyClosed(CancellationToken cancellationToken)
362+
{
363+
if (!_closed)
364+
{
365+
try
366+
{
367+
if (_cursorId != 0)
266368
{
267-
// ignore exceptions
369+
KillCursors(cancellationToken);
268370
}
269-
if (_channelSource != null)
371+
}
372+
finally
373+
{
374+
_closed = true;
375+
}
376+
}
377+
}
378+
379+
private async Task CloseIfNotAlreadyClosedAsync(CancellationToken cancellationToken)
380+
{
381+
if (!_closed)
382+
{
383+
try
384+
{
385+
if (_cursorId != 0)
270386
{
271-
_channelSource.Dispose();
387+
await KillCursorsAsync(cancellationToken).ConfigureAwait(false);
272388
}
273389
}
390+
finally
391+
{
392+
_closed = true;
393+
}
394+
}
395+
}
396+
397+
private void CloseIfNotAlreadyClosedFromDispose()
398+
{
399+
try
400+
{
401+
using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
402+
{
403+
CloseIfNotAlreadyClosed(source.Token);
404+
}
405+
}
406+
catch
407+
{
408+
// ignore any exceptions from CloseIfNotAlreadyClosed when called from Dispose
274409
}
275-
_disposed = true;
276410
}
277411

278412
private void DisposeChannelSourceIfNoLongerNeeded()
@@ -316,21 +450,39 @@ private async Task<CursorBatch<TDocument>> GetNextBatchAsync(CancellationToken c
316450
}
317451
}
318452

319-
private void KillCursor(long cursorId, CancellationToken cancellationToken)
453+
private void KillCursors(CancellationToken cancellationToken)
320454
{
321-
try
455+
using (EventContext.BeginOperation(_operationId))
456+
using (EventContext.BeginKillCursors(_collectionNamespace))
457+
using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
458+
using (var channel = _channelSource.GetChannel(cancellationTokenSource.Token))
322459
{
323-
using (EventContext.BeginOperation(_operationId))
324-
using (EventContext.BeginKillCursors(_collectionNamespace))
325-
using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
326-
using (var channel = _channelSource.GetChannel(cancellationTokenSource.Token))
460+
if (Feature.KillCursorsCommand.IsSupported(channel.ConnectionDescription.ServerVersion))
461+
{
462+
ExecuteKillCursorsCommand(channel, cancellationToken);
463+
}
464+
else
327465
{
328466
ExecuteKillCursorsProtocol(channel, cancellationToken);
329467
}
330468
}
331-
catch
469+
}
470+
471+
private async Task KillCursorsAsync(CancellationToken cancellationToken)
472+
{
473+
using (EventContext.BeginOperation(_operationId))
474+
using (EventContext.BeginKillCursors(_collectionNamespace))
475+
using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
476+
using (var channel = await _channelSource.GetChannelAsync(cancellationTokenSource.Token).ConfigureAwait(false))
332477
{
333-
// ignore exceptions
478+
if (Feature.KillCursorsCommand.IsSupported(channel.ConnectionDescription.ServerVersion))
479+
{
480+
await ExecuteKillCursorsCommandAsync(channel, cancellationToken).ConfigureAwait(false);
481+
}
482+
else
483+
{
484+
await ExecuteKillCursorsProtocolAsync(channel, cancellationToken).ConfigureAwait(false);
485+
}
334486
}
335487
}
336488

@@ -393,6 +545,28 @@ private void ThrowIfDisposed()
393545
}
394546
}
395547

548+
private void ThrowIfKillCursorsCommandFailed(BsonDocument commandResult, ConnectionId connectionId)
549+
{
550+
if (!commandResult.GetValue("ok", false).ToBoolean())
551+
{
552+
throw new MongoCommandException(connectionId, "Kill cursors command returned an error.", commandResult);
553+
}
554+
else
555+
{
556+
var notFoundCursors = commandResult["cursorsNotFound"].AsBsonArray;
557+
if (notFoundCursors.Count > 0)
558+
{
559+
throw new MongoCursorNotFoundException(connectionId, _cursorId, commandResult);
560+
}
561+
562+
var killedCursors = commandResult["cursorsKilled"].AsBsonArray.Select(c => c.ToInt64());
563+
if (!killedCursors.Contains(_cursorId))
564+
{
565+
throw new MongoCommandException(connectionId, "Kill cursors command failed.", commandResult);
566+
}
567+
}
568+
}
569+
396570
private bool TryMoveNext(out bool hasMore)
397571
{
398572
hasMore = false;
@@ -419,4 +593,4 @@ private bool TryMoveNext(out bool hasMore)
419593
return false;
420594
}
421595
}
422-
}
596+
}

0 commit comments

Comments
 (0)