Skip to content

Commit 029db3d

Browse files
committed
fix unsolicited SUNSUBSCRIBE
1 parent 5aebe65 commit 029db3d

File tree

3 files changed

+170
-83
lines changed

3 files changed

+170
-83
lines changed

src/StackExchange.Redis/Message.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ protected override void WriteImpl(PhysicalConnection physical)
856856

857857
internal abstract class CommandChannelBase : Message
858858
{
859-
protected readonly RedisChannel Channel;
859+
internal readonly RedisChannel Channel;
860860

861861
protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel) : base(db, flags, command)
862862
{

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 139 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ internal sealed partial class PhysicalConnection : IDisposable
2929

3030
private const int DefaultRedisDatabaseCount = 16;
3131

32-
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage",
33-
subscribe = "subscribe", sunsubscribe = "sunsubscribe";
34-
3532
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
3633
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
3734

@@ -1674,32 +1671,122 @@ private enum PushKind
16741671
{
16751672
None,
16761673
Message,
1677-
SMessage,
16781674
PMessage,
1679-
Subscribe,
1680-
SUnsubscribe,
1675+
SMessage,
1676+
Subscribe = RedisCommand.SUBSCRIBE,
1677+
PSubscribe = RedisCommand.PSUBSCRIBE,
1678+
SSubscribe = RedisCommand.SSUBSCRIBE,
1679+
Unsubscribe = RedisCommand.UNSUBSCRIBE,
1680+
PUnsubscribe = RedisCommand.PUNSUBSCRIBE,
1681+
SUnsubscribe = RedisCommand.SUNSUBSCRIBE,
16811682
}
1682-
private static PushKind GetPushKind(in Sequence<RawResult> result)
1683+
private PushKind GetPushKind(in Sequence<RawResult> result, out RedisChannel channel)
16831684
{
16841685
var len = result.Length;
1685-
if (len >= 1)
1686-
{
1687-
ref readonly RawResult kind = ref result[0];
1688-
if (len >= 3)
1689-
{
1690-
if (kind.IsEqual(message)) return PushKind.Message;
1691-
if (kind.IsEqual(smessage)) return PushKind.SMessage;
1692-
if (len >= 4)
1686+
if (len >= 2) // always have at least the kind and the subscription channel
1687+
{
1688+
const int MAX_LEN = 16;
1689+
Debug.Assert(MAX_LEN >= Enumerable.Max(
1690+
[
1691+
PushMessage.Length, PushPMessage.Length, PushSMessage.Length,
1692+
PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length,
1693+
PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length,
1694+
]));
1695+
ref readonly RawResult pushKind = ref result[0];
1696+
var multiSegmentPayload = pushKind.Payload;
1697+
if (multiSegmentPayload.Length <= MAX_LEN)
1698+
{
1699+
var span = multiSegmentPayload.IsSingleSegment
1700+
? multiSegmentPayload.First.Span
1701+
: CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload);
1702+
1703+
var hash = FastHash.Hash64(span);
1704+
RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None;
1705+
PushKind kind;
1706+
switch (hash)
1707+
{
1708+
case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3:
1709+
kind = PushKind.Message;
1710+
break;
1711+
case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4:
1712+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1713+
kind = PushKind.PMessage;
1714+
break;
1715+
case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3:
1716+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1717+
kind = PushKind.SMessage;
1718+
break;
1719+
case PushSubscribe.Hash when PushSubscribe.Is(hash, span):
1720+
kind = PushKind.Subscribe;
1721+
break;
1722+
case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span):
1723+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1724+
kind = PushKind.PSubscribe;
1725+
break;
1726+
case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span):
1727+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1728+
kind = PushKind.SSubscribe;
1729+
break;
1730+
case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span):
1731+
kind = PushKind.Unsubscribe;
1732+
break;
1733+
case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span):
1734+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1735+
kind = PushKind.PUnsubscribe;
1736+
break;
1737+
case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span):
1738+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1739+
kind = PushKind.SUnsubscribe;
1740+
break;
1741+
default:
1742+
kind = PushKind.None;
1743+
break;
1744+
}
1745+
if (kind != PushKind.None)
16931746
{
1694-
if (kind.IsEqual(pmessage)) return PushKind.PMessage;
1747+
// the channel is always the second element
1748+
channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions);
1749+
return kind;
16951750
}
1696-
if (kind.IsEqual(sunsubscribe)) return PushKind.SUnsubscribe;
16971751
}
1698-
if (kind.IsEqual(subscribe)) return PushKind.Subscribe;
16991752
}
1753+
channel = default;
17001754
return PushKind.None;
1755+
1756+
static ReadOnlySpan<byte> CopyTo(Span<byte> target, in ReadOnlySequence<byte> source)
1757+
{
1758+
source.CopyTo(target);
1759+
return target.Slice(0, (int)source.Length);
1760+
}
17011761
}
17021762

