Skip to content

Commit 3659832

Browse files
committed
CSHARP-3397: Support $merge and $out executing on secondaries.
1 parent 95f8358 commit 3659832

32 files changed

+2104
-698
lines changed

src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
using System;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using MongoDB.Driver.Core.Clusters;
20-
using MongoDB.Driver.Core.Connections;
2119
using MongoDB.Driver.Core.Misc;
2220
using MongoDB.Driver.Core.Servers;
2321

@@ -94,13 +92,25 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation
9492
return GetChannelSourceHelper();
9593
}
9694

95+
/// <inheritdoc/>
96+
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
97+
{
98+
return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary
99+
}
100+
97101
/// <inheritdoc/>
98102
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
99103
{
100104
ThrowIfDisposed();
101105
return Task.FromResult(GetChannelSourceHelper());
102106
}
103107

108+
/// <inheritdoc/>
109+
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
110+
{
111+
return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary
112+
}
113+
104114
// private methods
105115
private IChannelSourceHandle GetChannelSourceHelper()
106116
{

src/MongoDB.Driver.Core/Core/Bindings/ChannelSourceReadWriteBinding.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
using System;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using MongoDB.Driver.Core.Clusters;
2019
using MongoDB.Driver.Core.Misc;
2120

2221
namespace MongoDB.Driver.Core.Bindings
@@ -81,13 +80,25 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation
8180
return GetChannelSourceHelper();
8281
}
8382

83+
/// <inheritdoc/>
84+
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
85+
{
86+
return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary
87+
}
88+
8489
/// <inheritdoc/>
8590
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
8691
{
8792
ThrowIfDisposed();
8893
return Task.FromResult(GetChannelSourceHelper());
8994
}
9095

96+
/// <inheritdoc/>
97+
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
98+
{
99+
return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary
100+
}
101+
91102
/// <inheritdoc/>
92103
public void Dispose()
93104
{

src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
using System;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using MongoDB.Driver.Core.Clusters;
19+
using MongoDB.Driver.Core.Servers;
2020

2121
namespace MongoDB.Driver.Core.Bindings
2222
{
@@ -75,12 +75,28 @@ public interface IWriteBinding : IBinding
7575
/// <returns>A channel source.</returns>
7676
IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken);
7777

78+
/// <summary>
79+
/// Gets a channel source for write operations that may use a secondary.
80+
/// </summary>
81+
/// <param name="mayUseSecondary">The may use secondary criteria.</param>
82+
/// <param name="cancellationToken">The cancellation token.</param>
83+
/// <returns>A channel source.</returns>
84+
IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);
85+
7886
/// <summary>
7987
/// Gets a channel source for write operations.
8088
/// </summary>
8189
/// <param name="cancellationToken">The cancellation token.</param>
8290
/// <returns>A channel source.</returns>
8391
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken);
92+
93+
/// <summary>
94+
/// Gets a channel source for write operations that may use a secondary.
95+
/// </summary>
96+
/// <param name="mayUseSecondary">The may use secondary criteria.</param>
97+
/// <param name="cancellationToken">The cancellation token.</param>
98+
/// <returns>A channel source.</returns>
99+
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);
84100
}
85101

86102
/// <summary>
@@ -125,4 +141,27 @@ public interface IReadWriteBindingHandle : IReadWriteBinding, IReadBindingHandle
125141
/// <returns>A read-write binding handle.</returns>
126142
new IReadWriteBindingHandle Fork();
127143
}
144+
145+
/// <summary>
146+
/// Represents the criteria for using a secondary for operations that may use a secondary.
147+
/// </summary>
148+
public interface IMayUseSecondaryCriteria
149+
{
150+
/// <summary>
151+
/// The effective read preference (initially the same as ReadPreference but possibly altered by the server selector).
152+
/// </summary>
153+
ReadPreference EffectiveReadPreference { get; set; }
154+
155+
/// <summary>
156+
/// The read preference.
157+
/// </summary>
158+
ReadPreference ReadPreference { get; }
159+
160+
/// <summary>
161+
/// Whether a particular secondary can be used.
162+
/// </summary>
163+
/// <param name="server">The server.</param>
164+
/// <returns>True if the server can be used.</returns>
165+
bool CanUseSecondary(ServerDescription server);
166+
}
128167
}

