Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 1e1b805

Browse files
committed
Add callback to capture events in RedisPubServer
1 parent c9f27d1 commit 1e1b805

File tree

1 file changed

+47
-20
lines changed

1 file changed

+47
-20
lines changed

src/ServiceStack.Redis/RedisPubSubServer.cs

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
using System;
22
using System.Diagnostics;
3-
using System.Text;
43
using System.Threading;
5-
using System.Threading.Tasks;
64
using ServiceStack.Logging;
75
using ServiceStack.Text;
86

@@ -34,6 +32,7 @@ public class RedisPubSubServer : IRedisPubSubServer
3432

3533
public Action<string> OnControlCommand { get; set; }
3634
public Action<string> OnUnSubscribe { get; set; }
35+
public Action<string> OnEvent { get; set; }
3736
public Action<Exception> OnError { get; set; }
3837
public Action<IRedisPubSubServer> OnFailover { get; set; }
3938
public bool IsSentinelSubscription { get; set; }
@@ -97,6 +96,8 @@ public IRedisPubSubServer Start()
9796
//Only 1 thread allowed past
9897
if (Interlocked.CompareExchange(ref status, Status.Starting, Status.Stopped) == Status.Stopped) //Should only be 1 thread past this point
9998
{
99+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} Stopped] Start()> Stopped -> Starting");
100+
100101
var initErrors = 0;
101102
bool hasInit = false;
102103
while (!hasInit)
@@ -108,6 +109,7 @@ public IRedisPubSubServer Start()
108109
}
109110
catch (Exception ex)
110111
{
112+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] Start().Init()> Exception: {ex.Message}");
111113
OnError?.Invoke(ex);
112114
SleepBackOffMultiplier(initErrors++);
113115
}
@@ -188,6 +190,8 @@ void SendHeartbeat(object state)
188190
if (DateTime.UtcNow - new DateTime(lastHeartbeatTicks) > HeartbeatTimeout)
189191
{
190192
currentStatus = Interlocked.CompareExchange(ref status, 0, 0);
193+
194+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {Status.GetStatus(currentStatus)}] SendHeartbeat()> Exceeded HeartbeatTimeout");
191195
if (currentStatus == Status.Started)
192196
{
193197
Restart();
@@ -210,7 +214,7 @@ private void DisposeHeartbeatTimer()
210214
try
211215
{
212216
if (Log.IsDebugEnabled)
213-
Log.DebugFormat("RedisPubServer.DisposeHeartbeatTimer()");
217+
Log.Debug("RedisPubServer.DisposeHeartbeatTimer()");
214218

215219
heartbeatTimer.Dispose();
216220
}
@@ -227,6 +231,8 @@ private void RunLoop()
227231
if (Interlocked.CompareExchange(ref status, Status.Started, Status.Starting) != Status.Starting) return;
228232
Interlocked.Increment(ref timesStarted);
229233

234+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} Started] RunLoop().Stop> Starting -> Started, timesStarted: {timesStarted}");
235+
230236
try
231237
{
232238
//RESET
@@ -280,22 +286,31 @@ bool IsCtrlMessage(byte[] msg)
280286
if (Log.IsDebugEnabled)
281287
Log.Debug("Stop Command Issued");
282288

289+
var holdStatus = GetStatus();
290+
283291
Interlocked.CompareExchange(ref status, Status.Stopping, Status.Started);
292+
293+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {holdStatus}] RunLoop().Stop> Started -> Stopping");
284294
try
285295
{
286296
if (Log.IsDebugEnabled)
287297
Log.Debug("UnSubscribe From All Channels...");
288298

299+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] RunLoop().Stop> subscription.UnSubscribeFromAllChannels()");
300+
289301
// ReSharper disable once AccessToDisposedClosure
290302
subscription.UnSubscribeFromAllChannels(); //Un block thread.
291303
}
292304
finally
293305
{
306+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] RunLoop().Stop> Stopping -> Stopped");
294307
Interlocked.CompareExchange(ref status, Status.Stopped, Status.Stopping);
295308
}
296309
return;
297310

298311
case Operation.Reset:
312+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] RunLoop().Reset> subscription.UnSubscribeFromAllChannels()");
313+
299314
// ReSharper disable once AccessToDisposedClosure
300315
subscription.UnSubscribeFromAllChannels(); //Un block thread.
301316
return;
@@ -330,10 +345,14 @@ bool IsCtrlMessage(byte[] msg)
330345
lastExMsg = ex.Message;
331346
Interlocked.Increment(ref noOfErrors);
332347
Interlocked.Increment(ref noOfContinuousErrors);
348+
349+
var holdStatus = GetStatus();
333350

334351
if (Interlocked.CompareExchange(ref status, Status.Stopped, Status.Started) != Status.Started)
335352
Interlocked.CompareExchange(ref status, Status.Stopped, Status.Stopping);
336353

