Skip to content

Commit 10f3540

Browse files
committed
CSHARP-2301: Changes to exactly when Watch should capture the original startAtOperationTime.
1 parent 4900985 commit 10f3540

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamOperation.cs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,22 @@ public IAsyncCursor<TResult> Execute(IReadBinding binding, CancellationToken can
264264
throw new ArgumentException("The binding value passed to ChangeStreamOperation.Execute must implement IReadBindingHandle.", nameof(binding));
265265
}
266266

267-
var cursor = Resume(bindingHandle, cancellationToken);
268-
if (_startAtOperationTime == null && _resumeAfter == null)
267+
IAsyncCursor<RawBsonDocument> cursor;
268+
using (var channelSource = binding.GetReadChannelSource(cancellationToken))
269+
using (var channel = channelSource.GetChannel(cancellationToken))
270+
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
269271
{
270-
_startAtOperationTime = binding.Session.OperationTime;
272+
cursor = Resume(channelBinding, cancellationToken);
273+
if (_startAtOperationTime == null && _resumeAfter == null)
274+
{
275+
var maxWireVersion = channel.ConnectionDescription.IsMasterResult.MaxWireVersion;
276+
if (maxWireVersion >= 7)
277+
{
278+
_startAtOperationTime = binding.Session.OperationTime;
279+
}
280+
}
271281
}
282+
272283
return new ChangeStreamCursor<TResult>(cursor, _resultSerializer, bindingHandle.Fork(), this);
273284
}
274285

@@ -281,11 +292,22 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(IReadBinding binding, Canc
281292
throw new ArgumentException("The binding value passed to ChangeStreamOperation.ExecuteAsync must implement IReadBindingHandle.", nameof(binding));
282293
}
283294

284-
var cursor = await ResumeAsync(bindingHandle, cancellationToken).ConfigureAwait(false);
285-
if (_startAtOperationTime == null && _resumeAfter == null)
295+
IAsyncCursor<RawBsonDocument> cursor;
296+
using (var channelSource = await binding.GetReadChannelSourceAsync(cancellationToken).ConfigureAwait(false))
297+
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
298+
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
286299
{
287-
_startAtOperationTime = binding.Session.OperationTime;
300+
cursor = await ResumeAsync(channelBinding, cancellationToken).ConfigureAwait(false);
301+
if (_startAtOperationTime == null && _resumeAfter == null)
302+
{
303+
var maxWireVersion = channel.ConnectionDescription.IsMasterResult.MaxWireVersion;
304+
if (maxWireVersion >= 7)
305+
{
306+
_startAtOperationTime = binding.Session.OperationTime;
307+
}
308+
}
288309
}
310+
289311
return new ChangeStreamCursor<TResult>(cursor, _resultSerializer, bindingHandle.Fork(), this);
290312
}
291313

0 commit comments

Comments
 (0)