Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 245f1cc

Browse files
committed
Add OnMessageBytes to RedisSubscription + RedisPubSubServer
1 parent 56691b9 commit 245f1cc

File tree

2 files changed

+39
-9
lines changed

2 files changed

+39
-9
lines changed

src/ServiceStack.Redis/RedisPubSubServer.cs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class RedisPubSubServer : IRedisPubSubServer
3030
/// Callback fired on each message received, handle with (channel, msg) => ...
3131
/// </summary>
3232
public Action<string, string> OnMessage { get; set; }
33+
public Action<string, byte[]> OnMessageBytes { get; set; }
3334

3435
public Action<string> OnControlCommand { get; set; }
3536
public Action<string> OnUnSubscribe { get; set; }
@@ -55,8 +56,8 @@ public class RedisPubSubServer : IRedisPubSubServer
5556
private int autoRestart = YES;
5657
public bool AutoRestart
5758
{
58-
get { return Interlocked.CompareExchange(ref autoRestart, 0, 0) == YES; }
59-
set { Interlocked.CompareExchange(ref autoRestart, value ? YES : NO, autoRestart); }
59+
get => Interlocked.CompareExchange(ref autoRestart, 0, 0) == YES;
60+
set => Interlocked.CompareExchange(ref autoRestart, value ? YES : NO, autoRestart);
6061
}
6162

6263
public DateTime CurrentServerTime => new DateTime(serverTimeAtStart.Ticks + startedAt.ElapsedTicks, DateTimeKind.Utc);
@@ -231,18 +232,35 @@ private void RunLoop()
231232
{
232233
subscription.OnUnSubscribe = HandleUnSubscribe;
233234

235+
if (OnMessageBytes != null)
236+
{
237+
bool IsCtrlMessage(byte[] msg)
238+
{
239+
if (msg.Length < 4)
240+
return false;
241+
return msg[0] == 'C' && msg[1] == 'T' && msg[0] == 'R' && msg[0] == 'L';
242+
}
243+
244+
((RedisSubscription)subscription).OnMessageBytes = (channel, msg) => {
245+
if (IsCtrlMessage(msg))
246+
return;
247+
248+
OnMessageBytes(channel, msg);
249+
};
250+
}
251+
234252
subscription.OnMessage = (channel, msg) =>
235253
{
236254
if (string.IsNullOrEmpty(msg))
237255
return;
238256

239-
var ctrlMsg = msg.SplitOnFirst(':');
240-
if (ctrlMsg[0] == ControlCommand.Control)
257+
var ctrlMsg = msg.LeftPart(':');
258+
if (ctrlMsg == ControlCommand.Control)
241259
{
242260
var op = Interlocked.CompareExchange(ref doOperation, Operation.NoOp, doOperation);
243261

244-
var msgType = ctrlMsg.Length > 1
245-
? ctrlMsg[1]
262+
var msgType = msg.IndexOf(':') >= 0
263+
? msg.RightPart(':')
246264
: null;
247265

248266
OnControlCommand?.Invoke(msgType ?? Operation.GetName(op));

src/ServiceStack.Redis/RedisSubscription.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public RedisSubscription(IRedisNativeClient redisClient)
3030

3131
public Action<string> OnSubscribe { get; set; }
3232
public Action<string, string> OnMessage { get; set; }
33+
public Action<string, byte[]> OnMessageBytes { get; set; }
3334
public Action<string> OnUnSubscribe { get; set; }
3435

3536
public void SubscribeToChannels(params string[] channels)
@@ -91,17 +92,28 @@ private void ParseSubscriptionResults(byte[][] multiBytes)
9192
}
9293
else if (MessageWord.AreEqual(messageType))
9394
{
94-
var message = multiBytes[i + MsgIndex].FromUtf8Bytes();
95-
95+
var msgBytes = multiBytes[i + MsgIndex];
96+
if (this.OnMessageBytes != null)
97+
{
98+
this.OnMessageBytes(channel, msgBytes);
99+
}
100+
101+
var message = msgBytes.FromUtf8Bytes();
96102
if (this.OnMessage != null)
97103
{
98104
this.OnMessage(channel, message);
99105
}
100106
}
101107
else if (PMessageWord.AreEqual(messageType))
102108
{
103-
var message = multiBytes[i + MsgIndex + 1].FromUtf8Bytes();
104109
channel = multiBytes[i + 2].FromUtf8Bytes();
110+
var msgBytes = multiBytes[i + MsgIndex + 1];
111+
if (this.OnMessageBytes != null)
112+
{
113+
this.OnMessageBytes(channel, msgBytes);
114+
}
115+
116+
var message = msgBytes.FromUtf8Bytes();
105117
if (this.OnMessage != null)
106118
{
107119
this.OnMessage(channel, message);

0 commit comments

Comments
 (0)