Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current package versions:
## Unreleased

- Fix [#2951](https://github.com/StackExchange/StackExchange.Redis/issues/2951) - sentinel reconnection failure ([#2956 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2956))
- Mitigate [#2955](https://github.com/StackExchange/StackExchange.Redis/issues/2955) (unbalanced pub/sub routing) ([#2958 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2958))

## 2.9.17

Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i

public override string CommandAndKey => Command + " " + Channel;

public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
=> Channel.UseClusterRouting ? serverSelectionStrategy.HashSlot(Channel) : ServerSelectionStrategy.NoSlot;
}

internal abstract class CommandKeyBase : Message
Expand Down
4 changes: 4 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
#nullable enable
static StackExchange.Redis.RedisChannel.LiteralRouted(byte[]! value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.LiteralRouted(string! value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.PatternRouted(byte[]! value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.PatternRouted(string! value) -> StackExchange.Redis.RedisChannel
129 changes: 102 additions & 27 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ internal enum RedisChannelOptions
None = 0,
Pattern = 1 << 0,
Sharded = 1 << 1,
Routed = 1 << 2,
}

// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.Routed;

internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;

/// <summary>
/// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
/// or to scenarios using <see cref="RedisChannel.LiteralRouted(string)"/> / <see cref="RedisChannel.LiteralRouted(byte[])"/>.
/// </summary>
internal bool UseClusterRouting => (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.Routed)) != 0;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
/// </summary>
Expand Down Expand Up @@ -51,24 +61,68 @@ public static bool UseImplicitAutoPattern
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(string value) => new(value, RedisChannelOptions.None);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(byte[] value) => new(value, RedisChannelOptions.None);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
public static RedisChannel LiteralRouted(string value) => new(value, RedisChannelOptions.Routed);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
public static RedisChannel LiteralRouted(byte[] value) => new(value, RedisChannelOptions.Routed);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(string value) => new RedisChannel(value, PatternMode.Literal);
public static RedisChannel Pattern(string value) => new(value, RedisChannelOptions.Pattern);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(byte[] value) => new RedisChannel(value, PatternMode.Literal);
public static RedisChannel Pattern(byte[] value) => new(value, RedisChannelOptions.Pattern);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
public static RedisChannel Pattern(string value) => new RedisChannel(value, PatternMode.Pattern);
public static RedisChannel PatternRouted(string value) => new(value, RedisChannelOptions.Pattern | RedisChannelOptions.Routed);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
public static RedisChannel Pattern(byte[] value) => new RedisChannel(value, PatternMode.Pattern);
public static RedisChannel PatternRouted(byte[] value) => new(value, RedisChannelOptions.Pattern | RedisChannelOptions.Routed);

/// <summary>
/// Create a new redis channel from a buffer, explicitly controlling the pattern mode.
Expand All @@ -84,28 +138,45 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}

/// <summary>
/// Create a new redis channel from a buffer, representing a sharded channel.
/// Create a new redis channel from a buffer, representing a sharded channel. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);

/// <summary>
/// Create a new redis channel from a string, representing a sharded channel.
/// Create a new redis channel from a string, representing a sharded channel. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded);

internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
Options = options;
}

internal RedisChannel(string? value, RedisChannelOptions options)
{
Value = value is null ? null : Encoding.UTF8.GetBytes(value);
Options = options;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
{
PatternMode.Auto => value != null && Array.IndexOf(value, (byte)'*') >= 0,
Expand Down Expand Up @@ -155,15 +226,17 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
(x.Options & EqualityMask) == (y.Options & EqualityMask)
&& RedisValue.Equals(x.Value, y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(string x, RedisChannel y) =>
RedisValue.Equals(x == null ? null : Encoding.UTF8.GetBytes(x), y.Value);
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
RedisValue.Equals(x is null ? null : Encoding.UTF8.GetBytes(x), y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
Expand All @@ -178,7 +251,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, string y) =>
RedisValue.Equals(x.Value, y == null ? null : Encoding.UTF8.GetBytes(y));
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
RedisValue.Equals(x.Value, y is null ? null : Encoding.UTF8.GetBytes(y));

/// <summary>
/// Indicate whether two channel names are equal.
Expand All @@ -203,10 +277,11 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
public bool Equals(RedisChannel other) => (Options & EqualityMask) == (other.Options & EqualityMask)
&& RedisValue.Equals(Value, other.Value);

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)(Options & EqualityMask);

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -266,23 +341,21 @@ public enum PatternMode
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
public static implicit operator RedisChannel(string key)
{
if (key == null) return default;
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (key is null) return default;
return new RedisChannel(Encoding.UTF8.GetBytes(key), s_DefaultPatternMode);
}

/// <summary>
/// Create a channel name from a <see cref="T:byte[]"/>.
/// Create a channel name from a <c>byte[]</c>.
/// </summary>
/// <param name="key">The byte array to get a channel from.</param>
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
public static implicit operator RedisChannel(byte[]? key)
{
if (key == null) return default;
return new RedisChannel(key, s_DefaultPatternMode);
}
=> key is null ? default : new RedisChannel(key, s_DefaultPatternMode);

/// <summary>
/// Obtain the channel name as a <see cref="T:byte[]"/>.
/// Obtain the channel name as a <c>byte[]</c>.
/// </summary>
/// <param name="key">The channel to get a byte[] from.</param>
public static implicit operator byte[]?(RedisChannel key) => key.Value;
Expand All @@ -294,7 +367,7 @@ public static implicit operator RedisChannel(byte[]? key)
public static implicit operator string?(RedisChannel key)
{
var arr = key.Value;
if (arr == null)
if (arr is null)
{
return null;
}
Expand All @@ -303,9 +376,7 @@ public static implicit operator RedisChannel(byte[]? key)
return Encoding.UTF8.GetString(arr);
}
catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
(e is DecoderFallbackException
|| e is ArgumentException
|| e is ArgumentNullException)
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
{
return BitConverter.ToString(arr);
}
Expand All @@ -316,8 +387,12 @@ public static implicit operator RedisChannel(byte[]? key)
// giving due consideration to the default pattern mode (UseImplicitAutoPattern)
// (since we don't ship them, we don't need them in release)
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
// ReSharper disable once UnusedMember.Local
// ReSharper disable once UnusedParameter.Local
private RedisChannel(string value) => throw new NotSupportedException();
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
// ReSharper disable once UnusedMember.Local
// ReSharper disable once UnusedParameter.Local
private RedisChannel(byte[]? value) => throw new NotSupportedException();
#endif
}
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1871,14 +1871,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

public RedisResult Execute(string command, params object[] args)
Expand Down
Loading
Loading