Skip to content

Commit 45700f7

Browse files
committed
CSHARP-2267: ChangeStream cursor raises resumable exception when there is no event since last replica set election.
1 parent bc07e5f commit 45700f7

File tree

1 file changed

+15
-19
lines changed

1 file changed

+15
-19
lines changed

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,18 @@ public void Dispose()
8181
public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken))
8282
{
8383
bool hasMore;
84-
try
84+
while (true)
8585
{
86-
hasMore = _cursor.MoveNext(cancellationToken);
87-
}
88-
catch (Exception ex)
89-
{
90-
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
86+
try
9187
{
92-
_cursor = _changeStreamOperation.Resume(_binding, cancellationToken);
9388
hasMore = _cursor.MoveNext(cancellationToken);
89+
break;
9490
}
95-
else
91+
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex))
9692
{
97-
throw;
93+
var newCursor = _changeStreamOperation.Resume(_binding, cancellationToken);
94+
_cursor.Dispose();
95+
_cursor = newCursor;
9896
}
9997
}
10098

@@ -104,22 +102,20 @@ public void Dispose()
104102

105103
/// <inheritdoc/>
106104
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
107-
{
105+
{
108106
bool hasMore;
109-
try
107+
while (true)
110108
{
111-
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
112-
}
113-
catch (Exception ex)
114-
{
115-
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
109+
try
116110
{
117-
_cursor = await _changeStreamOperation.ResumeAsync(_binding, cancellationToken).ConfigureAwait(false);
118111
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
112+
break;
119113
}
120-
else
114+
catch (Exception ex) when (RetryabilityHelper.IsResumableChangeStreamException(ex))
121115
{
122-
throw;
116+
var newCursor = await _changeStreamOperation.ResumeAsync(_binding, cancellationToken).ConfigureAwait(false);
117+
_cursor.Dispose();
118+
_cursor = newCursor;
123119
}
124120
}
125121

0 commit comments

Comments
 (0)