Skip to content

Commit 6e6db71

Browse files
committed
more post-merge fixups
1 parent e7800ab commit 6e6db71

File tree

12 files changed

+120
-7
lines changed

12 files changed

+120
-7
lines changed

src/RESPite.StackExchange.Redis/RespContextServer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ public Task<RedisResult> ExecuteAsync(
159159
ICollection<object> args,
160160
CommandFlags flags = CommandFlags.None) => throw new NotImplementedException();
161161

162+
public RedisResult Execute(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None) =>
163+
throw new NotImplementedException();
164+
165+
public Task<RedisResult> ExecuteAsync(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
166+
=> throw new NotImplementedException();
167+
162168
public void FlushAllDatabases(CommandFlags flags = CommandFlags.None) => throw new NotImplementedException();
163169

164170
public Task FlushAllDatabasesAsync(CommandFlags flags = CommandFlags.None) => throw new NotImplementedException();

src/RESPite.StackExchange.Redis/RespMultiplexer.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,16 @@ public IServer GetServer(EndPoint endpoint, object? asyncState = null)
254254
return GetServer(_connectionManager.GetNode(host, port), asyncState);
255255
}
256256

257+
public IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None)
258+
{
259+
if (key.IsNull) // just get anything
260+
{
261+
var node = _connectionManager.GetRandomNode();
262+
if (node is not null) return GetServer(node, asyncState);
263+
}
264+
throw new NotImplementedException();
265+
}
266+
257267
private IServer GetServer(Node node, object? asyncState)
258268
{
259269
if (asyncState is not null) ThrowNotSupported();

src/RESPite/Connections/RespConnectionManager.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Buffers;
22
using System.Diagnostics.CodeAnalysis;
3+
using System.Net;
34
using RESPite.Connections.Internal;
45
using RESPite.Internal;
56

@@ -99,6 +100,7 @@ internal Task ConnectAsync(RespConfiguration options, ReadOnlySpan<EndpointPair>
99100
{
100101
pending[i] = snapshot[i].ConnectAsync(log);
101102
}
103+
102104
return ConnectAsyncAwaited(pending, log, snapshot.Length);
103105
}
104106

@@ -230,4 +232,22 @@ internal Node GetNode(string host, int port)
230232
internal Node GetNode(string hostAndPort) => ConnectionFactory.TryParse(hostAndPort, out var host, out var port)
231233
? GetNode(host, port)
232234
: throw new ArgumentException($"Could not parse host and port from '{hostAndPort}'", nameof(hostAndPort));
235+
236+
internal Node? GetRandomNode()
237+
{
238+
var nodes = _nodes;
239+
if (nodes is { Length: > 0 })
240+
{
241+
var index = SharedRandom.Next(nodes.Length);
242+
return nodes[index];
243+
}
244+
245+
return null;
246+
}
247+
248+
#if NET5_0_OR_GREATER
249+
private static Random SharedRandom => Random.Shared;
250+
#else
251+
private static Random SharedRandom { get; } = new();
252+
#endif
233253
}

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt
210210
{
211211
throw ExceptionFactory.AdminModeNotEnabled(RawConfig.IncludeDetailInExceptions, cmd, null, server);
212212
}
213-
var srv = new RedisServer(this, server, null);
213+
var srv = server.GetRedisServer(null);
214214
if (!srv.IsConnected)
215215
{
216216
throw ExceptionFactory.NoConnectionAvailable(this, null, server, GetServerSnapshot(), command: cmd);
@@ -1229,7 +1229,21 @@ public IServer GetServer(EndPoint? endpoint, object? asyncState = null)
12291229
throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}");
12301230
}
12311231
var server = servers[endpoint] as ServerEndPoint ?? throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint));
1232-
return new RedisServer(this, server, asyncState);
1232+
return new RedisServer(server, asyncState);
1233+
}
1234+
1235+
/// <inheritdoc cref="IConnectionMultiplexer.GetServer(RedisKey, object, CommandFlags)"/>
1236+
#pragma warning disable RS0026
1237+
public IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None)
1238+
#pragma warning restore RS0026
1239+
{
1240+
// We'll spoof the GET command for this; we're not supporting ad-hoc access to the pub/sub channel, because: bad things.
1241+
// Any read-only-replica vs writable-primary concerns should be managed by the caller via "flags"; the default is PreferPrimary.
1242+
// Note that ServerSelectionStrategy treats "null" (default) keys as NoSlot, aka Any.
1243+
return (SelectServer(RedisCommand.GET, flags, key) ?? Throw()).GetRedisServer(asyncState);
1244+
1245+
[DoesNotReturn]
1246+
static ServerEndPoint Throw() => throw new InvalidOperationException("It was not possible to resolve a connection to the server owning the specified key");
12331247
}
12341248

