Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 8bf02c7

Browse files
committed
Merge pull request #199 from svantreeck/master
Added an implementation of Sentinel Support to Redis
2 parents 6aeb09c + aca062b commit 8bf02c7

File tree

4 files changed

+394
-0
lines changed

4 files changed

+394
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using System;
2+
namespace ServiceStack.Redis
3+
{
4+
public interface IRedisSentinel : IDisposable
5+
{
6+
IRedisClientsManager Setup();
7+
}
8+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
//
2+
// Redis Sentinel will connect to a Redis Sentinel Instance and create an IRedisClientsManager based off of the first sentinel that returns data
3+
//
4+
// Upon failure of a sentinel, other sentinels will be attempted to be connected to
5+
// Upon a s_down event, the RedisClientsManager will be failed over to the new set of slaves/masters
6+
//
7+
8+
using ServiceStack;
9+
using ServiceStack.Redis;
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Diagnostics;
13+
using System.Linq;
14+
using System.Threading.Tasks;
15+
using System.Web;
16+
17+
namespace ServiceStack.Redis
18+
{
19+
public class RedisSentinel : IRedisSentinel
20+
{
21+
private readonly string sentinelName;
22+
private int failures = 0;
23+
private int sentinelIndex = -1;
24+
private List<string> sentinels;
25+
private RedisSentinelWorker worker;
26+
private PooledRedisClientManager clientManager;
27+
private static int MaxFailures = 5;
28+
29+
public RedisSentinel(IEnumerable<string> sentinelHosts, string sentinelName)
30+
{
31+
if (sentinelHosts == null || sentinelHosts.Count() == 0) throw new ArgumentException("sentinels must have at least one entry");
32+
33+
this.sentinelName = sentinelName;
34+
this.sentinels = new List<string>(sentinelHosts);
35+
}
36+
37+
/// <summary>
38+
/// Initialize channel and register client manager
39+
/// </summary>
40+
/// <param name="container"></param>
41+
public IRedisClientsManager Setup()
42+
{
43+
GetValidSentinel();
44+
45+
if (this.clientManager == null)
46+
{
47+
throw new ApplicationException("Unable to resolve sentinels!");
48+
}
49+
50+
return this.clientManager;
51+
}
52+
53+
private void GetValidSentinel()
54+
{
55+
while (this.clientManager == null && failures < RedisSentinel.MaxFailures)
56+
{
57+
try
58+
{
59+
worker = GetNextSentinel();
60+
clientManager = worker.GetClientManager();
61+
worker.BeginListeningForConfigurationChanges();
62+
}
63+
catch (RedisException)
64+
{
65+
if (worker != null)
66+
{
67+
worker.SentinelError -= Worker_SentinelError;
68+
worker.Dispose();
69+
}
70+
71+
failures++;
72+
}
73+
}
74+
}
75+
76+
private RedisSentinelWorker GetNextSentinel()
77+
{
78+
sentinelIndex++;
79+
80+
if (sentinelIndex >= sentinels.Count)
81+
{
82+
sentinelIndex = 0;
83+
}
84+
85+
var sentinelWorker = new RedisSentinelWorker(sentinels[sentinelIndex], this.sentinelName, this.clientManager);
86+
87+
sentinelWorker.SentinelError += Worker_SentinelError;
88+
return sentinelWorker;
89+
}
90+
91+
private void Worker_SentinelError(object sender, EventArgs e)
92+
{
93+
var worker = sender as RedisSentinelWorker;
94+
95+
if (worker != null)
96+
{
97+
worker.SentinelError -= Worker_SentinelError;
98+
worker.Dispose();
99+
100+
this.worker = GetNextSentinel();
101+
this.worker.BeginListeningForConfigurationChanges();
102+
}
103+
}
104+
105+
public void Dispose()
106+
{
107+
if (worker != null)
108+
{
109+
worker.SentinelError -= Worker_SentinelError;
110+
worker.Dispose();
111+
worker = null;
112+
}
113+
}
114+
}
115+
}
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
using ServiceStack;
2+
using ServiceStack.Logging;
3+
using ServiceStack.Redis;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.IO;
8+
using System.Linq;
9+
using System.Text;
10+
using System.Threading.Tasks;
11+
using System.Web;
12+
13+
namespace ServiceStack.Redis
14+
{
15+
internal class RedisSentinelWorker : IDisposable
16+
{
17+
protected static readonly ILog Log = LogManager.GetLogger(typeof(RedisSentinelWorker));
18+
19+
private RedisClient sentinelClient;
20+
private RedisClient sentinelPubSubClient;
21+
private PooledRedisClientManager clientsManager;
22+
private IRedisSubscription sentinelSubscription;
23+
private string sentinelName;
24+
private string host;
25+
26+
public event EventHandler SentinelError;
27+
28+
public RedisSentinelWorker(string host, string sentinelName, PooledRedisClientManager clientsManager = null)
29+
{
30+
this.sentinelName = sentinelName;
31+
this.sentinelClient = new RedisClient(host);
32+
this.sentinelPubSubClient = new RedisClient(host);
33+
this.sentinelSubscription = this.sentinelPubSubClient.CreateSubscription();
34+
this.sentinelSubscription.OnMessage = SentinelMessageReceived;
35+
this.clientsManager = clientsManager;
36+
37+
Log.Info("Set up Redis Sentinel on {0}".Fmt(host));
38+
}
39+
40+
private void SubscribeForChanges(object arg)
41+
{
42+
try
43+
{
44+
// subscribe to all messages
45+
this.sentinelSubscription.SubscribeToChannelsMatching("*");
46+
}
47+
catch (Exception)
48+
{
49+
Log.Error("Problem Subscribing to Redis Channel on {0}:{1}".Fmt(this.sentinelClient.Host, this.sentinelClient.Port));
50+
// problem communicating to sentinel
51+
if (SentinelError != null)
52+
{
53+
SentinelError(this, EventArgs.Empty);
54+
}
55+
}
56+
}
57+
58+
/// <summary>
59+
/// Event that is fired when the sentinel subscription raises an event
60+
/// </summary>
61+
/// <param name="channel"></param>
62+
/// <param name="message"></param>
63+
private void SentinelMessageReceived(string channel, string message)
64+
{
65+
// {+|-}sdown is the event for server coming up or down
66+
if (channel.ToLower().Contains("sdown"))
67+
{
68+
Log.Info("Sentinel detected server down/up with message:{0}".Fmt(message));
69+
70+
ConfigureRedisFromSentinel();
71+
}
72+
}
73+
74+
/// <summary>
75+
/// Does a sentinel check for masters and slaves and either sets up or fails over to the new config
76+
/// </summary>
77+
private void ConfigureRedisFromSentinel()
78+
{
79+
Log.Info("Configuring Redis Clients");
80+
81+
var masters = ConvertMasterArrayToList(this.sentinelClient.Sentinel("master", this.sentinelName));
82+
var slaves = ConvertSlaveArrayToList(this.sentinelClient.Sentinel("slaves", this.sentinelName));
83+
84+
if (this.clientsManager == null)
85+
{
86+
if (slaves.Count() > 0)
87+
{
88+
this.clientsManager = new PooledRedisClientManager(masters, slaves);
89+
}
90+
else
91+
{
92+
this.clientsManager = new PooledRedisClientManager(masters.ToArray());
93+
}
94+
}
95+
else
96+
{
97+
if (slaves.Count() > 0)
98+
{
99+
this.clientsManager.FailoverTo(masters, slaves);
100+
}
101+
else
102+
{
103+
this.clientsManager.FailoverTo(masters.ToArray());
104+
}
105+
}
106+
}
107+
108+
/// <summary>
109+
/// Takes output from sentinel slaves command and converts into a list of servers
110+
/// </summary>
111+
/// <param name="items"></param>
112+
/// <returns></returns>
113+
private IEnumerable<string> ConvertSlaveArrayToList(object[] slaves)
114+
{
115+
var servers = new List<string>();
116+
bool fetchIP = false;
117+
bool fetchPort = false;
118+
bool fetchFlags = false;
119+
string ip = null;
120+
string port = null;
121+
string value = null;
122+
string flags = null;
123+
124+
foreach (var slave in slaves.OfType<object[]>())
125+
{
126+
fetchIP = false;
127+
fetchPort = false;
128+
ip = null;
129+
port = null;
130+
131+
foreach (var item in slave)
132+
{
133+
if (item is byte[])
134+
{
135+
value = Encoding.UTF8.GetString((byte[])item);
136+
if (value == "ip")
137+
{
138+
fetchIP = true;
139+
continue;
140+
}
141+
else if (value == "port")
142+
{
143+
fetchPort = true;
144+
continue;
145+
}
146+
else if (value == "flags")
147+
{
148+
fetchFlags = true;
149+
continue;
150+
}
151+
else if (fetchIP)
152+
{
153+
ip = value;
154+
155+
if (ip == "127.0.0.1")
156+
{
157+
ip = this.sentinelClient.Host;
158+
}
159+
fetchIP = false;
160+
}
161+
else if (fetchPort)
162+
{
163+
port = value;
164+
fetchPort = false;
165+
}
166+
else if (fetchFlags)
167+
{
168+
flags = value;
169+
fetchFlags = false;
170+
171+
if (ip != null && port != null && !flags.Contains("s_down"))
172+
{
173+
servers.Add("{0}:{1}".Fmt(ip, port));
174+
}
175+
}
176+
177+
178+
}
179+
}
180+
}
181+
182+
return servers;
183+
}
184+
185+
/// <summary>
186+
/// Takes output from sentinel master command and converts into a list of servers
187+
/// </summary>
188+
/// <param name="items"></param>
189+
/// <returns></returns>
190+
private IEnumerable<string> ConvertMasterArrayToList(object[] items)
191+
{
192+
var servers = new List<string>();
193+
bool fetchIP = false;
194+
bool fetchPort = false;
195+
string ip = null;
196+
string port = null;
197+
string value = null;
198+
199+
foreach (var item in items)
200+
{
201+
if (item is byte[])
202+
{
203+
value = Encoding.UTF8.GetString((byte[])item);
204+
if (value == "ip")
205+
{
206+
fetchIP = true;
207+
continue;
208+
}
209+
else if (value == "port")
210+
{
211+
fetchPort = true;
212+
continue;
213+
}
214+
else if (fetchIP)
215+
{
216+
ip = value;
217+
if (ip == "127.0.0.1")
218+
{
219+
ip = this.sentinelClient.Host;
220+
}
221+
fetchIP = false;
222+
}
223+
else if (fetchPort)
224+
{
225+
port = value;
226+
fetchPort = false;
227+
}
228+
229+
if (ip != null && port != null)
230+
{
231+
servers.Add("{0}:{1}".Fmt(ip, port));
232+
break;
233+
}
234+
}
235+
}
236+
237+
return servers;
238+
}
239+
240+
public PooledRedisClientManager GetClientManager()
241+
{
242+
ConfigureRedisFromSentinel();
243+
244+
return this.clientsManager;
245+
}
246+
247+
public void Dispose()
248+
{
249+
this.sentinelClient.Dispose();
250+
this.sentinelPubSubClient.Dispose();
251+
252+
try
253+
{
254+
this.sentinelSubscription.Dispose();
255+
}
256+
catch (RedisException)
257+
{
258+
// if this is getting disposed after the sentinel shuts down, this will fail
259+
}
260+
}
261+
262+
public void BeginListeningForConfigurationChanges()
263+
{
264+
// subscribing blocks, so put it on a different thread
265+
Task.Factory.StartNew(SubscribeForChanges, TaskCreationOptions.LongRunning);
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)