Skip to content

Commit 0820d60

Browse files
committed
Adding support for cluster keyspace notifications to subscriber by allowing subscription to multiple nodes
1 parent fb4a630 commit 0820d60

File tree

2 files changed

+63
-12
lines changed

2 files changed

+63
-12
lines changed

StackExchange.Redis/StackExchange/Redis/RedisChannel.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ private RedisChannel(byte[] value, bool isPatternBased)
3838
{
3939
Value = value;
4040
IsPatternBased = isPatternBased;
41+
IsKeyspaceChannel = value != null && Encoding.UTF8.GetString(value).ToLower().StartsWith("__key");
4142
}
4243

4344
private static bool DeterminePatternBased(byte[] value, PatternMode mode)
@@ -182,6 +183,9 @@ internal void AssertNotNull()
182183

183184
internal RedisChannel Clone() => (byte[])Value?.Clone();
184185

186+
internal readonly bool IsPatternBased;
187+
internal readonly bool IsKeyspaceChannel;
188+
185189
/// <summary>
186190
/// The matching pattern for this channel
187191
/// </summary>

StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Net;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -144,7 +145,7 @@ internal long ValidateSubscriptions()
144145
private sealed class Subscription
145146
{
146147
private Action<RedisChannel, RedisValue> handler;
147-
private ServerEndPoint owner;
148+
private List<ServerEndPoint> owners = new List<ServerEndPoint>();
148149

149150
public Subscription(Action<RedisChannel, RedisValue> value) => handler = value;
150151

@@ -170,33 +171,80 @@ public bool Remove(Action<RedisChannel, RedisValue> value)
170171
}
171172

172173
public Task SubscribeToServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
174+
{
175+
// subscribe to all masters in cluster for keyspace/keyevent notifications
176+
if (channel.IsKeyspaceChannel) {
177+
return SubscribeToMasters(multiplexer, channel, flags, asyncState, internalCall);
178+
}
179+
return SubscribeToSingleServer(multiplexer, channel, flags, asyncState, internalCall);
180+
}
181+
182+
private Task SubscribeToSingleServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
173183
{
174184
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
175185
var selected = multiplexer.SelectServer(-1, cmd, flags, default(RedisKey));
176186

177-
if (selected == null || Interlocked.CompareExchange(ref owner, selected, null) != null) return null;
187+
lock (owners)
188+
{
189+
if (selected == null || owners.Contains(selected)) return null;
190+
owners.Add(selected);
191+
}
178192

179193
var msg = Message.Create(-1, flags, cmd, channel);
180-
181194
return selected.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
182195
}
183196

197+
private Task SubscribeToMasters(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
198+
{
199+
List<Task> subscribeTasks = new List<Task>();
200+
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
201+
var masters = multiplexer.GetServerSnapshot().Where(s => !s.IsSlave && s.EndPoint.Equals(s.ClusterConfiguration.Origin));
202+
203+
lock (owners)
204+
{
205+
foreach (var master in masters)
206+
{
207+
if (owners.Contains(master)) continue;
208+
owners.Add(master);
209+
var msg = Message.Create(-1, flags, cmd, channel);
210+
if (internalCall) msg.FlagsRaw = msg.FlagsRaw | (CommandFlags)128;
211+
subscribeTasks.Add(master.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
212+
}
213+
}
214+
215+
return Task.WhenAll(subscribeTasks);
216+
}
217+
184218
public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
185219
{
186-
var oldOwner = Interlocked.Exchange(ref owner, null);
187-
if (oldOwner == null) return null;
220+
if (owners.Count == 0) return null;
188221

222+
List<Task> queuedTasks = new List<Task>();
189223
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE;
190224
var msg = Message.Create(-1, flags, cmd, channel);
191225
if (internalCall) msg.SetInternalCall();
192-
return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
226+
foreach (var owner in owners)
227+
queuedTasks.Add(owner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
228+
owners.Clear();
229+
return Task.WhenAll(queuedTasks.ToArray());
193230
}
194231

195-
internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null);
232+
internal ServerEndPoint GetOwner()
233+
{
234+
var owner = owners?[0]; // we subscribe to arbitrary server, so why not return one
235+
return Interlocked.CompareExchange(ref owner, null, null);
236+
}
196237

197238
internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
198239
{
199-
if (server != null && Interlocked.CompareExchange(ref owner, server, server) == server)
240+
bool hasOwner;
241+
242+
lock (owners)
243+
{
244+
hasOwner = owners.Contains(server);
245+
}
246+
247+
if (server != null && hasOwner)
200248
{
201249
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
202250
var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel);
@@ -208,16 +256,15 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
208256
internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
209257
{
210258
bool changed = false;
211-
var oldOwner = Interlocked.CompareExchange(ref owner, null, null);
212-
if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE))
259+
if (owners.Count != 0 && !owners.All(o => o.IsSelectable(RedisCommand.PSUBSCRIBE)))
213260
{
214261
if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null)
215262
{
216263
changed = true;
217264
}
218-
oldOwner = null;
265+
owners.Clear();
219266
}
220-
if (oldOwner == null && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
267+
if (owners.Count == 0 && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
221268
{
222269
changed = true;
223270
}

0 commit comments

Comments
 (0)