12351249
/// <summary>
@@ -1241,7 +1255,7 @@ public IServer[] GetServers()
12411255
var result = new IServer[snapshot.Length];
12421256
for (var i = 0; i < snapshot.Length; i++)
12431257
{
1244-
result[i] = new RedisServer(this, snapshot[i], null);
1258+
result[i] = snapshot[i].GetRedisServer(null);
12451259
}
12461260
return result;
12471261
}

src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,19 @@ public interface IConnectionMultiplexer : IDisposable, IAsyncDisposable
213213
/// <param name="asyncState">The async state to pass to the created <see cref="IServer"/>.</param>
214214
IServer GetServer(EndPoint endpoint, object? asyncState = null);
215215

216+
/// <summary>
217+
/// Gets a server that would be used for a given key and flags.
218+
/// </summary>
219+
/// <param name="key">The endpoint to get a server for. In a non-cluster environment, this parameter is ignored. A <see langword="default"/> key may be specified
220+
/// on cluster, which will return a connection to an arbitrary server matching the specified flags.</param>
221+
/// <param name="asyncState">The async state to pass to the created <see cref="IServer"/>.</param>
222+
/// <param name="flags">The command flags to use.</param>
223+
/// <remarks>This method is particularly useful when communicating with a cluster environment, to obtain a connection to the server that owns the specified key
224+
/// and ad-hoc commands with unusual routing requirements. Note that <see cref="GetDatabase"/> provides a connection that automatically routes commands by
225+
/// looking for <see cref="RedisKey"/> parameters, so this method is only necessary when used with commands that do not take a <see cref="RedisKey"/> parameter,
226+
/// but require consistent routing using key-like semantics.</remarks>
227+
IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None);
228+
216229
/// <summary>
217230
/// Obtain configuration APIs for all servers in this multiplexer.
218231
/// </summary>

src/StackExchange.Redis/Interfaces/IServer.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,13 @@ public partial interface IServer : IRedis
252252
/// <inheritdoc cref="Echo(RedisValue, CommandFlags)"/>
253253
Task<RedisValue> EchoAsync(RedisValue message, CommandFlags flags = CommandFlags.None);
254254

255+
#pragma warning disable RS0026, RS0027 // multiple overloads
255256
/// <summary>
256257
/// Execute an arbitrary command against the server; this is primarily intended for
257258
/// executing modules, but may also be used to provide access to new features that lack
258-
/// a direct API.
259+
/// a direct API. The command is assumed to be not database-specific. If this is not the case,
260+
/// <see cref="Execute(int?, string, ICollection{object}, CommandFlags)"/> should be used to
261+
/// specify the database (using <langword>null</langword> to use the configured default database).
259262
/// </summary>
260263
/// <param name="command">The command to run.</param>
261264
/// <param name="args">The arguments to pass for the command.</param>
@@ -280,6 +283,23 @@ public partial interface IServer : IRedis
280283

