Skip to content

Commit 73b8c5a

Browse files
committed
mitigate #2955
- by default: use round-robin (not channel-routing) for "non-sharded" pub/sub - add new API for channel-routed literals/wildcards - when publishing, if we're also subscribed: use that connection - randomize where the round-robin starts, to better randomize startup behaviour
1 parent 862a70e commit 73b8c5a

File tree

8 files changed

+167
-45
lines changed

8 files changed

+167
-45
lines changed

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Current package versions:
99
## Unreleased
1010

1111
- Fix [#2951](https://github.com/StackExchange/StackExchange.Redis/issues/2951) - sentinel reconnection failure ([#2956 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2956))
12+
- Mitigate [#2955](https://github.com/StackExchange/StackExchange.Redis/issues/2955) (unbalanced pub/sub routing) ([#XXXX by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/XXXX))
1213

1314
## 2.9.17
1415

src/StackExchange.Redis/Message.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i
866866

867867
public override string CommandAndKey => Command + " " + Channel;
868868

869-
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
869+
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
870+
=> Channel.UseClusterRouting ? serverSelectionStrategy.HashSlot(Channel) : ServerSelectionStrategy.NoSlot;
870871
}
871872

872873
internal abstract class CommandKeyBase : Message
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
#nullable enable
2+
static StackExchange.Redis.RedisChannel.LiteralRouted(byte[]! value) -> StackExchange.Redis.RedisChannel
3+
static StackExchange.Redis.RedisChannel.LiteralRouted(string! value) -> StackExchange.Redis.RedisChannel
4+
static StackExchange.Redis.RedisChannel.PatternRouted(byte[]! value) -> StackExchange.Redis.RedisChannel
5+
static StackExchange.Redis.RedisChannel.PatternRouted(string! value) -> StackExchange.Redis.RedisChannel

src/StackExchange.Redis/RedisChannel.cs

Lines changed: 102 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,20 @@ internal enum RedisChannelOptions
1818
None = 0,
1919
Pattern = 1 << 0,
2020
Sharded = 1 << 1,
21+
Routed = 1 << 2,
2122
}
2223

24+
// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
25+
private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.Routed;
26+
2327
internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
2428

29+
/// <summary>
30+
/// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
31+
/// or to scenarios using <see cref="RedisChannel.LiteralRouted(string)"/> / <see cref="RedisChannel.LiteralRouted(byte[])"/>.
32+
/// </summary>
33+
internal bool UseClusterRouting => (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.Routed)) != 0;
34+
2535
/// <summary>
2636
/// Indicates whether the channel-name is either null or a zero-length value.
2737
/// </summary>
@@ -51,24 +61,68 @@ public static bool UseImplicitAutoPattern
5161
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;
5262

5363
/// <summary>
54-
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
64+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
65+
/// environments, this channel will be freely routed to any applicable server - different client nodes
66+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
67+
/// very few channels. In non-cluster environments, routing is not a consideration.
68+
/// </summary>
69+
public static RedisChannel Literal(string value) => new(value, RedisChannelOptions.None);
70+
71+
/// <summary>
72+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
73+
/// environments, this channel will be freely routed to any applicable server - different client nodes
74+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
75+
/// very few channels. In non-cluster environments, routing is not a consideration.
76+
/// </summary>
77+
public static RedisChannel Literal(byte[] value) => new(value, RedisChannelOptions.None);
78+
79+
/// <summary>
80+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
81+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
82+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
83+
/// a consideration.
84+
/// </summary>
85+
public static RedisChannel LiteralRouted(string value) => new(value, RedisChannelOptions.Routed);
86+
87+
/// <summary>
88+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
89+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
90+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
91+
/// a consideration.
92+
/// </summary>
93+
public static RedisChannel LiteralRouted(byte[] value) => new(value, RedisChannelOptions.Routed);
94+
95+
/// <summary>
96+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
97+
/// environments, this channel will be freely routed to any applicable server - different client nodes
98+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
99+
/// very few channels. In non-cluster environments, routing is not a consideration.
55100
/// </summary>
56-
public static RedisChannel Literal(string value) => new RedisChannel(value, PatternMode.Literal);
101+
public static RedisChannel Pattern(string value) => new(value, RedisChannelOptions.Pattern);
57102