1763+
[FastHash("message")]
1764+
private static partial class PushMessage { }
1765+
1766+
[FastHash("pmessage")]
1767+
private static partial class PushPMessage { }
1768+
1769+
[FastHash("smessage")]
1770+
private static partial class PushSMessage { }
1771+
1772+
[FastHash("subscribe")]
1773+
private static partial class PushSubscribe { }
1774+
1775+
[FastHash("psubscribe")]
1776+
private static partial class PushPSubscribe { }
1777+
1778+
[FastHash("ssubscribe")]
1779+
private static partial class PushSSubscribe { }
1780+
1781+
[FastHash("unsubscribe")]
1782+
private static partial class PushUnsubscribe { }
1783+
1784+
[FastHash("punsubscribe")]
1785+
private static partial class PushPUnsubscribe { }
1786+
1787+
[FastHash("sunsubscribe")]
1788+
private static partial class PushSUnsubscribe { }
1789+
17031790
private void MatchResult(in RawResult result)
17041791
{
17051792
// check to see if it could be an out-of-band pubsub message
@@ -1710,7 +1797,7 @@ private void MatchResult(in RawResult result)
17101797

17111798
// out of band message does not match to a queued message
17121799
var items = result.GetItems();
1713-
var kind = GetPushKind(items);
1800+
var kind = GetPushKind(items, out var subscriptionChannel);
17141801
switch (kind)
17151802
{
17161803
case PushKind.Message:
@@ -1741,89 +1828,55 @@ private void MatchResult(in RawResult result)
17411828
}
17421829

17431830
// invoke the handlers
1744-
RedisChannel channel;
1745-
if (items[0].IsEqual(message))
1746-
{
1747-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
1748-
Trace("MESSAGE: " + channel);
1749-
}
1750-
else // see check on outer-if that restricts to message / smessage
1751-
{
1752-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
1753-
Trace("SMESSAGE: " + channel);
1754-
}
1755-
1756-
if (!channel.IsNull)
1831+
if (!subscriptionChannel.IsNull)
17571832
{
1833+
Trace($"{kind}: {subscriptionChannel}");
17581834
if (TryGetPubSubPayload(items[2], out var payload))
17591835
{
17601836
_readStatus = ReadStatus.InvokePubSub;
1761-
muxer.OnMessage(channel, channel, payload);
1837+
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payload);
17621838
}
17631839
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
17641840
else if (TryGetMultiPubSubPayload(items[2], out var payloads))
17651841
{
17661842
_readStatus = ReadStatus.InvokePubSub;
1767-
muxer.OnMessage(channel, channel, payloads);
1843+
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payloads);
17681844
}
17691845
}
1770-
1771-
return; // AND STOP PROCESSING!
1846+
return; // and stop processing
17721847
case PushKind.PMessage:
17731848
_readStatus = ReadStatus.PubSubPMessage;
17741849

1775-
channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1776-
1777-
Trace("PMESSAGE: " + channel);
1778-
if (!channel.IsNull)
1850+
var messageChannel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
1851+
if (!messageChannel.IsNull)
17791852
{
1853+
Trace($"{kind}: {messageChannel} via {subscriptionChannel}");
17801854
if (TryGetPubSubPayload(items[3], out var payload))
17811855
{
1782-
var sub = items[1].AsRedisChannel(
1783-
ChannelPrefix,
1784-
RedisChannel.RedisChannelOptions.Pattern);
1785-
17861856
_readStatus = ReadStatus.InvokePubSub;
1787-
muxer.OnMessage(sub, channel, payload);
1857+
muxer.OnMessage(subscriptionChannel, messageChannel, payload);
17881858
}
17891859
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
17901860
{
1791-
var sub = items[1].AsRedisChannel(
1792-
ChannelPrefix,
1793-
RedisChannel.RedisChannelOptions.Pattern);
1794-
17951861
_readStatus = ReadStatus.InvokePubSub;
1796-
muxer.OnMessage(sub, channel, payloads);
1862+
muxer.OnMessage(subscriptionChannel, messageChannel, payloads);
17971863
}
17981864
}
1799-
1800-
break;
1801-
case PushKind.SUnsubscribe:
1802-
_readStatus = ReadStatus.PubSubSUnsubscribe;
1803-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
1865+
return; // and stop processing
1866+
case PushKind.SUnsubscribe when !PeekChannelMessage(RedisCommand.SUNSUBSCRIBE, subscriptionChannel):
1867+
// then it was *unsolicited* - this probably means the slot was migrated
1868+
// (otherwise, we'll let the command-processor deal with it)
1869+
_readStatus = ReadStatus.PubSubUnsubscribe;
18041870
var server = BridgeCouldBeNull?.ServerEndPoint;
1805-
if (server is not null && muxer.TryGetSubscription(channel, out var subscription))
1871+
if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription))
18061872
{
18071873
if (subscription.GetCurrentServer() == server)
18081874
{
1809-
// definitely isn't this connection any more, but we were listening
1810-
subscription.SetCurrentServer(null);
1811-
muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, nameof(sunsubscribe));
1875+
subscription.SetCurrentServer(null); // wipe
1876+
muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, PushSUnsubscribe.Text);
18121877
}
18131878
}
1814-
break;
1815-
}
1816-
1817-
switch (kind)
1818-
{
1819-
// we recognized it a RESP2 OOB, or it was explicitly *any* RESP3 push notification
1820-
// (even if we didn't recognize the kind) - we're done; unless it is "subscribe", which
1821-
// is *technically* a push, but we still want to treat it as a response to the original message
1822-
case PushKind.None when result.Resp3Type != ResultType.Push:
1823-
case PushKind.Subscribe:
1824-
break; // continue, try to match to a pending message
1825-
default:
1826-
return; // we're done with this message (RESP3 OOB, or something we recognized)
1879+
return; // and STOP PROCESSING; unsolicited
18271880
}
18281881
}
18291882
Trace("Matching result...");
@@ -1942,6 +1995,19 @@ static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence<RawResult>
19421995
}
19431996
}
19441997

