diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 3a50f1ef3..ce759ab42 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -13,6 +13,7 @@ Current package versions: - Add `Condition.SortedSet[Not]ContainsStarting` condition for transactions ([#2638 by ArnoKoll](https://github.com/StackExchange/StackExchange.Redis/pull/2638)) - Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822)) - Improve `double` formatting performance on net8+ ([#2928 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2928)) +- Add `GetServer(RedisKey, ...)` API ([#2936 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2936)) ## 2.8.58 diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index bf1b75e25..bf6b66674 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -210,7 +210,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt { throw ExceptionFactory.AdminModeNotEnabled(RawConfig.IncludeDetailInExceptions, cmd, null, server); } - var srv = new RedisServer(this, server, null); + var srv = server.GetRedisServer(null); if (!srv.IsConnected) { throw ExceptionFactory.NoConnectionAvailable(this, null, server, GetServerSnapshot(), command: cmd); @@ -1229,7 +1229,21 @@ public IServer GetServer(EndPoint? endpoint, object? asyncState = null) throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}"); } var server = servers[endpoint] as ServerEndPoint ?? throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint)); - return new RedisServer(this, server, asyncState); + return server.GetRedisServer(asyncState); + } + + /// +#pragma warning disable RS0026 + public IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None) +#pragma warning restore RS0026 + { + // We'll spoof the GET command for this; we're not supporting ad-hoc access to the pub/sub channel, because: bad things. + // Any read-only-replica vs writable-primary concerns should be managed by the caller via "flags"; the default is PreferPrimary. + // Note that ServerSelectionStrategy treats "null" (default) keys as NoSlot, aka Any. + return (SelectServer(RedisCommand.GET, flags, key) ?? Throw()).GetRedisServer(asyncState); + + [DoesNotReturn] + static ServerEndPoint Throw() => throw new InvalidOperationException("It was not possible to resolve a connection to the server owning the specified key"); } /// @@ -1241,7 +1255,7 @@ public IServer[] GetServers() var result = new IServer[snapshot.Length]; for (var i = 0; i < snapshot.Length; i++) { - result[i] = new RedisServer(this, snapshot[i], null); + result[i] = snapshot[i].GetRedisServer(null); } return result; } diff --git a/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs b/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs index b4bdb0950..96b4ce8f6 100644 --- a/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs @@ -8,299 +8,311 @@ using StackExchange.Redis.Profiling; using static StackExchange.Redis.ConnectionMultiplexer; -namespace StackExchange.Redis +namespace StackExchange.Redis; + +internal interface IInternalConnectionMultiplexer : IConnectionMultiplexer { - internal interface IInternalConnectionMultiplexer : IConnectionMultiplexer - { - bool AllowConnect { get; set; } - - bool IgnoreConnect { get; set; } - - ReadOnlySpan GetServerSnapshot(); - ServerEndPoint GetServerEndPoint(EndPoint endpoint); - - ConfigurationOptions RawConfig { get; } - - long? GetConnectionId(EndPoint endPoint, ConnectionType type); - - ServerSelectionStrategy ServerSelectionStrategy { get; } - - int GetSubscriptionsCount(); - ConcurrentDictionary GetSubscriptions(); - - ConnectionMultiplexer UnderlyingMultiplexer { get; } - } - - /// - /// Represents the abstract multiplexer API. - /// - public interface IConnectionMultiplexer : IDisposable, IAsyncDisposable - { - /// - /// Gets the client-name that will be used on all new connections. - /// - string ClientName { get; } - - /// - /// Gets the configuration of the connection. - /// - string Configuration { get; } - - /// - /// Gets the timeout associated with the connections. - /// - int TimeoutMilliseconds { get; } - - /// - /// The number of operations that have been performed on all connections. - /// - long OperationCount { get; } - - /// - /// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order. - /// - [Obsolete("Not supported; if you require ordered pub/sub, please see " + nameof(ChannelMessageQueue), false)] - [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] - bool PreserveAsyncOrder { get; set; } - - /// - /// Indicates whether any servers are connected. - /// - bool IsConnected { get; } - - /// - /// Indicates whether any servers are connecting. - /// - bool IsConnecting { get; } - - /// - /// Should exceptions include identifiable details? (key names, additional annotations). - /// - [Obsolete($"Please use {nameof(ConfigurationOptions)}.{nameof(ConfigurationOptions.IncludeDetailInExceptions)} instead - this will be removed in 3.0.")] - [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] - bool IncludeDetailInExceptions { get; set; } - - /// - /// Limit at which to start recording unusual busy patterns (only one log will be retained at a time. - /// Set to a negative value to disable this feature). - /// - int StormLogThreshold { get; set; } - - /// - /// Register a callback to provide an on-demand ambient session provider based on the calling context. - /// The implementing code is responsible for reliably resolving the same provider - /// based on ambient context, or returning null to not profile. - /// - /// The profiling session provider. - void RegisterProfiler(Func profilingSessionProvider); - - /// - /// Get summary statistics associates with this server. - /// - ServerCounters GetCounters(); - - /// - /// A server replied with an error message. - /// - event EventHandler ErrorMessage; - - /// - /// Raised whenever a physical connection fails. - /// - event EventHandler ConnectionFailed; - - /// - /// Raised whenever an internal error occurs (this is primarily for debugging). - /// - event EventHandler InternalError; - - /// - /// Raised whenever a physical connection is established. - /// - event EventHandler ConnectionRestored; - - /// - /// Raised when configuration changes are detected. - /// - event EventHandler ConfigurationChanged; - - /// - /// Raised when nodes are explicitly requested to reconfigure via broadcast. - /// This usually means primary/replica changes. - /// - event EventHandler ConfigurationChangedBroadcast; - - /// - /// Raised when server indicates a maintenance event is going to happen. - /// - event EventHandler ServerMaintenanceEvent; - - /// - /// Gets all endpoints defined on the multiplexer. - /// - /// Whether to return only the explicitly configured endpoints. - EndPoint[] GetEndPoints(bool configuredOnly = false); - - /// - /// Wait for a given asynchronous operation to complete (or timeout). - /// - /// The task to wait on. - void Wait(Task task); - - /// - /// Wait for a given asynchronous operation to complete (or timeout). - /// - /// The type in . - /// The task to wait on. - T Wait(Task task); - - /// - /// Wait for the given asynchronous operations to complete (or timeout). - /// - /// The tasks to wait on. - void WaitAll(params Task[] tasks); - - /// - /// Raised when a hash-slot has been relocated. - /// - event EventHandler HashSlotMoved; - - /// - /// Compute the hash-slot of a specified key. - /// - /// The key to get a slot ID for. - int HashSlot(RedisKey key); - - /// - /// Obtain a pub/sub subscriber connection to the specified server. - /// - /// The async state to pass to the created . - ISubscriber GetSubscriber(object? asyncState = null); - - /// - /// Obtain an interactive connection to a database inside redis. - /// - /// The database ID to get. - /// The async state to pass to the created . - IDatabase GetDatabase(int db = -1, object? asyncState = null); - - /// - /// Obtain a configuration API for an individual server. - /// - /// The host to get a server for. - /// The specific port for to get a server for. - /// The async state to pass to the created . - IServer GetServer(string host, int port, object? asyncState = null); - - /// - /// Obtain a configuration API for an individual server. - /// - /// The "host:port" string to get a server for. - /// The async state to pass to the created . - IServer GetServer(string hostAndPort, object? asyncState = null); - - /// - /// Obtain a configuration API for an individual server. - /// - /// The host to get a server for. - /// The specific port for to get a server for. - IServer GetServer(IPAddress host, int port); - - /// - /// Obtain a configuration API for an individual server. - /// - /// The endpoint to get a server for. - /// The async state to pass to the created . - IServer GetServer(EndPoint endpoint, object? asyncState = null); - - /// - /// Obtain configuration APIs for all servers in this multiplexer. - /// - IServer[] GetServers(); - - /// - /// Reconfigure the current connections based on the existing configuration. - /// - /// The log to write output to. - Task ConfigureAsync(TextWriter? log = null); - - /// - /// Reconfigure the current connections based on the existing configuration. - /// - /// The log to write output to. - bool Configure(TextWriter? log = null); - - /// - /// Provides a text overview of the status of all connections. - /// - string GetStatus(); - - /// - /// Provides a text overview of the status of all connections. - /// - /// The log to write output to. - void GetStatus(TextWriter log); - - /// - /// See . - /// - string ToString(); - - /// - /// Close all connections and release all resources associated with this object. - /// - /// Whether to allow in-queue commands to complete first. - void Close(bool allowCommandsToComplete = true); - - /// - /// Close all connections and release all resources associated with this object. - /// - /// Whether to allow in-queue commands to complete first. - Task CloseAsync(bool allowCommandsToComplete = true); - - /// - /// Obtains the log of unusual busy patterns. - /// - string? GetStormLog(); - - /// - /// Resets the log of unusual busy patterns. - /// - void ResetStormLog(); - - /// - /// Request all compatible clients to reconfigure or reconnect. - /// - /// The command flags to use. - /// The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending). - long PublishReconfigure(CommandFlags flags = CommandFlags.None); - - /// - /// Request all compatible clients to reconfigure or reconnect. - /// - /// The command flags to use. - /// The number of instances known to have received the message (however, the actual number can be higher). - Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None); - - /// - /// Get the hash-slot associated with a given key, if applicable; this can be useful for grouping operations. - /// - /// The key to get a the slot for. - int GetHashSlot(RedisKey key); - - /// - /// Write the configuration of all servers to an output stream. - /// - /// The destination stream to write the export to. - /// The options to use for this export. - void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All); - - /// - /// Append a usage-specific modifier to the advertised library name; suffixes are de-duplicated - /// and sorted alphabetically (so adding 'a', 'b' and 'a' will result in suffix '-a-b'). - /// Connections will be updated as necessary (RESP2 subscription - /// connections will not show updates until those connections next connect). - /// - void AddLibraryNameSuffix(string suffix); - } + bool AllowConnect { get; set; } + + bool IgnoreConnect { get; set; } + + ReadOnlySpan GetServerSnapshot(); + ServerEndPoint GetServerEndPoint(EndPoint endpoint); + + ConfigurationOptions RawConfig { get; } + + long? GetConnectionId(EndPoint endPoint, ConnectionType type); + + ServerSelectionStrategy ServerSelectionStrategy { get; } + + int GetSubscriptionsCount(); + ConcurrentDictionary GetSubscriptions(); + + ConnectionMultiplexer UnderlyingMultiplexer { get; } +} + +/// +/// Represents the abstract multiplexer API. +/// +public interface IConnectionMultiplexer : IDisposable, IAsyncDisposable +{ + /// + /// Gets the client-name that will be used on all new connections. + /// + string ClientName { get; } + + /// + /// Gets the configuration of the connection. + /// + string Configuration { get; } + + /// + /// Gets the timeout associated with the connections. + /// + int TimeoutMilliseconds { get; } + + /// + /// The number of operations that have been performed on all connections. + /// + long OperationCount { get; } + + /// + /// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order. + /// + [Obsolete("Not supported; if you require ordered pub/sub, please see " + nameof(ChannelMessageQueue), false)] + [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] + bool PreserveAsyncOrder { get; set; } + + /// + /// Indicates whether any servers are connected. + /// + bool IsConnected { get; } + + /// + /// Indicates whether any servers are connecting. + /// + bool IsConnecting { get; } + + /// + /// Should exceptions include identifiable details? (key names, additional annotations). + /// + [Obsolete($"Please use {nameof(ConfigurationOptions)}.{nameof(ConfigurationOptions.IncludeDetailInExceptions)} instead - this will be removed in 3.0.")] + [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] + bool IncludeDetailInExceptions { get; set; } + + /// + /// Limit at which to start recording unusual busy patterns (only one log will be retained at a time. + /// Set to a negative value to disable this feature). + /// + int StormLogThreshold { get; set; } + + /// + /// Register a callback to provide an on-demand ambient session provider based on the calling context. + /// The implementing code is responsible for reliably resolving the same provider + /// based on ambient context, or returning null to not profile. + /// + /// The profiling session provider. + void RegisterProfiler(Func profilingSessionProvider); + + /// + /// Get summary statistics associates with this server. + /// + ServerCounters GetCounters(); + + /// + /// A server replied with an error message. + /// + event EventHandler ErrorMessage; + + /// + /// Raised whenever a physical connection fails. + /// + event EventHandler ConnectionFailed; + + /// + /// Raised whenever an internal error occurs (this is primarily for debugging). + /// + event EventHandler InternalError; + + /// + /// Raised whenever a physical connection is established. + /// + event EventHandler ConnectionRestored; + + /// + /// Raised when configuration changes are detected. + /// + event EventHandler ConfigurationChanged; + + /// + /// Raised when nodes are explicitly requested to reconfigure via broadcast. + /// This usually means primary/replica changes. + /// + event EventHandler ConfigurationChangedBroadcast; + + /// + /// Raised when server indicates a maintenance event is going to happen. + /// + event EventHandler ServerMaintenanceEvent; + + /// + /// Gets all endpoints defined on the multiplexer. + /// + /// Whether to return only the explicitly configured endpoints. + EndPoint[] GetEndPoints(bool configuredOnly = false); + + /// + /// Wait for a given asynchronous operation to complete (or timeout). + /// + /// The task to wait on. + void Wait(Task task); + + /// + /// Wait for a given asynchronous operation to complete (or timeout). + /// + /// The type in . + /// The task to wait on. + T Wait(Task task); + + /// + /// Wait for the given asynchronous operations to complete (or timeout). + /// + /// The tasks to wait on. + void WaitAll(params Task[] tasks); + + /// + /// Raised when a hash-slot has been relocated. + /// + event EventHandler HashSlotMoved; + + /// + /// Compute the hash-slot of a specified key. + /// + /// The key to get a slot ID for. + int HashSlot(RedisKey key); + + /// + /// Obtain a pub/sub subscriber connection to the specified server. + /// + /// The async state to pass to the created . + ISubscriber GetSubscriber(object? asyncState = null); + + /// + /// Obtain an interactive connection to a database inside redis. + /// + /// The database ID to get. + /// The async state to pass to the created . + IDatabase GetDatabase(int db = -1, object? asyncState = null); + + /// + /// Obtain a configuration API for an individual server. + /// + /// The host to get a server for. + /// The specific port for to get a server for. + /// The async state to pass to the created . + IServer GetServer(string host, int port, object? asyncState = null); + + /// + /// Obtain a configuration API for an individual server. + /// + /// The "host:port" string to get a server for. + /// The async state to pass to the created . + IServer GetServer(string hostAndPort, object? asyncState = null); + + /// + /// Obtain a configuration API for an individual server. + /// + /// The host to get a server for. + /// The specific port for to get a server for. + IServer GetServer(IPAddress host, int port); + + /// + /// Obtain a configuration API for an individual server. + /// + /// The endpoint to get a server for. + /// The async state to pass to the created . + IServer GetServer(EndPoint endpoint, object? asyncState = null); + + /// + /// Gets a server that would be used for a given key and flags. + /// + /// The endpoint to get a server for. In a non-cluster environment, this parameter is ignored. A key may be specified + /// on cluster, which will return a connection to an arbitrary server matching the specified flags. + /// The async state to pass to the created . + /// The command flags to use. + /// This method is particularly useful when communicating with a cluster environment, to obtain a connection to the server that owns the specified key + /// and ad-hoc commands with unusual routing requirements. Note that provides a connection that automatically routes commands by + /// looking for parameters, so this method is only necessary when used with commands that do not take a parameter, + /// but require consistent routing using key-like semantics. + IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None); + + /// + /// Obtain configuration APIs for all servers in this multiplexer. + /// + IServer[] GetServers(); + + /// + /// Reconfigure the current connections based on the existing configuration. + /// + /// The log to write output to. + Task ConfigureAsync(TextWriter? log = null); + + /// + /// Reconfigure the current connections based on the existing configuration. + /// + /// The log to write output to. + bool Configure(TextWriter? log = null); + + /// + /// Provides a text overview of the status of all connections. + /// + string GetStatus(); + + /// + /// Provides a text overview of the status of all connections. + /// + /// The log to write output to. + void GetStatus(TextWriter log); + + /// + /// See . + /// + string ToString(); + + /// + /// Close all connections and release all resources associated with this object. + /// + /// Whether to allow in-queue commands to complete first. + void Close(bool allowCommandsToComplete = true); + + /// + /// Close all connections and release all resources associated with this object. + /// + /// Whether to allow in-queue commands to complete first. + Task CloseAsync(bool allowCommandsToComplete = true); + + /// + /// Obtains the log of unusual busy patterns. + /// + string? GetStormLog(); + + /// + /// Resets the log of unusual busy patterns. + /// + void ResetStormLog(); + + /// + /// Request all compatible clients to reconfigure or reconnect. + /// + /// The command flags to use. + /// The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending). + long PublishReconfigure(CommandFlags flags = CommandFlags.None); + + /// + /// Request all compatible clients to reconfigure or reconnect. + /// + /// The command flags to use. + /// The number of instances known to have received the message (however, the actual number can be higher). + Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None); + + /// + /// Get the hash-slot associated with a given key, if applicable; this can be useful for grouping operations. + /// + /// The key to get a the slot for. + int GetHashSlot(RedisKey key); + + /// + /// Write the configuration of all servers to an output stream. + /// + /// The destination stream to write the export to. + /// The options to use for this export. + void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All); + + /// + /// Append a usage-specific modifier to the advertised library name; suffixes are de-duplicated + /// and sorted alphabetically (so adding 'a', 'b' and 'a' will result in suffix '-a-b'). + /// Connections will be updated as necessary (RESP2 subscription + /// connections will not show updates until those connections next connect). + /// + void AddLibraryNameSuffix(string suffix); } diff --git a/src/StackExchange.Redis/Interfaces/IServer.cs b/src/StackExchange.Redis/Interfaces/IServer.cs index 4971c7f18..fd090514d 100644 --- a/src/StackExchange.Redis/Interfaces/IServer.cs +++ b/src/StackExchange.Redis/Interfaces/IServer.cs @@ -266,6 +266,7 @@ public partial interface IServer : IRedis /// Task ExecuteAsync(string command, params object[] args); +#pragma warning disable RS0026, RS0027 // multiple overloads /// /// Execute an arbitrary command against the server; this is primarily intended for /// executing modules, but may also be used to provide access to new features that lack @@ -280,6 +281,23 @@ public partial interface IServer : IRedis /// Task ExecuteAsync(string command, ICollection args, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026, RS0027 + + /// + /// Execute an arbitrary database-specific command against the server; this is primarily intended for + /// executing modules, but may also be used to provide access to new features that lack + /// a direct API. + /// + /// The database ID; if , the configured default database is used. + /// The command to run. + /// The arguments to pass for the command. + /// The flags to use for this operation. + /// A dynamic representation of the command's result. + /// This API should be considered an advanced feature; inappropriate use can be harmful. + RedisResult Execute(int? database, string command, ICollection args, CommandFlags flags = CommandFlags.None); + + /// + Task ExecuteAsync(int? database, string command, ICollection args, CommandFlags flags = CommandFlags.None); /// /// Delete all the keys of all databases on the server. diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 66c49976e..e82af2bee 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1957,4 +1957,8 @@ StackExchange.Redis.RedisValue.CopyTo(System.Span destination) -> int StackExchange.Redis.RedisValue.GetByteCount() -> int StackExchange.Redis.RedisValue.GetLongByteCount() -> long static StackExchange.Redis.Condition.SortedSetContainsStarting(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue prefix) -> StackExchange.Redis.Condition! -static StackExchange.Redis.Condition.SortedSetNotContainsStarting(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue prefix) -> StackExchange.Redis.Condition! \ No newline at end of file +static StackExchange.Redis.Condition.SortedSetNotContainsStarting(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue prefix) -> StackExchange.Redis.Condition! +StackExchange.Redis.ConnectionMultiplexer.GetServer(StackExchange.Redis.RedisKey key, object? asyncState = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.IServer! +StackExchange.Redis.IConnectionMultiplexer.GetServer(StackExchange.Redis.RedisKey key, object? asyncState = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.IServer! +StackExchange.Redis.IServer.Execute(int? database, string! command, System.Collections.Generic.ICollection! args, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisResult! +StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System.Collections.Generic.ICollection! args, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! diff --git a/src/StackExchange.Redis/RedisServer.cs b/src/StackExchange.Redis/RedisServer.cs index af734b0f5..3bc306c69 100644 --- a/src/StackExchange.Redis/RedisServer.cs +++ b/src/StackExchange.Redis/RedisServer.cs @@ -16,9 +16,9 @@ internal sealed class RedisServer : RedisBase, IServer { private readonly ServerEndPoint server; - internal RedisServer(ConnectionMultiplexer multiplexer, ServerEndPoint server, object? asyncState) : base(multiplexer, asyncState) + internal RedisServer(ServerEndPoint server, object? asyncState) : base(server.Multiplexer, asyncState) { - this.server = server ?? throw new ArgumentNullException(nameof(server)); + this.server = server; // definitely can't be null because .Multiplexer in base call } int IServer.DatabaseCount => server.Databases; @@ -1045,6 +1045,20 @@ public Task ExecuteAsync(string command, ICollection args, return ExecuteAsync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle); } + public RedisResult Execute(int? database, string command, ICollection args, CommandFlags flags = CommandFlags.None) + { + var db = multiplexer.ApplyDefaultDatabase(database ?? -1); + var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, db, flags, command, args); + return ExecuteSync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle); + } + + public Task ExecuteAsync(int? database, string command, ICollection args, CommandFlags flags = CommandFlags.None) + { + var db = multiplexer.ApplyDefaultDatabase(database ?? -1); + var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, db, flags, command, args); + return ExecuteAsync(msg, ResultProcessor.ScriptResult, defaultValue: RedisResult.NullSingle); + } + /// /// For testing only. /// diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index e62dc9f43..af98af0f7 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -71,6 +71,12 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) } } + private RedisServer? _defaultServer; + public RedisServer GetRedisServer(object? asyncState) + => asyncState is null + ? (_defaultServer ??= new RedisServer(this, null)) // reuse and memoize + : new RedisServer(this, asyncState); + public EndPoint EndPoint { get; } public ClusterConfiguration? ClusterConfiguration { get; private set; } diff --git a/tests/StackExchange.Redis.Tests/GetServerTests.cs b/tests/StackExchange.Redis.Tests/GetServerTests.cs new file mode 100644 index 000000000..50cb9e7ef --- /dev/null +++ b/tests/StackExchange.Redis.Tests/GetServerTests.cs @@ -0,0 +1,150 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace StackExchange.Redis.Tests; + +public abstract class GetServerTestsBase(ITestOutputHelper output, SharedConnectionFixture fixture) + : TestBase(output, fixture) +{ + protected abstract bool IsCluster { get; } + + [Fact] + public async Task GetServersMemoization() + { + await using var conn = Create(); + + var servers0 = conn.GetServers(); + var servers1 = conn.GetServers(); + + // different array, exact same contents + Assert.NotSame(servers0, servers1); + Assert.NotEmpty(servers0); + Assert.NotNull(servers0); + Assert.NotNull(servers1); + Assert.Equal(servers0.Length, servers1.Length); + for (int i = 0; i < servers0.Length; i++) + { + Assert.Same(servers0[i], servers1[i]); + } + } + + [Fact] + public async Task GetServerByEndpointMemoization() + { + await using var conn = Create(); + var ep = conn.GetEndPoints().First(); + + IServer x = conn.GetServer(ep), y = conn.GetServer(ep); + Assert.Same(x, y); + + object asyncState = "whatever"; + x = conn.GetServer(ep, asyncState); + y = conn.GetServer(ep, asyncState); + Assert.NotSame(x, y); + } + + [Fact] + public async Task GetServerByKeyMemoization() + { + await using var conn = Create(); + RedisKey key = Me(); + string value = $"{key}:value"; + await conn.GetDatabase().StringSetAsync(key, value); + + IServer x = conn.GetServer(key), y = conn.GetServer(key); + Assert.False(y.IsReplica, "IsReplica"); + Assert.Same(x, y); + + y = conn.GetServer(key, flags: CommandFlags.DemandMaster); + Assert.Same(x, y); + + // async state demands separate instance + y = conn.GetServer(key, "async state", flags: CommandFlags.DemandMaster); + Assert.NotSame(x, y); + + // primary and replica should be different + y = conn.GetServer(key, flags: CommandFlags.DemandReplica); + Assert.NotSame(x, y); + Assert.True(y.IsReplica, "IsReplica"); + + // replica again: same + var z = conn.GetServer(key, flags: CommandFlags.DemandReplica); + Assert.Same(y, z); + + // check routed correctly + var actual = (string?)await x.ExecuteAsync(null, "get", [key], CommandFlags.NoRedirect); + Assert.Equal(value, actual); // check value against primary + + // for replica, don't check the value, because of replication delay - just: no error + _ = y.ExecuteAsync(null, "get", [key], CommandFlags.NoRedirect); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetServerWithDefaultKey(bool explicitNull) + { + await using var conn = Create(); + bool isCluster = conn.ServerSelectionStrategy.ServerType == ServerType.Cluster; + Assert.Equal(IsCluster, isCluster); // check our assumptions! + + // we expect explicit null and default to act the same, but: check + RedisKey key = explicitNull ? RedisKey.Null : default(RedisKey); + + IServer primary = conn.GetServer(key); + Assert.False(primary.IsReplica); + + IServer replica = conn.GetServer(key, flags: CommandFlags.DemandReplica); + Assert.True(replica.IsReplica); + + // check multiple calls + HashSet uniques = []; + for (int i = 0; i < 100; i++) + { + uniques.Add(conn.GetServer(key)); + } + + if (isCluster) + { + Assert.True(uniques.Count > 1); // should be able to get arbitrary servers + } + else + { + Assert.Single(uniques); + } + + uniques.Clear(); + for (int i = 0; i < 100; i++) + { + uniques.Add(conn.GetServer(key, flags: CommandFlags.DemandReplica)); + } + + if (isCluster) + { + Assert.True(uniques.Count > 1); // should be able to get arbitrary servers + } + else + { + Assert.Single(uniques); + } + } +} + +[RunPerProtocol] +public class GetServerTestsCluster(ITestOutputHelper output, SharedConnectionFixture fixture) : GetServerTestsBase(output, fixture) +{ + protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts; + + protected override bool IsCluster => true; +} + +[RunPerProtocol] +public class GetServerTestsStandalone(ITestOutputHelper output, SharedConnectionFixture fixture) : GetServerTestsBase(output, fixture) +{ + protected override string GetConfiguration() => // we want to test flags usage including replicas + TestConfig.Current.PrimaryServerAndPort + "," + TestConfig.Current.ReplicaServerAndPort; + + protected override bool IsCluster => false; +} diff --git a/tests/StackExchange.Redis.Tests/Helpers/SharedConnectionFixture.cs b/tests/StackExchange.Redis.Tests/Helpers/SharedConnectionFixture.cs index cf6c7d326..9656ee45b 100644 --- a/tests/StackExchange.Redis.Tests/Helpers/SharedConnectionFixture.cs +++ b/tests/StackExchange.Redis.Tests/Helpers/SharedConnectionFixture.cs @@ -197,7 +197,7 @@ public event EventHandler ServerMaintenanceEvent public IServer GetServer(IPAddress host, int port) => _inner.GetServer(host, port); public IServer GetServer(EndPoint endpoint, object? asyncState = null) => _inner.GetServer(endpoint, asyncState); - + public IServer GetServer(RedisKey key, object? asyncState = null, CommandFlags flags = CommandFlags.None) => _inner.GetServer(key, asyncState, flags); public IServer[] GetServers() => _inner.GetServers(); public string GetStatus() => _inner.GetStatus(); diff --git a/tests/StackExchange.Redis.Tests/KeyTests.cs b/tests/StackExchange.Redis.Tests/KeyTests.cs index 31cd87d79..6df725e21 100644 --- a/tests/StackExchange.Redis.Tests/KeyTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyTests.cs @@ -182,8 +182,8 @@ public async Task KeyEncoding() db.KeyDelete(key, CommandFlags.FireAndForget); db.StringSet(key, "new value", flags: CommandFlags.FireAndForget); - Assert.Equal("embstr", db.KeyEncoding(key)); - Assert.Equal("embstr", await db.KeyEncodingAsync(key)); + Assert.True(db.KeyEncoding(key) is "embstr" or "raw"); + Assert.True(await db.KeyEncodingAsync(key) is "embstr" or "raw"); db.KeyDelete(key, CommandFlags.FireAndForget); db.ListLeftPush(key, "new value", flags: CommandFlags.FireAndForget); diff --git a/tests/StackExchange.Redis.Tests/LoggerTests.cs b/tests/StackExchange.Redis.Tests/LoggerTests.cs index bc097d24c..e001250b0 100644 --- a/tests/StackExchange.Redis.Tests/LoggerTests.cs +++ b/tests/StackExchange.Redis.Tests/LoggerTests.cs @@ -52,7 +52,7 @@ public class TestWrapperLoggerFactory(ILogger logger) : ILoggerFactory { public TestWrapperLogger Logger { get; } = new TestWrapperLogger(logger); - public void AddProvider(ILoggerProvider provider) => throw new NotImplementedException(); + public void AddProvider(ILoggerProvider provider) { } public ILogger CreateLogger(string categoryName) => Logger; public void Dispose() { } } @@ -81,9 +81,9 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except private class TestMultiLogger(params ILogger[] loggers) : ILogger { #if NET8_0_OR_GREATER - public IDisposable? BeginScope(TState state) where TState : notnull => throw new NotImplementedException(); + public IDisposable? BeginScope(TState state) where TState : notnull => null; #else - public IDisposable BeginScope(TState state) => throw new NotImplementedException(); + public IDisposable BeginScope(TState state) => null!; #endif public bool IsEnabled(LogLevel logLevel) => true; public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) @@ -105,9 +105,9 @@ public TestLogger(LogLevel logLevel, TextWriter output) => (_logLevel, _output) = (logLevel, output); #if NET8_0_OR_GREATER - public IDisposable? BeginScope(TState state) where TState : notnull => throw new NotImplementedException(); + public IDisposable? BeginScope(TState state) where TState : notnull => null; #else - public IDisposable BeginScope(TState state) => throw new NotImplementedException(); + public IDisposable BeginScope(TState state) => null!; #endif public bool IsEnabled(LogLevel logLevel) => logLevel >= _logLevel; public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter)