CSHARP-5887: Simplify retryable read and writes #1882
papafe wants to merge 3 commits into mongodb:main
| serverResponse = commandException.Result; | ||
| } | ||
| catch (Exception exception) | ||
| catch (Exception exception) when (!context.ErrorDuringLastChannelAcquisition) |
This avoids an exception in EnsureCanProceedNextBatch and ToFinalResultsOrThrow: after a failed channel acquisition, the context no longer has a channel.
This error was not visible before, because channel acquisition happened outside the try-catch, so a failure simply propagated out of the whole method.
The question is whether we want a non-retryable channel acquisition failure to make the whole method throw (as it did before), or whether we need to find another way to make EnsureCanProceedNextBatch and ToFinalResultsOrThrow work.
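For context, a minimal sketch of how the exception filter in the hunk above behaves. The types here (ContextSketch, ExceptionFilterSketch) are hypothetical stand-ins, not driver classes; only the `when` clause mirrors the diff.

```csharp
using System;

// Hypothetical stand-in for the retry context; only the flag matters here.
public sealed class ContextSketch
{
    public bool ErrorDuringLastChannelAcquisition { get; set; }
}

public static class ExceptionFilterSketch
{
    // Returns "completed" when body succeeds and "handled" when the catch block runs.
    public static string Run(ContextSketch context, Action body)
    {
        try
        {
            body();
            return "completed";
        }
        catch (Exception) when (!context.ErrorDuringLastChannelAcquisition)
        {
            // Entered only when the failure did not come from channel acquisition,
            // so code that relies on an acquired channel can still run here.
            return "handled";
        }
    }
}
```

When the flag is set, the filter evaluates to false and the exception propagates to the caller, which matches the "make the whole method throw" behaviour discussed in the comment.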
| { | ||
| _databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace)); | ||
| _command = Ensure.IsNotNull(command, nameof(command)); | ||
| _command = command; //can be null |
This allows the command to be modified after the ReadCommandOperation has been created.
It is needed because some operations require the operationContext to create the command properly, but the operationContext is now not available until later.
I'm not a fan of this implementation, to be honest.
I see two other possibilities:
- We make ReadCommandOperation equivalent to WriteCommandOperation, so that it is not concerned with retryability. Then we add a new class, RetryableReadCommandOperation, that handles retryability and uses ReadCommandOperation internally.
- Instead of allowing the command to be set, we add a method that modifies it once we have the operation context.
I think option 1 is more desirable, but it requires much more refactoring.
| { | ||
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.ProcessedCount, canBeSplit: false); | ||
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.Count, _documents.ProcessedCount, canBeSplit: false); | ||
| //TODO This looked wrong, is it tested somewhere? In the update command it was not even used. |
It seems this was not working at all before. Do we have tests?
Agreed, it looks like our retry was not working properly when an error occurred on a batch other than the first.
| ReplaceChannel(ChannelSource.GetChannel(operationContext)); | ||
|
|
||
| ChannelPinningHelper.PinChannellIfRequired(ChannelSource, Channel, | ||
| Binding.Session); //TODO We should do it only the first time, as an improvement we could pass the attempt number. |
I don't think it's a problem to call this method multiple times, is it?
| { | ||
| HashSet<ServerDescription> deprioritizedServers = null; | ||
| var attempt = 1; | ||
| var totalAttempts = 0; |
Renamed to align with the write version; it now starts from 0 to be more compatible with the future client backpressure integration.
| #region static | ||
|
|
||
| public static RetryableReadContext Create(OperationContext operationContext, IReadBinding binding, bool retryRequested) | ||
| public static RetryableReadContext Create(IReadBinding binding, bool retryRequested) |
Do we still need this factory method? Since you've removed the connection-resolving logic, I would say there is no reason to keep it.
| /// Sets the command to be executed. This is used by derived classes that build commands dynamically. | ||
| /// </summary> | ||
| /// <param name="command">The command.</param> | ||
| protected void SetCommand(BsonDocument command) |
I would suggest removing the _command variable and introducing an abstract CreateCommand method instead, so that derived classes can create the command on demand.
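A rough sketch of what the suggested shape could look like, under stated assumptions: every type below (AttemptInfoSketch, CommandOperationSketch, DistinctOperationSketch) is hypothetical, and a plain dictionary stands in for BsonDocument so the example is self-contained. It is not the driver's actual API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for whatever per-attempt data the command depends on.
public sealed class AttemptInfoSketch
{
    public AttemptInfoSketch(int maxWireVersion) { MaxWireVersion = maxWireVersion; }
    public int MaxWireVersion { get; }
}

// No stored command field: each attempt asks the subclass for a fresh command.
public abstract class CommandOperationSketch
{
    protected abstract Dictionary<string, object> CreateCommand(AttemptInfoSketch attempt);

    public Dictionary<string, object> Execute(AttemptInfoSketch attempt)
    {
        var command = CreateCommand(attempt); // built on demand, so never a null field
        return command; // a real operation would send this over the channel
    }
}

public sealed class DistinctOperationSketch : CommandOperationSketch
{
    private readonly string _collectionName;
    private readonly string _fieldName;

    public DistinctOperationSketch(string collectionName, string fieldName)
    {
        _collectionName = collectionName;
        _fieldName = fieldName;
    }

    // The attempt argument is unused in this sketch; a real operation could
    // shape the command based on it.
    protected override Dictionary<string, object> CreateCommand(AttemptInfoSketch attempt)
    {
        return new Dictionary<string, object>
        {
            ["distinct"] = _collectionName,
            ["key"] = _fieldName
        };
    }
}
```

With this shape the base class never holds a nullable command, which would make the removed non-null check unnecessary rather than merely relaxed.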
| } | ||
|
|
||
| private EventContext.OperationNameDisposer BeginOperation() => EventContext.BeginOperation("distinct"); | ||
| private EventContext.OperationIdDisposer BeginOperation() => EventContext.BeginOperation(null, "distinct"); |
The difference between the overloads is unclear to me.
Pull request overview
Simplifies retryable read/write execution by centralizing channel acquisition/replacement inside the retry executors/contexts, and introduces dynamic command creation for retryable read command operations.
Changes:
- Refactors RetryableReadOperationExecutor/RetryableWriteOperationExecutor to acquire/replace channels per attempt and track last acquired server.
- Removes RetryableReadContext.Create*/RetryableWriteContext.Create* factories; callers now construct contexts directly.
- Adds ICommandCreator + a ReadCommandOperation overload to build commands dynamically per attempt/connection (used by find/aggregate/count/distinct).
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs | Updates test helper to construct RetryableWriteContext and acquire a channel explicitly. |
| tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs | Removes test asserting command must be non-null (aligning with dynamic-command support). |
| tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs | Updates helper methods to construct contexts and explicitly acquire channels sync/async. |
| src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs | Moves channel acquisition into executor loop; adds attempt counters and uses LastAcquiredServer. |
| src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs | Removes Create/CreateAsync; adds ErrorDuringLastChannelAcquisition and LastAcquiredServer. |
| src/MongoDB.Driver/Core/Operations/RetryableWriteCommandOperationBase.cs | Switches to direct context construction (executor now acquires channels). |
| src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs | Adjusts retry payload construction and payload variable used in the message section. |
| src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs | Moves channel acquisition into executor loop; uses LastAcquiredServer for deprioritization. |
| src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs | Removes Create/CreateAsync; tracks LastAcquiredServer; pins channel after acquisition. |
| src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs | Adjusts retry payload construction for insert retries. |
| src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs | Adjusts retry payload construction for delete retries. |
| src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs | Adds dynamic-command constructor via ICommandCreator; sets command per attempt. |
| src/MongoDB.Driver/Core/Operations/ListIndexesUsingCommandOperation.cs | Switches to direct RetryableReadContext construction. |
| src/MongoDB.Driver/Core/Operations/ListIndexesOperation.cs | Switches to direct RetryableReadContext construction. |
| src/MongoDB.Driver/Core/Operations/ListCollectionsOperation.cs | Switches to direct RetryableReadContext construction. |
| src/MongoDB.Driver/Core/Operations/ICommandCreator.cs | Introduces interface for creating commands dynamically from session/connection info. |
| src/MongoDB.Driver/Core/Operations/FindOperation.cs | Implements ICommandCreator; passes creator into ReadCommandOperation. |
| src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs | Switches to direct RetryableReadContext construction; updates BeginOperation overload usage. |
| src/MongoDB.Driver/Core/Operations/DistinctOperation.cs | Implements ICommandCreator; passes creator into ReadCommandOperation; updates BeginOperation. |
| src/MongoDB.Driver/Core/Operations/CountOperation.cs | Implements ICommandCreator; passes creator into ReadCommandOperation. |
| src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs | Allows null command to support dynamic command building; adds SetCommand. |
| src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs | Constructs RetryableWriteContext directly; filters exception handling for acquisition failures. |
| src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs | Switches to direct RetryableReadContext construction (aggregate operation will acquire channels). |
| src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs | Constructs RetryableWriteContext directly; makes final result creation tolerant of null channel. |
| src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs | Constructs RetryableWriteContext directly. |
| src/MongoDB.Driver/Core/Operations/AggregateOperation.cs | Implements ICommandCreator and uses dynamic ReadCommandOperation construction. |
| src/MongoDB.Driver/Core/Misc/BatchableSource.cs | Adds internal ctor to preserve processedCount; adjusts public ctor delegation/validation. |
| else | ||
| { | ||
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.ProcessedCount, canBeSplit: false); | ||
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.Count, _documents.ProcessedCount, canBeSplit: false); |
On retries (attempt > 1), the batch should resend exactly the documents that were sent in the first attempt (i.e., ProcessedCount), and must not be split. Using _documents.Count here can resend additional documents (or produce an oversized message since canBeSplit: false), and can change the retried command payload for the same txnNumber. Create the retry batch with count equal to the first attempt's processed count and processedCount reset to 0.
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.Count, _documents.ProcessedCount, canBeSplit: false); | |
| documents = new BatchableSource<TDocument>(_documents.Items, _documents.Offset, _documents.ProcessedCount, 0, canBeSplit: false); |
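To make the retry rule in this comment concrete, here is a small sketch under stated assumptions. BatchSketch<T> and RetryBatchSketch are hypothetical stand-ins that only mimic the four-argument constructor shape (items, offset, count, canBeSplit) seen in the original line; they are not the driver's BatchableSource<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a batchable source of documents.
public sealed class BatchSketch<T>
{
    public BatchSketch(IReadOnlyList<T> items, int offset, int count, bool canBeSplit)
    {
        if (offset < 0 || count < 0 || offset + count > items.Count)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        Items = items;
        Offset = offset;
        Count = count;
        CanBeSplit = canBeSplit;
    }

    public IReadOnlyList<T> Items { get; }
    public int Offset { get; }
    public int Count { get; }
    public bool CanBeSplit { get; }

    // Number of items actually serialized into the message on an attempt.
    public int ProcessedCount { get; set; }
}

public static class RetryBatchSketch
{
    // Attempt 1 sends the original (splittable) batch. Later attempts resend
    // exactly the items the first attempt processed, as one unsplittable batch.
    public static BatchSketch<T> ForAttempt<T>(BatchSketch<T> original, int attempt)
    {
        if (attempt == 1)
        {
            return original;
        }
        return new BatchSketch<T>(original.Items, original.Offset, original.ProcessedCount, canBeSplit: false);
    }
}
```

In this sketch the retry batch starts with ProcessedCount at 0 and Count equal to the first attempt's processed count, which is the behaviour the comment asks for.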
|
|
||
| attempt++; | ||
| deprioritizedServers ??= []; | ||
| deprioritizedServers.Add(context.LastAcquiredServer); |
deprioritizedServers.Add(context.LastAcquiredServer) can add null (e.g., when channel acquisition fails before LastAcquiredServer is set). Passing a collection containing null into server selectors can trigger null-reference during endpoint projection. Only add to deprioritizedServers when the server is non-null.
| deprioritizedServers.Add(context.LastAcquiredServer); | |
| var lastAcquiredServer = context.LastAcquiredServer; | |
| if (lastAcquiredServer != null) | |
| { | |
| deprioritizedServers.Add(lastAcquiredServer); | |
| } |
If channel acquisition fails before LastAcquiredServer is set, then we'll get a non-retryable exception, so the method will throw and we'll never reach here, but maybe it's a good idea to add this.
| deprioritizedServers ??= []; | ||
| deprioritizedServers.Add(context.LastAcquiredServer); | ||
| } |
Same issue in async path: deprioritizedServers.Add(context.LastAcquiredServer) can add null when acquisition fails before LastAcquiredServer is set, and passing a collection with null into server selectors can cause null-reference during endpoint projection. Only add when non-null.
| deprioritizedServers ??= []; | ||
| deprioritizedServers.Add(context.LastAcquiredServer); | ||
| } |
deprioritizedServers.Add(context.LastAcquiredServer) can add null when channel acquisition fails before LastAcquiredServer is set. This will later flow into DeprioritizedServersServerSelector, which enumerates the collection and dereferences each ServerDescription, causing a null-reference. Guard against null (only add when non-null).
| deprioritizedServers ??= []; | ||
| deprioritizedServers.Add(context.LastAcquiredServer); | ||
| } |
deprioritizedServers.Add(context.LastAcquiredServer) can add null (e.g., when channel acquisition fails before LastAcquiredServer is set). Passing a collection containing null into server selectors can trigger null-reference during endpoint projection. Only add to deprioritizedServers when the server is non-null.
| { | ||
| attempt++; | ||
| } | ||
| _lastAcquiredServer = ChannelSource.ServerDescription; |
Do we really need to store the last server? As far as I understand, we need it only for server deprioritization, but storing it could lead to a situation where _lastAcquiredServer and Channel are out of sync. That can happen even with this version of the code: if we run into an exception during connection checkout (line 66), we will have no Channel assigned, but _lastAcquiredServer will hold the server that was just selected. That is probably wrong, because we haven't tried any operation against that server yet.
I agree that lastSelectedServer would probably be a more accurate name.
We use it for server deprioritization and also for checking if the server allows retries, so we should have it even if we get an exception during connection establishment / channel acquisition on line 66.
Another possibility would be to return a specific POCO from this method, that contains both the exception and the selected server. I tried with an out parameter when I was working with this, but that won't work with the async version.
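A minimal sketch of the POCO alternative mentioned above, assuming hypothetical names throughout (ChannelAcquisitionResult, AcquisitionSketch, and a string in place of ServerDescription). It relies on the C# rule that async methods cannot declare out or ref parameters, which is why a return type is needed to carry both values.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical result type carrying both the selected server and any failure.
public sealed class ChannelAcquisitionResult
{
    public ChannelAcquisitionResult(string selectedServer, Exception error)
    {
        SelectedServer = selectedServer;
        Error = error;
    }

    public string SelectedServer { get; } // stand-in for ServerDescription; null if selection failed
    public Exception Error { get; }       // null on success
    public bool Succeeded => Error == null;
}

public static class AcquisitionSketch
{
    // selectServer and openChannel are hypothetical delegates standing in for
    // server selection and connection checkout.
    public static async Task<ChannelAcquisitionResult> AcquireAsync(
        Func<Task<string>> selectServer,
        Func<string, Task> openChannel)
    {
        string server = null;
        try
        {
            server = await selectServer().ConfigureAwait(false);
            await openChannel(server).ConfigureAwait(false);
            return new ChannelAcquisitionResult(server, null);
        }
        catch (Exception ex)
        {
            // The selected server (if any) is still reported alongside the error,
            // so the caller can deprioritize it or check whether retries are allowed.
            return new ChannelAcquisitionResult(server, ex);
        }
    }
}
```

The caller would then decide whether to rethrow or retry based on the result, without the context having to store a separate last-selected-server field.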
| } | ||
| catch | ||
| { | ||
| _errorDuringLastChannelAcquisition = true; |
Why do we need this variable/property?
If we have an error during channel acquisition then we need to check the retryability according to the retryable reads spec, and this variable is used for that check.
With this variable we can keep one try-catch in RetryableWriteOperationExecutor.Execute, without the need to have multiple ones.
This is also used by ClientBulkWriteOperation, where I left a comment about a doubt I have regarding this.
| } | ||
|
|
||
| public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) | ||
| public ServerDescription DoServerSelection(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) |
The naming is temporary; I'm not sure what we want to call these two methods. If we want to follow our current internal naming, the two new methods should be AcquireChannelSource and AcquireChannel.
| } | ||
|
|
||
| private EventContext.OperationNameDisposer BeginOperation() => EventContext.BeginOperation(OperationName); | ||
| private EventContext.OperationIdDisposer BeginOperation() => EventContext.BeginOperation(null, OperationName); |
This is a bit strange. If we use this overload, the command name gets set properly.
Before this PR, we connected to the server when creating the RetryableContext, and at that point the command name was set properly. Now the connection happens inside the retryable operation executor, and by then (for example) ReadCommandOperation.Execute has been called, which contains BeginOperation(null, null) and sets the command name to null. As a result, the command name in ClusterSelectingServerEvent is null.
We need to decide whether this is the way we want to go and, if so, do the same for the other read operations as well.
No description provided.