58103
/// <summary>
59-
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
104+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
105+
/// environments, this channel will be freely routed to any applicable server - different client nodes
106+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
107+
/// very few channels. In non-cluster environments, routing is not a consideration.
60108
/// </summary>
61-
public static RedisChannel Literal(byte[] value) => new RedisChannel(value, PatternMode.Literal);
109+
public static RedisChannel Pattern(byte[] value) => new(value, RedisChannelOptions.Pattern);
62110

63111
/// <summary>
64-
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
112+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
113+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
114+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
115+
/// a consideration.
65116
/// </summary>
66-
public static RedisChannel Pattern(string value) => new RedisChannel(value, PatternMode.Pattern);
117+
public static RedisChannel PatternRouted(string value) => new(value, RedisChannelOptions.Pattern | RedisChannelOptions.Routed);
67118

68119
/// <summary>
69-
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
120+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
121+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
122+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
123+
/// a consideration.
70124
/// </summary>
71-
public static RedisChannel Pattern(byte[] value) => new RedisChannel(value, PatternMode.Pattern);
125+
public static RedisChannel PatternRouted(byte[] value) => new(value, RedisChannelOptions.Pattern | RedisChannelOptions.Routed);
72126

73127
/// <summary>
74128
/// Create a new redis channel from a buffer, explicitly controlling the pattern mode.
@@ -84,28 +138,45 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
84138
/// </summary>
85139
/// <param name="value">The string name of the channel to create.</param>
86140
/// <param name="mode">The mode for name matching.</param>
141+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
87142
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
88143
{
89144
}
90145

91146
/// <summary>
92-
/// Create a new redis channel from a buffer, representing a sharded channel.
147+
/// Create a new redis channel from a buffer, representing a sharded channel. In cluster
148+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
149+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
150+
/// a consideration.
93151
/// </summary>
94152
/// <param name="value">The name of the channel to create.</param>
153+
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
154+
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
95155
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
96156

97157
/// <summary>
98-
/// Create a new redis channel from a string, representing a sharded channel.
158+
/// Create a new redis channel from a string, representing a sharded channel. In cluster
159+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
160+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
161+
/// a consideration.
99162
/// </summary>
100163
/// <param name="value">The string name of the channel to create.</param>
101-
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
164+
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
165+
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
166+
public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded);
102167

103168
internal RedisChannel(byte[]? value, RedisChannelOptions options)
104169
{
105170
Value = value;
106171
Options = options;
107172
}
108173

174+
internal RedisChannel(string? value, RedisChannelOptions options)
175+
{
176+
Value = value is null ? null : Encoding.UTF8.GetBytes(value);
177+
Options = options;
178+
}
179+
109180
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
110181
{
111182
PatternMode.Auto => value != null && Array.IndexOf(value, (byte)'*') >= 0,
@@ -155,15 +226,17 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
155226
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
156227
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
157228
public static bool operator ==(RedisChannel x, RedisChannel y) =>
158-
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
229+
(x.Options & EqualityMask) == (y.Options & EqualityMask)
230+
&& RedisValue.Equals(x.Value, y.Value);
159231

160232
/// <summary>
161233
/// Indicate whether two channel names are equal.
162234
/// </summary>
163235
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
164236
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
165237
public static bool operator ==(string x, RedisChannel y) =>
166-
RedisValue.Equals(x == null ? null : Encoding.UTF8.GetBytes(x), y.Value);
238+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
239+
RedisValue.Equals(x is null ? null : Encoding.UTF8.GetBytes(x), y.Value);
167240

168241
/// <summary>
169242
/// Indicate whether two channel names are equal.
@@ -178,7 +251,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
178251
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
179252
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
180253
public static bool operator ==(RedisChannel x, string y) =>
181-
RedisValue.Equals(x.Value, y == null ? null : Encoding.UTF8.GetBytes(y));
254+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
255+
RedisValue.Equals(x.Value, y is null ? null : Encoding.UTF8.GetBytes(y));
182256

183257
/// <summary>
184258
/// Indicate whether two channel names are equal.
@@ -203,10 +277,11 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
203277
/// Indicate whether two channel names are equal.
204278
/// </summary>
205279
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
206-
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
280+
public bool Equals(RedisChannel other) => (Options & EqualityMask) == (other.Options & EqualityMask)
281+
&& RedisValue.Equals(Value, other.Value);
207282

208283
/// <inheritdoc/>
209-
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
284+
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)(Options & EqualityMask);
210285

