Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 2fe09ef

Browse files
committed
Add support for periodic heartbeats and auto reconnects
1 parent 14fdbd5 commit 2fe09ef

File tree

3 files changed

+229
-22
lines changed

3 files changed

+229
-22
lines changed

src/ServiceStack.Redis/RedisPubSubServer.cs

Lines changed: 151 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,19 @@ public class RedisPubSubServer : IRedisPubSubServer
1313
private DateTime serverTimeAtStart;
1414
private Stopwatch startedAt;
1515

16+
public TimeSpan? HeartbeatInterval = TimeSpan.FromSeconds(10);
17+
public TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(30);
18+
private long lastHeartbeatTicks;
19+
private Timer heartbeatTimer;
20+
1621
public Action OnInit { get; set; }
1722
public Action OnStart { get; set; }
23+
public Action OnHeartbeatSent { get; set; }
24+
public Action OnHeartbeatReceived { get; set; }
1825
public Action OnStop { get; set; }
1926
public Action OnDispose { get; set; }
2027
public Action<string, string> OnMessage { get; set; }
28+
public Action<string> OnControlCommand { get; set; }
2129
public Action<string> OnUnSubscribe { get; set; }
2230
public Action<Exception> OnError { get; set; }
2331
public Action<IRedisPubSubServer> OnFailover { get; set; }
@@ -34,6 +42,10 @@ public class RedisPubSubServer : IRedisPubSubServer
3442
private int status;
3543
private Thread bgThread; //Subscription controller thread
3644
private long bgThreadCount = 0;
45+
private int autoRestart = YES;
46+
47+
private const int NO = 0;
48+
private const int YES = 1;
3749