354+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {holdStatus}] RunLoop().Stop> Started|Stopping -> Stopped");
355+
337356
OnStop?.Invoke();
338357

339358
OnError?.Invoke(ex);
@@ -343,6 +362,8 @@ bool IsCtrlMessage(byte[] msg)
343362
{
344363
if (WaitBeforeNextRestart != null)
345364
TaskUtils.Sleep(WaitBeforeNextRestart.Value);
365+
366+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] RunLoop().AutoRestart> Start()");
346367
Start();
347368
}
348369
}
@@ -361,6 +382,8 @@ private void Stop(bool shouldRestart)
361382

362383
if (Interlocked.CompareExchange(ref status, Status.Stopping, Status.Started) == Status.Started)
363384
{
385+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] Stop()> Started -> Stopping");
386+
364387
if (Log.IsDebugEnabled)
365388
Log.Debug("Stopping RedisPubSubServer...");
366389

@@ -392,7 +415,7 @@ private void NotifyAllSubscribers(string commandType=null)
392415
catch (Exception ex)
393416
{
394417
OnError?.Invoke(ex);
395-
Log.Warn("Could not send '{0}' message to bg thread: {1}".Fmt(msg, ex.Message));
418+
Log.WarnFormat("Could not send '{0}' message to bg thread: {1}", msg, ex.Message);
396419
}
397420
}
398421

@@ -447,10 +470,12 @@ private void KillBgThreadIfExists()
447470
{
448471
#if !NETSTANDARD2_0
449472
//Ideally we shouldn't get here, but lets try our hardest to clean it up
473+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] KillBgThreadIfExists()> bgThread.Interrupt()");
450474
Log.Warn("Interrupting previous Background Thread: " + bgThread.Name);
451475
bgThread.Interrupt();
452476
if (!bgThread.Join(TimeSpan.FromSeconds(3)))
453477
{
478+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] KillBgThreadIfExists()> bgThread.Abort()");
454479
Log.Warn(bgThread.Name + " just wont die, so we're now aborting it...");
455480
bgThread.Abort();
456481
}
@@ -471,7 +496,7 @@ private void SleepBackOffMultiplier(int continuousErrorsCount)
471496
maxSleepMs);
472497

473498
if (Log.IsDebugEnabled)
474-
Log.Debug("Sleeping for {0}ms after {1} continuous errors".Fmt(nextTry, continuousErrorsCount));
499+
Log.DebugFormat("Sleeping for {0}ms after {1} continuous errors", nextTry, continuousErrorsCount);
475500

476501
TaskUtils.Sleep(nextTry);
477502
}
@@ -514,26 +539,22 @@ class Status //dep-free copy of WorkerStatus
514539
public const int Stopping = 1;
515540
public const int Starting = 2;
516541
public const int Started = 3;
517-
}
518542

519-
public string GetStatus()
520-
{
521-
switch (Interlocked.CompareExchange(ref status, 0, 0))
543+
public static string GetStatus(int status)
522544
{
523-
case Status.Disposed:
524-
return "Disposed";
525-
case Status.Stopped:
526-
return "Stopped";
527-
case Status.Stopping:
528-
return "Stopping";
529-
case Status.Starting:
530-
return "Starting";
531-
case Status.Started:
532-
return "Started";
545+
return status switch {
546+
Disposed => nameof(Disposed),
547+
Stopped => nameof(Stopped),
548+
Stopping => nameof(Stopping),
549+
Starting => nameof(Starting),
550+
Started => nameof(Started),
551+
_ => throw new NotSupportedException("Unknown status: " + status)
552+
};
533553
}
534-
return null;
535554
}
536555

556+
public string GetStatus() => Status.GetStatus(Interlocked.CompareExchange(ref status, 0, 0));
557+
537558
public string GetStatsDescription()
538559
{
539560
var sb = StringBuilderCache.Allocate();
@@ -555,11 +576,17 @@ public virtual void Dispose()
555576
if (Log.IsDebugEnabled)
556577
Log.Debug("RedisPubServer.Dispose()...");
557578

579+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {GetStatus()}] Dispose()>");
580+
558581
Stop();
559582

583+
var holdStatus = GetStatus();
584+
560585
if (Interlocked.CompareExchange(ref status, Status.Disposed, Status.Stopped) != Status.Stopped)
561586
Interlocked.CompareExchange(ref status, Status.Disposed, Status.Stopping);
562587

588+
OnEvent?.Invoke($"[{DateTime.UtcNow.TimeOfDay:g} {holdStatus}] Dispose()> -> Disposed");
589+
563590
try
564591
{
565592
OnDispose?.Invoke();

0 commit comments

Comments
 (0)