Skip to content

Commit 0172e03

Browse files
vandyvillamgravell
andauthored
Support sharded pubsub commands (#2498)
* Support sharded pubsub * Support sharded pubsub * fix api * fix enum * fix api --------- Co-authored-by: Marc Gravell <[email protected]>
1 parent cfbd474 commit 0172e03

File tree

12 files changed

+80
-29
lines changed

12 files changed

+80
-29
lines changed

docs/Timeouts.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
8888
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
8989
|aw | Active-Writer: {bool}||
9090
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
91-
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
91+
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
9292
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
9393
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
9494
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|

src/StackExchange.Redis/CommandMap.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public sealed class CommandMap
3131

3232
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
3333

34-
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
34+
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
3535

3636
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
3737

@@ -57,7 +57,9 @@ public sealed class CommandMap
5757

5858
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
5959

60-
RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
60+
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
61+
62+
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
6163

6264
RedisCommand.SCRIPT,
6365

src/StackExchange.Redis/Enums/RedisCommand.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,16 @@ internal enum RedisCommand
181181
SORT,
182182
SORT_RO,
183183
SPOP,
184+
SPUBLISH,
184185
SRANDMEMBER,
185186
SREM,
186187
STRLEN,
187188
SUBSCRIBE,
188189
SUNION,
189190
SUNIONSTORE,
190191
SSCAN,
192+
SSUBSCRIBE,
193+
SUNSUBSCRIBE,
191194
SWAPDB,
192195
SYNC,
193196

@@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
447450
case RedisCommand.SMEMBERS:
448451
case RedisCommand.SMISMEMBER:
449452
case RedisCommand.SORT_RO:
453+
case RedisCommand.SPUBLISH:
450454
case RedisCommand.SRANDMEMBER:
455+
case RedisCommand.SSUBSCRIBE:
451456
case RedisCommand.STRLEN:
452457
case RedisCommand.SUBSCRIBE:
453458
case RedisCommand.SUNION:
459+
case RedisCommand.SUNSUBSCRIBE:
454460
case RedisCommand.SSCAN:
455461
case RedisCommand.SYNC:
456462
case RedisCommand.TIME:

src/StackExchange.Redis/Message.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
569569
case RedisCommand.SLAVEOF:
570570
case RedisCommand.SLOWLOG:
571571
case RedisCommand.SUBSCRIBE:
572+
case RedisCommand.SPUBLISH:
573+
case RedisCommand.SSUBSCRIBE:
574+
case RedisCommand.SUNSUBSCRIBE:
572575
case RedisCommand.SWAPDB:
573576
case RedisCommand.SYNC:
574577
case RedisCommand.TIME:

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable
2929

3030
private const int DefaultRedisDatabaseCount = 16;
3131

32-
private static readonly CommandBytes message = "message", pmessage = "pmessage";
32+
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
3333

3434
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
3535
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
@@ -1644,9 +1644,9 @@ private void MatchResult(in RawResult result)
16441644

16451645
// out of band message does not match to a queued message
16461646
var items = result.GetItems();
1647-
if (items.Length >= 3 && items[0].IsEqual(message))
1647+
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
16481648
{
1649-
_readStatus = ReadStatus.PubSubMessage;
1649+
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
16501650

16511651
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
16521652
var configChanged = muxer.ConfigurationChangedChannel;
@@ -1668,8 +1668,14 @@ private void MatchResult(in RawResult result)
16681668
}
16691669

16701670
// invoke the handlers
1671-
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
1672-
Trace("MESSAGE: " + channel);
1671+
RedisChannel channel;
1672+
if (items[0].IsEqual(message)) {
1673+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
1674+
Trace("MESSAGE: " + channel);
1675+
} else {
1676+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true);
1677+
Trace("SMESSAGE: " + channel);
1678+
}
16731679
if (!channel.IsNull)
16741680
{
16751681
if (TryGetPubSubPayload(items[2], out var payload))
@@ -1690,27 +1696,27 @@ private void MatchResult(in RawResult result)
16901696
{
16911697
_readStatus = ReadStatus.PubSubPMessage;
16921698

1693-
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
1699+
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
16941700
Trace("PMESSAGE: " + channel);
16951701
if (!channel.IsNull)
16961702
{
16971703
if (TryGetPubSubPayload(items[3], out var payload))
16981704
{
1699-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1705+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
17001706
_readStatus = ReadStatus.InvokePubSub;
17011707
muxer.OnMessage(sub, channel, payload);
17021708
}
17031709
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
17041710
{
1705-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1711+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
17061712
_readStatus = ReadStatus.InvokePubSub;
17071713
muxer.OnMessage(sub, channel, payloads);
17081714
}
17091715
}
17101716
return; // AND STOP PROCESSING!
17111717
}
17121718

1713-
// if it didn't look like "[p]message", then we still need to process the pending queue
1719+
// if it didn't look like "[p|s]message", then we still need to process the pending queue
17141720
}
17151721
Trace("Matching result...");
17161722