3850
public DateTime CurrentServerTime
3951
{
@@ -62,6 +74,8 @@ public RedisPubSubServer(IRedisClientsManager clientsManager, params string[] ch
6274

6375
public IRedisPubSubServer Start()
6476
{
77+
Interlocked.CompareExchange(ref autoRestart, 0, autoRestart);
78+
6579
if (Interlocked.CompareExchange(ref status, 0, 0) == Status.Started)
6680
{
6781
//Start any stopped worker threads
@@ -122,10 +136,63 @@ private void Init()
122136
startedAt = Stopwatch.StartNew();
123137
}
124138

139+
DisposeHeartbeatTimer();
140+
141+
if (HeartbeatInterval != null)
142+
{
143+
heartbeatTimer = new Timer(SendHeartbeat, null,
144+
TimeSpan.FromMilliseconds(0), HeartbeatInterval.Value);
145+
}
146+
147+
Interlocked.CompareExchange(ref lastHeartbeatTicks, DateTime.UtcNow.Ticks, lastHeartbeatTicks);
148+
125149
if (OnInit != null)
126150
OnInit();
127151
}
128152

153+
void SendHeartbeat(object state)
154+
{
155+
if (OnHeartbeatSent != null)
156+
OnHeartbeatSent();
157+
158+
if (Interlocked.CompareExchange(ref status, 0, 0) != Status.Started)
159+
return;
160+
161+
NotifyAllSubscribers(ControlCommand.Pulse);
162+
163+
if (DateTime.UtcNow - new DateTime(lastHeartbeatTicks) > HeartbeatTimeout)
164+
{
165+
if (Interlocked.CompareExchange(ref status, 0, 0) == Status.Started)
166+
{
167+
Restart();
168+
}
169+
}
170+
}
171+
172+
void Pulse()
173+
{
174+
Interlocked.CompareExchange(ref lastHeartbeatTicks, DateTime.UtcNow.Ticks, lastHeartbeatTicks);
175+
176+
if (OnHeartbeatReceived != null)
177+
OnHeartbeatReceived();
178+
}
179+
180+
private void DisposeHeartbeatTimer()
181+
{
182+
if (heartbeatTimer == null)
183+
return;
184+
185+
try
186+
{
187+
heartbeatTimer.Dispose();
188+
}
189+
catch (Exception ex)
190+
{
191+
if (this.OnError != null) this.OnError(ex);
192+
}
193+
heartbeatTimer = null;
194+
}
195+
129196
private IRedisClient masterClient;
130197
private void RunLoop()
131198
{
@@ -150,9 +217,21 @@ private void RunLoop()
150217

151218
subscription.OnMessage = (channel, msg) =>
152219
{
153-
if (msg == Operation.ControlCommand)
220+
if (string.IsNullOrEmpty(msg))
221+
return;
222+
223+
var ctrlMsg = msg.SplitOnFirst(':');
224+
if (ctrlMsg[0] == ControlCommand.Control)
154225
{
155226
var op = Interlocked.CompareExchange(ref doOperation, Operation.NoOp, doOperation);
227+
228+
var msgType = ctrlMsg.Length > 1
229+
? ctrlMsg[1]
230+
: null;
231+
232+
if (OnControlCommand != null)
233+
OnControlCommand(msgType ?? Operation.GetName(op));
234+
156235
switch (op)
157236
{
158237
case Operation.Stop:
@@ -169,9 +248,15 @@ private void RunLoop()
169248
subscription.UnSubscribeFromAllChannels(); //Un block thread.
170249
return;
171250
}
172-
}
173251

174-
if (!string.IsNullOrEmpty(msg))
252+
switch (msgType)
253+
{
254+
case ControlCommand.Pulse:
255+
Pulse();
256+
break;
257+
}
258+
}
259+
else
175260
{
176261
OnMessage(channel, msg);
177262
}
@@ -200,19 +285,22 @@ private void RunLoop()
200285

201286
if (this.OnError != null)
202287
this.OnError(ex);
288+
}
203289

290+
if (Interlocked.CompareExchange(ref autoRestart, 0, 0) == YES
291+
&& Interlocked.CompareExchange(ref status, 0, 0) != Status.Disposed)
292+
{
204293
if (KeepAliveRetryAfterMs != null)
205-
{
206294
Thread.Sleep(KeepAliveRetryAfterMs.Value);
207295

208-
if (Interlocked.CompareExchange(ref status, 0, 0) != Status.Disposed)
209-
Start();
210-
}
296+
Start();
211297
}
212298
}
213299

214300
public void Stop()
215301
{
302+
Interlocked.CompareExchange(ref autoRestart, NO, autoRestart);
303+
216304
if (Interlocked.CompareExchange(ref status, 0, 0) == Status.Disposed)
217305
throw new ObjectDisposedException("RedisPubSubServer has been disposed");
218306

@@ -221,20 +309,36 @@ public void Stop()
221309
Log.Debug("Stopping RedisPubSubServer...");
222310

223311
//Unblock current bgthread by issuing StopCommand
224-
try
312+
SendControlCommand(Operation.Stop);
313+
}
314+
}
315+
316+
private void SendControlCommand(int operation)
317+
{
318+
Interlocked.CompareExchange(ref doOperation, operation, doOperation);
319+
NotifyAllSubscribers();
320+
}
321+
322+
private void NotifyAllSubscribers(string commandType=null)
323+
{
324+
var msg = ControlCommand.Control;
325+
if (commandType != null)
326+
msg += ":" + commandType;
327+
328+
try
329+
{
330+
using (var redis = ClientsManager.GetClient())
225331
{
226-
using (var redis = ClientsManager.GetClient())
332+
foreach (var channel in Channels)
227333
{
228-
Interlocked.CompareExchange(ref doOperation, Operation.Stop, doOperation);
229-
Channels.Each(x =>
230-
redis.PublishMessage(x, Operation.ControlCommand));
334+
redis.PublishMessage(channel, msg);
231335
}
232336
}
233-
catch (Exception ex)
234-
{
235-
if (this.OnError != null) this.OnError(ex);
236-
Log.Warn("Could not send STOP message to bg thread: " + ex.Message);
237-
}
337+
}
338+
catch (Exception ex)
339+
{
340+
if (this.OnError != null) this.OnError(ex);
341+
Log.Warn("Could not send '{0}' message to bg thread: {1}".Fmt(msg, ex.Message));
238342
}
239343
}
240344

@@ -251,8 +355,10 @@ private void HandleFailover(IRedisClientsManager clientsManager)
251355
using (var currentlySubscribedClient = ((RedisClient)masterClient).CloneClient())
252356
{
253357
Interlocked.CompareExchange(ref doOperation, Operation.Reset, doOperation);
254-
Channels.Each(x =>
255-
currentlySubscribedClient.PublishMessage(x, Operation.ControlCommand));
358+
foreach (var channel in Channels)
359+
{
360+
currentlySubscribedClient.PublishMessage(channel, ControlCommand.Control);
361+
}
256362
}
257363
}
258364
else
@@ -279,7 +385,7 @@ void HandleUnSubscribe(string channel)
279385
public void Restart()
280386
{
281387
Stop();
282-
Start();
388+
Interlocked.CompareExchange(ref autoRestart, YES, autoRestart);
283389
}
284390