src/MongoDB.Driver.Core/Core/Bindings/ReadWriteBindingHandle.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using System.Threading.Tasks;
2222
using MongoDB.Driver.Core.Clusters;
2323
using MongoDB.Driver.Core.Misc;
24+
using MongoDB.Driver.Core.Operations;
2425

2526
namespace MongoDB.Driver.Core.Bindings
2627
{
@@ -83,13 +84,27 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation
8384
return _reference.Instance.GetWriteChannelSource(cancellationToken);
8485
}
8586

87+
/// <inheritdoc/>
88+
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
89+
{
90+
ThrowIfDisposed();
91+
return _reference.Instance.GetWriteChannelSource(mayUseSecondary, cancellationToken);
92+
}
93+
8694
/// <inheritdoc/>
8795
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
8896
{
8997
ThrowIfDisposed();
9098
return _reference.Instance.GetWriteChannelSourceAsync(cancellationToken);
9199
}
92100

101+
/// <inheritdoc/>
102+
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
103+
{
104+
ThrowIfDisposed();
105+
return _reference.Instance.GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken);
106+
}
107+
93108
/// <inheritdoc/>
94109
public void Dispose()
95110
{

src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadWriteBinding.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,25 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation
8888
return GetChannelSourceHelper();
8989
}
9090

91+
/// <inheritdoc/>
92+
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
93+
{
94+
return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary
95+
}
96+
9197
/// <inheritdoc/>
9298
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
9399
{
94100
ThrowIfDisposed();
95101
return Task.FromResult(GetChannelSourceHelper());
96102
}
97103

104+
/// <inheritdoc/>
105+
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
106+
{
107+
return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary
108+
}
109+
98110
private IChannelSourceHandle GetChannelSourceHelper()
99111
{
100112
return new ChannelSourceHandle(new ServerChannelSource(_server, _session.Fork()));

src/MongoDB.Driver.Core/Core/Bindings/SplitReadWriteBinding.cs

Lines changed: 0 additions & 124 deletions
This file was deleted.

src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,34 +65,60 @@ public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationT
6565
ThrowIfDisposed();
6666
var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken);
6767

68-
return GetChannelSourceHelper(server);
68+
return CreateServerChannelSource(server);
6969
}
7070

7171
/// <inheritdoc/>
7272
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken)
7373
{
7474
ThrowIfDisposed();
7575
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
76-
return GetChannelSourceHelper(server);
76+
return CreateServerChannelSource(server);
7777
}
7878

7979
/// <inheritdoc/>
8080
public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken)
8181
{
8282
ThrowIfDisposed();
8383
var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken);
84-
return GetChannelSourceHelper(server);
84+
return CreateServerChannelSource(server);
85+
}
86+
87+
/// <inheritdoc/>
88+
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
89+
{
90+
if (IsSessionPinnedToServer())
91+
{
92+
throw new InvalidOperationException($"This overload of {nameof(GetWriteChannelSource)} cannot be called when pinned to a server.");
93+
}
94+
95+
var selector = new WritableServerSelector(mayUseSecondary);
96+
var server = _cluster.SelectServer(selector, cancellationToken);
97+
return CreateServerChannelSource(server);
8598
}
8699

87100
/// <inheritdoc/>
88101
public async Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
89102
{
90103
ThrowIfDisposed();
91104
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
92-
return GetChannelSourceHelper(server);
105+
return CreateServerChannelSource(server);
93106
}
94107

95-
private IChannelSourceHandle GetChannelSourceHelper(IServer server)
108+
/// <inheritdoc/>
109+
public async Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
110+
{
111+
if (IsSessionPinnedToServer())
112+
{
113+
throw new InvalidOperationException($"This overload of {nameof(GetWriteChannelSource)} cannot be called when pinned to a server.");
114+
}
115+
116+
var selector = new WritableServerSelector(mayUseSecondary);
117+
var server = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false);
118+
return CreateServerChannelSource(server);
119+
}
120+
121+
private IChannelSourceHandle CreateServerChannelSource(IServer server)
96122
{
97123
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
98124
}
@@ -107,6 +133,11 @@ public void Dispose()
107133
}
108134
}
109135

136+
private bool IsSessionPinnedToServer()
137+
{
138+
return _session.IsInTransaction && _session.CurrentTransaction.PinnedServer != null;
139+
}
140+
110141
private void ThrowIfDisposed()
111142
{
112143
if (_disposed)

0 commit comments

Comments
 (0)