211286
/// <summary>
212287
/// Obtains a string representation of the channel name.
@@ -266,23 +341,21 @@ public enum PatternMode
266341
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
267342
public static implicit operator RedisChannel(string key)
268343
{
269-
if (key == null) return default;
344+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
345+
if (key is null) return default;
270346
return new RedisChannel(Encoding.UTF8.GetBytes(key), s_DefaultPatternMode);
271347
}
272348

273349
/// <summary>
274-
/// Create a channel name from a <see cref="T:byte[]"/>.
350+
/// Create a channel name from a <c>byte[]</c>.
275351
/// </summary>
276352
/// <param name="key">The byte array to get a channel from.</param>
277353
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
278354
public static implicit operator RedisChannel(byte[]? key)
279-
{
280-
if (key == null) return default;
281-
return new RedisChannel(key, s_DefaultPatternMode);
282-
}
355+
=> key is null ? default : new RedisChannel(key, s_DefaultPatternMode);
283356

284357
/// <summary>
285-
/// Obtain the channel name as a <see cref="T:byte[]"/>.
358+
/// Obtain the channel name as a <c>byte[]</c>.
286359
/// </summary>
287360
/// <param name="key">The channel to get a byte[] from.</param>
288361
public static implicit operator byte[]?(RedisChannel key) => key.Value;
@@ -294,7 +367,7 @@ public static implicit operator RedisChannel(byte[]? key)
294367
public static implicit operator string?(RedisChannel key)
295368
{
296369
var arr = key.Value;
297-
if (arr == null)
370+
if (arr is null)
298371
{
299372
return null;
300373
}
@@ -303,9 +376,7 @@ public static implicit operator RedisChannel(byte[]? key)
303376
return Encoding.UTF8.GetString(arr);
304377
}
305378
catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
306-
(e is DecoderFallbackException
307-
|| e is ArgumentException
308-
|| e is ArgumentNullException)
379+
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
309380
{
310381
return BitConverter.ToString(arr);
311382
}
@@ -316,8 +387,12 @@ public static implicit operator RedisChannel(byte[]? key)
316387
// giving due consideration to the default pattern mode (UseImplicitAutoPattern)
317388
// (since we don't ship them, we don't need them in release)
318389
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
390+
// ReSharper disable once UnusedMember.Local
391+
// ReSharper disable once UnusedParameter.Local
319392
private RedisChannel(string value) => throw new NotSupportedException();
320393
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
394+
// ReSharper disable once UnusedMember.Local
395+
// ReSharper disable once UnusedParameter.Local
321396
private RedisChannel(byte[]? value) => throw new NotSupportedException();
322397
#endif
323398
}

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,14 +1871,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
18711871
{
18721872
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
18731873
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
1874-
return ExecuteSync(msg, ResultProcessor.Int64);
1874+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
1875+
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
18751876
}
18761877

18771878
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
18781879
{
18791880
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
18801881
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
1881-
return ExecuteAsync(msg, ResultProcessor.Int64);
1882+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
1883+
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
18821884
}
18831885

18841886
public RedisResult Execute(string command, params object[] args)

0 commit comments

Comments
 (0)