285391
private void KillBgThreadIfExists()
@@ -319,12 +425,33 @@ private void SleepBackOffMultiplier(int continuousErrorsCount)
319425

320426
public static class Operation //dep-free copy of WorkerOperation
321427
{
322-
public const string ControlCommand = "CTRL";
323-
324428
public const int NoOp = 0;
325429
public const int Stop = 1;
326430
public const int Reset = 2;
327431
public const int Restart = 3;
432+
433+
public static string GetName(int op)
434+
{
435+
switch (op)
436+
{
437+
case NoOp:
438+
return "NoOp";
439+
case Stop:
440+
return "Stop";
441+
case Reset:
442+
return "Reset";
443+
case Restart:
444+
return "Restart";
445+
default:
446+
return null;
447+
}
448+
}
449+
}
450+
451+
public static class ControlCommand
452+
{
453+
public const string Control = "CTRL";
454+
public const string Pulse = "PULSE";
328455
}
329456

330457
class Status //dep-free copy of WorkerStatus
@@ -396,6 +523,8 @@ public virtual void Dispose()
396523
{
397524
if (this.OnError != null) this.OnError(ex);
398525
}
526+
527+
DisposeHeartbeatTimer();
399528
}
400529
}
401530
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Threading;
3+
using NUnit.Framework;
4+
using ServiceStack.Text;
5+
6+
namespace ServiceStack.Redis.Tests
7+
{
8+
[Ignore, Explicit("Ignore long running tests")]
9+
[TestFixture]
10+
public class RedisPubSubServerTests
11+
{
12+
private static RedisPubSubServer CreatePubSubServer(
13+
int intervalSecs = 1, int timeoutSecs = 3)
14+
{
15+
var clientsManager = new RedisManagerPool(TestConfig.MasterHosts);
16+
using (var redis = clientsManager.GetClient())
17+
redis.FlushAll();
18+
19+
var pubSub = new RedisPubSubServer(
20+
clientsManager,
21+
"topic:test")
22+
{
23+
HeartbeatInterval = TimeSpan.FromSeconds(intervalSecs),
24+
HeartbeatTimeout = TimeSpan.FromSeconds(timeoutSecs)
25+
};
26+
27+
return pubSub;
28+
}
29+
30+
[Test]
31+
public void Does_send_heartbeat_pulses()
32+
{
33+
int pulseCount = 0;
34+
using (var pubSub = CreatePubSubServer(intervalSecs: 1, timeoutSecs: 3))
35+
{
36+
pubSub.OnHeartbeatReceived = () => "pulse #{0}".Print(++pulseCount);
37+
pubSub.Start();
38+
39+
Thread.Sleep(3100);
40+
41+
Assert.That(pulseCount, Is.GreaterThan(2));
42+
}
43+
}
44+
45+
[Test]
46+
public void Does_restart_when_Heartbeat_Timeout_exceeded()
47+
{
48+
//This auto restarts 2 times before letting connection to stay alive
49+
50+
int pulseCount = 0;
51+
int startCount = 0;
52+
int stopCount = 0;
53+
54+
using (var pubSub = CreatePubSubServer(intervalSecs: 1, timeoutSecs: 3))
55+
{
56+
pubSub.OnStart = () => "start #{0}".Print(++startCount);
57+
pubSub.OnStop = () => "stop #{0}".Print(++stopCount);
58+
pubSub.OnHeartbeatReceived = () => "pulse #{0}".Print(++pulseCount);
59+
60+
//pause longer than heartbeat timeout so autoreconnects
61+
pubSub.OnControlCommand = op =>
62+
{
63+
if (op == "PULSE" && stopCount < 2)
64+
Thread.Sleep(4000);
65+
};
66+
67+
pubSub.Start();
68+
69+
Thread.Sleep(30 * 1000);
70+
71+
Assert.That(pulseCount, Is.GreaterThan(3));
72+
Assert.That(startCount, Is.EqualTo(3));
73+
Assert.That(stopCount, Is.EqualTo(2));
74+
}
75+
}
76+
}
77+
}

tests/ServiceStack.Redis.Tests/ServiceStack.Redis.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@
240240
<Compile Include="RedisClientTests.cs" />
241241
<Compile Include="Properties\AssemblyInfo.cs" />
242242
<Compile Include="RedisClientTestsBase.cs" />
243+
<Compile Include="RedisPubSubServerTests.cs" />
243244
<Compile Include="RedisPubSubTests.cs" />
244245
<Compile Include="RedisScanTests.cs" />
245246
<Compile Include="RedisSentinelTests.cs" />

0 commit comments

Comments
 (0)