281284
/// <inheritdoc cref="Execute(string, ICollection{object}, CommandFlags)"/>
282285
Task<RedisResult> ExecuteAsync(string command, ICollection<object> args, CommandFlags flags = CommandFlags.None);
286+
#pragma warning restore RS0026, RS0027
287+
288+
/// <summary>
289+
/// Execute an arbitrary database-specific command against the server; this is primarily intended for
290+
/// executing modules, but may also be used to provide access to new features that lack
291+
/// a direct API.
292+
/// </summary>
293+
/// <param name="database">The database ID; if <see langword="null"/>, the configured default database is used.</param>
294+
/// <param name="command">The command to run.</param>
295+
/// <param name="args">The arguments to pass for the command.</param>
296+
/// <param name="flags">The flags to use for this operation.</param>
297+
/// <returns>A dynamic representation of the command's result.</returns>
298+
/// <remarks>This API should be considered an advanced feature; inappropriate use can be harmful.</remarks>
299+
RedisResult Execute(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None);
300+
301+
/// <inheritdoc cref="Execute(int?, string, ICollection{object}, CommandFlags)"/>
302+
Task<RedisResult> ExecuteAsync(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None);
283303

284304
/// <summary>
285305
/// Delete all the keys of all databases on the server.

src/StackExchange.Redis/RedisServer.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ internal sealed class RedisServer : RedisBase, IServer
1616
{
1717
private readonly ServerEndPoint server;
1818

19-
internal RedisServer(ConnectionMultiplexer multiplexer, ServerEndPoint server, object? asyncState) : base(multiplexer, asyncState)
19+
internal RedisServer(ServerEndPoint server, object? asyncState) : base(server.Multiplexer, asyncState)
2020
{
21-
this.server = server ?? throw new ArgumentNullException(nameof(server));
21+
this.server = server; // definitely can't be null because .Multiplexer in base call
2222
}
2323

2424
int IServer.DatabaseCount => server.Databases;
@@ -1045,6 +1045,20 @@ public Task<RedisResult> ExecuteAsync(string command, ICollection<object> args,
10451045
return ExecuteAsync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle);
10461046
}
10471047

1048+
public RedisResult Execute(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
1049+
{
1050+
var db = multiplexer.ApplyDefaultDatabase(database ?? -1);
1051+
var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, db, flags, command, args);
1052+
return ExecuteSync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle);
1053+
}
1054+
1055+
public Task<RedisResult> ExecuteAsync(int? database, string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
1056+
{
1057+
var db = multiplexer.ApplyDefaultDatabase(database ?? -1);
1058+
var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, db, flags, command, args);
1059+
return ExecuteAsync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle);
1060+
}
1061+
10481062
/// <summary>
10491063
/// For testing only.
10501064
/// </summary>

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
7171
}
7272
}
7373

74+
private RedisServer? _defaultServer;
75+
public RedisServer GetRedisServer(object? asyncState)
76+
=> asyncState is null
77+
? (_defaultServer ??= new RedisServer(this, null)) // reuse and memoize
78+
: new RedisServer(this, asyncState);
79+
7480
public EndPoint EndPoint { get; }
7581

7682
public ClusterConfiguration? ClusterConfiguration { get; private set; }

tests/BasicTest/RedisBenchmarks.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public void StringGet()
191191
db.StringGet(StringKey_K);
192192
}
193193
}
194+
194195
#if !TEST_BASELINE
195196
/// <summary>
196197
/// Run StringSet lots of times.

tests/RESPite.Tests/ConnectionFixture.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ IServer IConnectionMultiplexer.GetServer(string hostAndPort, object? asyncState)
163163
IServer IConnectionMultiplexer.GetServer(EndPoint endpoint, object? asyncState) =>
164164
throw new NotImplementedException();
165165

166+
public IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None)
167+
=> throw new NotImplementedException();
168+
166169
IServer[] IConnectionMultiplexer.GetServers() => throw new NotImplementedException();
167170

168171
Task<bool> IConnectionMultiplexer.ConfigureAsync(TextWriter? log) => throw new NotImplementedException();

0 commit comments

Comments
 (0)