Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit f3ea6a2

Browse files
committed
Added sentinel
1 parent 6aeb09c commit f3ea6a2

File tree

4 files changed

+383
-0
lines changed

4 files changed

+383
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using System;
2+
namespace ServiceStack.Redis
3+
{
4+
public interface IRedisSentinel : IDisposable
5+
{
6+
IRedisClientsManager Setup();
7+
}
8+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
//
2+
// Redis Sentinel will connect to a Redis Sentinel Instance and create an IRedisClientsManager based off of the first sentinel that returns data
3+
//
4+
// Upon failure of a sentinel, other sentinels will be attempted to be connected to
5+
// Upon a s_down event, the RedisClientsManager will be failed over to the new set of slaves/masters
6+
//
7+
8+
using ServiceStack;
9+
using ServiceStack.Redis;
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Diagnostics;
13+
using System.Linq;
14+
using System.Threading.Tasks;
15+
using System.Web;
16+
17+
namespace ServiceStack.Redis
18+
{
19+
public class RedisSentinel : IRedisSentinel
20+
{
21+
private int failures = 0;
22+
private int sentinelIndex = -1;
23+
private List<string> sentinels;
24+
private RedisSentinelWorker worker;
25+
private PooledRedisClientManager clientManager;
26+
private static int MaxFailures = 5;
27+
28+
public RedisSentinel(IEnumerable<string> sentinelHosts)
29+
{
30+
if (sentinelHosts == null || sentinelHosts.Count() == 0) throw new ArgumentException("sentinels must have at least one entry");
31+
32+
this.sentinels = new List<string>(sentinelHosts);
33+
}
34+
35+
/// <summary>
36+
/// Initialize channel and register client manager
37+
/// </summary>
38+
/// <param name="container"></param>
39+
public IRedisClientsManager Setup()
40+
{
41+
GetValidSentinel();
42+
43+
if (this.clientManager == null)
44+
{
45+
throw new ApplicationException("Unable to resolve sentinels!");
46+
}
47+
48+
return this.clientManager;
49+
}
50+
51+
private void GetValidSentinel()
52+
{
53+
while (this.clientManager == null && failures < RedisSentinel.MaxFailures)
54+
{
55+
try
56+
{
57+
worker = GetNextSentinel();
58+
clientManager = worker.GetClientManager();
59+
worker.BeginListeningForConfigurationChanges();
60+
}
61+
catch (RedisException)
62+
{
63+
if (worker != null)
64+
{
65+
worker.SentinelError -= Worker_SentinelError;
66+
worker.Dispose();
67+
}
68+
69+
failures++;
70+
}
71+
}
72+
}
73+
74+
private RedisSentinelWorker GetNextSentinel()
75+
{
76+
sentinelIndex++;
77+
78+
if (sentinelIndex >= sentinels.Count)
79+
{
80+
sentinelIndex = 0;
81+
}
82+
83+
var sentinelWorker = new RedisSentinelWorker(sentinels[sentinelIndex], "mymaster", this.clientManager);
84+
85+
sentinelWorker.SentinelError += Worker_SentinelError;
86+
return sentinelWorker;
87+
}
88+
89+
private void Worker_SentinelError(object sender, EventArgs e)
90+
{
91+
var worker = sender as RedisSentinelWorker;
92+
93+
if (worker != null)
94+
{
95+
worker.SentinelError -= Worker_SentinelError;
96+
worker.Dispose();
97+
98+
this.worker = GetNextSentinel();
99+
this.worker.BeginListeningForConfigurationChanges();
100+
}
101+
}
102+
103+
public void Dispose()
104+
{
105+
if (worker != null)
106+
{
107+
worker.SentinelError -= Worker_SentinelError;
108+
worker.Dispose();
109+
worker = null;
110+
}
111+
}
112+
}
113+
}
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
using ServiceStack;
2+
using ServiceStack.Redis;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using System.IO;
7+
using System.Linq;
8+
using System.Threading.Tasks;
9+
using System.Web;
10+
11+
namespace ServiceStack.Redis
12+
{
13+
internal class RedisSentinelWorker : IDisposable
14+
{
15+
private RedisClient sentinelClient;
16+
private RedisClient sentinelPubSubClient;
17+
private PooledRedisClientManager clientsManager;
18+
private IRedisSubscription sentinelSubscription;
19+
private string sentinelName;
20+
private string host;
21+
22+
public event EventHandler SentinelError;
23+
24+
public RedisSentinelWorker(string host, string sentinelName, PooledRedisClientManager clientsManager = null)
25+
{
26+
this.sentinelName = sentinelName;
27+
this.sentinelClient = new RedisClient(host);
28+
this.sentinelPubSubClient = new RedisClient(host);
29+
this.sentinelSubscription = this.sentinelPubSubClient.CreateSubscription();
30+
this.sentinelSubscription.OnMessage = SentinelMessageReceived;
31+
this.clientsManager = clientsManager;
32+
}
33+
34+
private void SubscribeForChanges(object arg)
35+
{
36+
try
37+
{
38+
// subscribe to all messages
39+
this.sentinelSubscription.SubscribeToChannelsMatching("*");
40+
}
41+
catch (Exception)
42+
{
43+
// problem communicating to sentinel
44+
if (SentinelError != null)
45+
{
46+
SentinelError(this, EventArgs.Empty);
47+
}
48+
}
49+
}
50+
51+
/// <summary>
52+
/// Event that is fired when the sentinel subscription raises an event
53+
/// </summary>
54+
/// <param name="channel"></param>
55+
/// <param name="message"></param>
56+
private void SentinelMessageReceived(string channel, string message)
57+
{
58+
Debug.WriteLine("{0} - {1}".Fmt(channel, message));
59+
60+
// {+|-}sdown is the event for server coming up or down
61+
if (channel.ToLower().Contains("sdown"))
62+
{
63+
ConfigureRedisFromSentinel();
64+
}
65+
}
66+
67+
/// <summary>
68+
/// Does a sentinel check for masters and slaves and either sets up or fails over to the new config
69+
/// </summary>
70+
private void ConfigureRedisFromSentinel()
71+
{
72+
var masters = ConvertMasterArrayToList(this.sentinelClient.Sentinel("master", this.sentinelName));
73+
var slaves = ConvertSlaveArrayToList(this.sentinelClient.Sentinel("slaves", this.sentinelName));
74+
75+
if (this.clientsManager == null)
76+
{
77+
if (slaves.Count() > 0)
78+
{
79+
this.clientsManager = new PooledRedisClientManager(masters, slaves);
80+
}
81+
else
82+
{
83+
this.clientsManager = new PooledRedisClientManager(masters.ToArray());
84+
}
85+
}
86+
else
87+
{
88+
if (slaves.Count() > 0)
89+
{
90+
this.clientsManager.FailoverTo(masters, slaves);
91+
}
92+
else
93+
{
94+
this.clientsManager.FailoverTo(masters.ToArray());
95+
}
96+
}
97+
}
98+
99+
/// <summary>
100+
/// Takes output from sentinel slaves command and converts into a list of servers
101+
/// </summary>
102+
/// <param name="items"></param>
103+
/// <returns></returns>
104+
private IEnumerable<string> ConvertSlaveArrayToList(object[] slaves)
105+
{
106+
var servers = new List<string>();
107+
bool fetchIP = false;
108+
bool fetchPort = false;
109+
bool fetchFlags = false;
110+
string ip = null;
111+
string port = null;
112+
string value = null;
113+
string flags = null;
114+
115+
foreach (var slave in slaves.OfType<object[]>())
116+
{
117+
fetchIP = false;
118+
fetchPort = false;
119+
ip = null;
120+
port = null;
121+
122+
foreach (var item in slave)
123+
{
124+
if (item is byte[])
125+
{
126+
value = System.Text.Encoding.Default.GetString((byte[])item);
127+
if (value == "ip")
128+
{
129+
fetchIP = true;
130+
continue;
131+
}
132+
else if (value == "port")
133+
{
134+
fetchPort = true;
135+
continue;
136+
}
137+
else if (value == "flags")
138+
{
139+
fetchFlags = true;
140+
continue;
141+
}
142+
else if (fetchIP)
143+
{
144+
ip = value;
145+
146+
if (ip == "127.0.0.1")
147+
{
148+
ip = this.sentinelClient.Host;
149+
}
150+
fetchIP = false;
151+
}
152+
else if (fetchPort)
153+
{
154+
port = value;
155+
fetchPort = false;
156+
}
157+
else if (fetchFlags)
158+
{
159+
flags = value;
160+
fetchFlags = false;
161+
162+
if (ip != null && port != null && !flags.Contains("s_down"))
163+
{
164+
servers.Add("{0}:{1}".Fmt(ip, port));
165+
}
166+
}
167+
168+
169+
}
170+
}
171+
}
172+
173+
return servers;
174+
}
175+
176+
/// <summary>
177+
/// Takes output from sentinel master command and converts into a list of servers
178+
/// </summary>
179+
/// <param name="items"></param>
180+
/// <returns></returns>
181+
private IEnumerable<string> ConvertMasterArrayToList(object[] items)
182+
{
183+
var servers = new List<string>();
184+
bool fetchIP = false;
185+
bool fetchPort = false;
186+
string ip = null;
187+
string port = null;
188+
string value = null;
189+
190+
foreach (var item in items)
191+
{
192+
if (item is byte[])
193+
{
194+
value = System.Text.Encoding.Default.GetString((byte[])item);
195+
if (value == "ip")
196+
{
197+
fetchIP = true;
198+
continue;
199+
}
200+
else if (value == "port")
201+
{
202+
fetchPort = true;
203+
continue;
204+
}
205+
else if (fetchIP)
206+
{
207+
ip = value;
208+
if (ip == "127.0.0.1")
209+
{
210+
ip = this.sentinelClient.Host;
211+
}
212+
fetchIP = false;
213+
}
214+
else if (fetchPort)
215+
{
216+
port = value;
217+
fetchPort = false;
218+
}
219+
220+
if (ip != null && port != null)
221+
{
222+
servers.Add("{0}:{1}".Fmt(ip, port));
223+
break;
224+
}
225+
}
226+
}
227+
228+
return servers;
229+
}
230+
231+
public PooledRedisClientManager GetClientManager()
232+
{
233+
ConfigureRedisFromSentinel();
234+
235+
return this.clientsManager;
236+
}
237+
238+
public void Dispose()
239+
{
240+
this.sentinelClient.Dispose();
241+
this.sentinelPubSubClient.Dispose();
242+
243+
try
244+
{
245+
this.sentinelSubscription.Dispose();
246+
}
247+
catch (RedisException)
248+
{
249+
// if this is getting disposed after the sentinel shuts down, this will fail
250+
}
251+
}
252+
253+
public void BeginListeningForConfigurationChanges()
254+
{
255+
// subscribing blocks, so put it on a different thread
256+
Task.Factory.StartNew(SubscribeForChanges, TaskCreationOptions.LongRunning);
257+
}
258+
}
259+
}

src/ServiceStack.Redis/ServiceStack.Redis.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,15 @@
157157
<Compile Include="Generic\RedisTypedCommandQueue.cs" />
158158
<Compile Include="IRedisClientFactory.cs" />
159159
<Compile Include="IRedisFailover.cs" />
160+
<Compile Include="IRedisSentinel.cs" />
160161
<Compile Include="RedisClientManagerCacheClient.cs" />
161162
<Compile Include="RedisClientsManagerExtensions.cs" />
162163
<Compile Include="Pipeline\QueuedRedisCommand.cs" />
163164
<Compile Include="Pipeline\RedisCommand.cs" />
164165
<Compile Include="RedisClient_Slowlog.cs" />
165166
<Compile Include="RedisEndpoint.cs" />
167+
<Compile Include="RedisSentinel.cs" />
168+
<Compile Include="RedisSentinelWorker.cs" />
166169
<Compile Include="ScanResult.cs" />
167170
<Compile Include="ShardedConnectionPool.cs" />
168171
<Compile Include="ShardedRedisClientManager.cs" />

0 commit comments

Comments
 (0)