1998+
private bool PeekChannelMessage(RedisCommand command, RedisChannel channel)
1999+
{
2000+
Message? msg;
2001+
bool haveMsg;
2002+
lock (_writtenAwaitingResponse)
2003+
{
2004+
haveMsg = _writtenAwaitingResponse.TryPeek(out msg);
2005+
}
2006+
2007+
return haveMsg && msg is CommandChannelBase typed
2008+
&& typed.Command == command && typed.Channel == channel;
2009+
}
2010+
19452011
private volatile Message? _activeMessage;
19462012

19472013
internal void GetHeadMessages(out Message? now, out Message? next)
@@ -2235,7 +2301,7 @@ internal enum ReadStatus
22352301
MatchResultComplete,
22362302
ResetArena,
22372303
ProcessBufferComplete,
2238-
PubSubSUnsubscribe,
2304+
PubSubUnsubscribe,
22392305
NA = -1,
22402306
}
22412307
private volatile ReadStatus _readStatus;

tests/StackExchange.Redis.Tests/ClusterShardedTests.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ public async Task SubscribeToWrongServerAsync(bool sharded)
218218
// we should end up where we *actually sent the message* - there is no -MOVED
219219
Assert.Equal(serverEndpoint.EndPoint, actual);
220220
}
221+
222+
Log("Unsubscribing...");
221223
await queue.UnsubscribeAsync();
224+
Log("Unsubscribed.");
222225
}
223226

224227
[Fact]
@@ -244,6 +247,19 @@ public async Task KeepSubscribedThroughSlotMigrationAsync()
244247

245248
var oldServer = conn.GetServer(asKey); // this is where it *should* go
246249

250+
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
251+
{
252+
// now publish... we *expect* things to have sorted themselves out
253+
var msg = Guid.NewGuid().ToString();
254+
var count = await subscriber.PublishAsync(channel, msg);
255+
Assert.Equal(1, count);
256+
257+
Log("Waiting for message on original subscription...");
258+
var received = await queue.ReadAsync(timeout.Token);
259+
Log($"Message received: {received.Message}");
260+
Assert.Equal(msg, (string)received.Message!);
261+
}
262+
247263
// now intentionally choose *a different* server
248264
var newServer = conn.GetServers().First(s => !Equals(s.EndPoint, oldServer.EndPoint));
249265

@@ -312,16 +328,21 @@ void WriteLog(string caption, RedisResult result)
312328
await conn.ConfigureAsync();
313329
Assert.Equal(newServer, conn.GetServer(asKey));
314330

315-
// now publish... we *expect* things to have sorted themselves out
316-
var msg = Guid.NewGuid().ToString();
317-
var count = await subscriber.PublishAsync(channel, msg);
318-
Assert.Equal(1, count);
319-
320-
Log("Waiting for message...");
321-
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(20));
322-
var received = await queue.ReadAsync(timeout.Token);
323-
Assert.Equal(msg, (string)received.Message!);
331+
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
332+
{
333+
// now publish... we *expect* things to have sorted themselves out
334+
var msg = Guid.NewGuid().ToString();
335+
var count = await subscriber.PublishAsync(channel, msg);
336+
Assert.Equal(1, count);
337+
338+
Log("Waiting for message on moved subscription...");
339+
var received = await queue.ReadAsync(timeout.Token);
340+
Log($"Message received: {received.Message}");
341+
Assert.Equal(msg, (string)received.Message!);
342+
}
324343

344+
Log("Unsubscribing...");
325345
await queue.UnsubscribeAsync();
346+
Log("Unsubscribed.");
326347
}
327348
}

0 commit comments

Comments
 (0)