@@ -2110,6 +2116,7 @@ internal enum ReadStatus
21102116
MatchResult,
21112117
PubSubMessage,
21122118
PubSubPMessage,
2119+
PubSubSMessage,
21132120
Reconfigure,
21142121
InvokePubSub,
21152122
ResponseSequenceCheck, // high-integrity mode only

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,12 +1309,15 @@ StackExchange.Redis.RedisChannel
13091309
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
13101310
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
13111311
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
1312+
StackExchange.Redis.RedisChannel.IsSharded.get -> bool
13121313
StackExchange.Redis.RedisChannel.PatternMode
13131314
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
13141315
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
13151316
StackExchange.Redis.RedisChannel.PatternMode.Pattern = 2 -> StackExchange.Redis.RedisChannel.PatternMode
13161317
StackExchange.Redis.RedisChannel.RedisChannel() -> void
1318+
StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void
13171319
StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void
1320+
StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void
13181321
StackExchange.Redis.RedisChannel.RedisChannel(string! value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void
13191322
StackExchange.Redis.RedisCommandException
13201323
StackExchange.Redis.RedisCommandException.RedisCommandException(string! message) -> void

src/StackExchange.Redis/RawResult.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,20 +161,20 @@ public bool MoveNext()
161161
}
162162
public ReadOnlySequence<byte> Current { get; private set; }
163163
}
164-
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
164+
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded)
165165
{
166166
switch (Resp2TypeBulkString)
167167
{
168168
case ResultType.SimpleString:
169169
case ResultType.BulkString:
170170
if (channelPrefix == null)
171171
{
172-
return new RedisChannel(GetBlob(), mode);
172+
return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode);
173173
}
174174
if (StartsWith(channelPrefix))
175175
{
176176
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
177-
return new RedisChannel(copy, mode);
177+
return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode);
178178
}
179179
return default;
180180
default:

src/StackExchange.Redis/RedisChannel.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace StackExchange.Redis
1010
{
1111
internal readonly byte[]? Value;
1212
internal readonly bool _isPatternBased;
13+
internal readonly bool _isSharded;
1314

1415
/// <summary>
1516
/// Indicates whether the channel-name is either null or a zero-length value.
@@ -21,6 +22,11 @@ namespace StackExchange.Redis
2122
/// </summary>
2223
public bool IsPattern => _isPatternBased;
2324

25+
/// <summary>
26+
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
27+
/// </summary>
28+
public bool IsSharded => _isSharded;
29+
2430
internal bool IsNull => Value == null;
2531

2632
/// <summary>
@@ -59,7 +65,7 @@ public static bool UseImplicitAutoPattern
5965
/// </summary>
6066
/// <param name="value">The name of the channel to create.</param>
6167
/// <param name="mode">The mode for name matching.</param>
62-
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
68+
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { }
6369

6470
/// <summary>
6571
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
@@ -68,10 +74,25 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
6874
/// <param name="mode">The mode for name matching.</param>
6975
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
7076

71-
private RedisChannel(byte[]? value, bool isPatternBased)
77+
/// <summary>
78+
/// Create a new redis channel from a buffer, explicitly controlling the sharding mode.
79+
/// </summary>
80+
/// <param name="value">The name of the channel to create.</param>
81+
/// <param name="isSharded">Whether the channel is sharded.</param>
82+
public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {}
83+
84+
/// <summary>
85+
/// Create a new redis channel from a string, explicitly controlling the sharding mode.
86+
/// </summary>
87+
/// <param name="value">The string name of the channel to create.</param>
88+
/// <param name="isSharded">Whether the channel is sharded.</param>
89+
public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {}
90+
91+
private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
7292
{
7393
Value = value;
7494
_isPatternBased = isPatternBased;
95+
_isSharded = isSharded;
7596
}
7697

7798
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
@@ -123,7 +144,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
123144
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
124145
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
125146
public static bool operator ==(RedisChannel x, RedisChannel y) =>
126-
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
147+
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded;
127148

128149
/// <summary>
129150
/// Indicate whether two channel names are equal.
@@ -171,10 +192,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
171192
/// Indicate whether two channel names are equal.
172193
/// </summary>
173194
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
174-
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
195+
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded;
175196

176197
/// <inheritdoc/>
177-
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
198+
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0);
178199

179200
/// <summary>
180201
/// Obtains a string representation of the channel name.
@@ -286,4 +307,4 @@ public static implicit operator RedisChannel(byte[]? key)
286307
private RedisChannel(byte[]? value) => throw new NotSupportedException();
287308
#endif
288309
}
289-
}
310+
}

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,17 @@ public Subscription(CommandFlags flags)
183183
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
184184
{
185185
var isPattern = channel._isPatternBased;
186+
var isSharded = channel._isSharded;
186187
var command = action switch
187188
{
188189
SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE,
189190
SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE,
190191

191-
SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE,
192-
SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE,
192+
SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE,
193+
SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE,
194+
195+
SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE,
196+
SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE,
193197
_ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"),
194198
};
195199

@@ -370,14 +374,14 @@ private static void ThrowIfNull(in RedisChannel channel)
370374
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
371375
{
372376
ThrowIfNull(channel);
373-
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
377+
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
374378
return ExecuteSync(msg, ResultProcessor.Int64);
375379
}
376380

377381
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
378382
{
379383
ThrowIfNull(channel);
380-
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
384+
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
381385
return ExecuteAsync(msg, ResultProcessor.Int64);
382386
}
383387

@@ -515,6 +519,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action<RedisChannel
515519
return false;
516520
}
517521

522+
// TODO: We need a new api to support SUNSUBSCRIBE all. Calling this now would unsubscribe both sharded and unsharded channels.
518523
public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
519524
{
520525
// TODO: Unsubscribe variadic commands to reduce round trips

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
469469

470470
var newServer = message.Command switch
471471
{
472-
RedisCommand.SUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
473-
_ => null,
472+
RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
473+
_ => null
474474
};
475475
Subscription?.SetCurrentServer(newServer);
476476
return true;
@@ -1526,7 +1526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15261526
{
15271527
case ResultType.Array:
15281528
var final = result.ToArray(
1529-
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode),
1529+
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false),
15301530
new ChannelState(connection.ChannelPrefix, mode))!;
15311531

15321532
SetResult(message, final);

0 commit comments

Comments
 (0)