Skip to content

Commit 678e22d

Browse files
committed
Added pub sub message converter
1 parent c20f13b commit 678e22d

File tree

3 files changed

+174
-85
lines changed

3 files changed

+174
-85
lines changed

.editorconfig

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,32 @@ insert_final_newline = true
1111
csharp_style_var_for_built_in_types = true:error
1212
csharp_style_var_when_type_is_apparent = true:error
1313
csharp_style_var_elsewhere = true:error
14-
csharp_prefer_braces = true
14+
csharp_prefer_braces = true:warning
15+
csharp_new_line_before_open_brace = none
16+
17+
18+
19+
######################################################################
20+
# Start by defining the naming symbols (groups) for fields...
21+
######################################################################
22+
# allowed by design guidelines, but naming is not specified by naming guidelines
23+
dotnet_naming_symbols.private_fields.applicable_kinds = field
24+
dotnet_naming_symbols.private_fields.applicable_accessibilities = private, internal, protected_internal
25+
26+
######################################################################
27+
# Now define the styles that will be applied to those naming symbols...
28+
######################################################################
29+
# prefix_with_underscore_pascal_case
30+
dotnet_naming_style.prefix_with_underscore_pascal_case.capitalization = pascal_case
31+
dotnet_naming_style.prefix_with_underscore_pascal_case.required_prefix = _
32+
33+
######################################################################
34+
# Naming Rules are matched in the order listed, and only the first match is applied
35+
# Use this to match allowed field types, then match all other field types with the invalid style
36+
# Explicitly mark the field type that is user-preference, to allow simple changing to camelCase
37+
# or other settings...
38+
######################################################################
39+
# Fields that are private can be formatted entirely by user preference
40+
dotnet_naming_rule.private_fields_rule.symbols = private_fields
41+
dotnet_naming_rule.private_fields_rule.style = prefix_with_underscore_pascal_case
42+
dotnet_naming_rule.private_fields_rule.severity = warning

Fritz.Twitch/PubSub/Proxy.cs

Lines changed: 53 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using Microsoft.Extensions.Logging;
22
using Microsoft.Extensions.Options;
33
using Newtonsoft.Json;
4-
using Newtonsoft.Json.Linq;
54
using System;
65
using System.Collections.Generic;
76
using System.Linq;
@@ -11,24 +10,21 @@
1110
using System.Threading.Tasks;
1211
using System.Timers;
1312

14-
namespace Fritz.Twitch.PubSub
15-
{
13+
namespace Fritz.Twitch.PubSub {
1614

1715
/// <summary>
1816
/// Manage interactions with the Twitch pubsub API
1917
/// </summary>
20-
public class Proxy : IDisposable
21-
{
18+
public class Proxy : IDisposable {
2219

2320
private ClientWebSocket _Socket;
2421
private System.Timers.Timer _PingTimer;
2522
private System.Timers.Timer _PongTimer;
26-
private ConfigurationSettings _Configuration;
27-
private ILogger _Logger;
23+
private readonly ConfigurationSettings _Configuration;
24+
private readonly ILogger _Logger;
2825
private static bool _Reconnect;
2926

30-
public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFactory)
31-
{
27+
public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFactory) {
3228

3329
InitializeMethodStrategies();
3430

@@ -37,8 +33,7 @@ public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFact
3733

3834
}
3935

40-
public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken token)
41-
{
36+
public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken token) {
4237

4338
_Topics = topics;
4439

@@ -49,17 +44,15 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken
4944

5045
await StartListening(topics);
5146

52-
while (!token.IsCancellationRequested)
53-
{
47+
while (!token.IsCancellationRequested) {
5448

5549
var buffer = new byte[1024];
5650
var messageBuffer = new ArraySegment<byte>(buffer);
5751
var completeMessage = new StringBuilder();
5852

5953
var result = await _Socket.ReceiveAsync(messageBuffer, token);
6054
completeMessage.Append(Encoding.UTF8.GetString(messageBuffer));
61-
while (!result.EndOfMessage)
62-
{
55+
while (!result.EndOfMessage) {
6356
buffer = new byte[1024];
6457
messageBuffer = new ArraySegment<byte>(buffer);
6558
result = await _Socket.ReceiveAsync(messageBuffer, token);
@@ -71,12 +64,13 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken
7164
break;
7265
}
7366

74-
try
75-
{
67+
try {
7668
HandleMessage(completeMessage.ToString());
77-
} catch (UnhandledPubSubMessageException) {
69+
}
70+
catch (UnhandledPubSubMessageException) {
7871
// do nothing
79-
} catch (Exception e) {
72+
}
73+
catch (Exception e) {
8074
_Logger.LogError(e, "Error while parsing message from Twitch: " + completeMessage.ToString());
8175
_Logger.LogError("Reconnecting...");
8276
_Reconnect = true;
@@ -88,44 +82,39 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken
8882

8983
}
9084

91-
if (_Reconnect) _ = Task.Run(() => StartAsync(topics, token));
92-
85+
if (_Reconnect) {
86+
_ = Task.Run(() => StartAsync(topics, token));
87+
}
9388
}
9489

9590

96-
private delegate bool OnReceivedMessage(string message);
97-
private List<OnReceivedMessage> _Strategies = new List<OnReceivedMessage>();
91+
private delegate bool OnReceivedMessage(IPubSubReceiveMessage message);
92+
private readonly List<OnReceivedMessage> _Strategies = new List<OnReceivedMessage>();
9893

99-
private void HandleMessage(string receivedMessage)
100-
{
94+
private void HandleMessage(string receivedMessage) {
95+
var message = JsonConvert.DeserializeObject<IPubSubReceiveMessage>(receivedMessage);
10196

102-
var jDoc = JObject.Parse(receivedMessage);
103-
var messageType = jDoc["type"].Value<string>();
104-
if (messageType == "RESPONSE" && jDoc["error"].Value<string>() != "")
105-
{
106-
throw new Exception("Unable to connect");
107-
} else if (messageType == "RESPONSE") {
97+
if (message is ResponseReceiveMessage response) {
98+
if (!string.IsNullOrWhiteSpace(response.Error)) {
99+
throw new Exception($"Unable to connect: {response.Error}");
100+
}
108101
return;
109102
}
110103

111-
foreach (var strategy in _Strategies)
112-
{
113-
if (strategy(receivedMessage)) return;
104+
foreach (var strategy in _Strategies) {
105+
if (strategy(message)) {
106+
return;
107+
}
114108
}
115109

116110
throw new UnhandledPubSubMessageException();
117-
118111
}
119112

120-
private async Task StartListening(IEnumerable<TwitchTopic> topics)
121-
{
122-
113+
private async Task StartListening(IEnumerable<TwitchTopic> topics) {
123114
_Socket = new ClientWebSocket();
124115

125-
var message = new PubSubListen
126-
{
127-
data = new PubSubListen.PubSubListenData
128-
{
116+
var message = new PubSubListen {
117+
data = new PubSubListen.PubSubListenData {
129118
auth_token = _Configuration.OAuthToken,
130119
topics = topics.Select(t => t.TopicString).ToArray()
131120
}
@@ -136,31 +125,30 @@ await _Socket.ConnectAsync(new Uri("wss://pubsub-edge.twitch.tv:443"), Cancellat
136125

137126
}
138127

139-
private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e)
140-
{
128+
private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e) {
141129
var message = @"{ ""type"": ""PING"" }";
142130
SendMessageOnSocket(message).GetAwaiter().GetResult();
143131
_PongTimer = new System.Timers.Timer(TimeSpan.FromSeconds(10).TotalMilliseconds);
144132
_PongTimer.Elapsed += _PongTimer_Elapsed;
145133
_PongTimer.Start();
146134
_PingAcknowledged = false;
147135

148-
// TODO: handle the lack of returned PONG message
136+
// TODO: handle the lack of returned PONG message
149137

150138
}
151139

152-
private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e)
153-
{
140+
private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e) {
154141
if (!_PingAcknowledged) {
155142
_Reconnect = true;
156143
_PongTimer.Dispose();
157144
}
158145
}
159146

160-
private Task SendMessageOnSocket(string message)
161-
{
147+
private Task SendMessageOnSocket(string message) {
162148

163-
if (_Socket.State != WebSocketState.Open) return Task.CompletedTask;
149+
if (_Socket.State != WebSocketState.Open) {
150+
return Task.CompletedTask;
151+
}
164152

165153
var byteArray = Encoding.ASCII.GetBytes(message);
166154
return _Socket.SendAsync(byteArray, WebSocketMessageType.Text, true, CancellationToken.None);
@@ -183,9 +171,9 @@ private void InitializeMethodStrategies() {
183171

184172
}
185173

186-
private bool HandlePongMessage(string message) {
174+
private bool HandlePongMessage(IPubSubReceiveMessage message) {
187175

188-
if (message.Contains(@"""PONG""")) {
176+
if (message is PongReceiveMessage) {
189177
_PingAcknowledged = true;
190178
_PongTimer.Stop();
191179
_PongTimer.Dispose();
@@ -197,9 +185,9 @@ private bool HandlePongMessage(string message) {
197185

198186
}
199187

200-
private bool HandleReconnectMessage(string message) {
188+
private bool HandleReconnectMessage(IPubSubReceiveMessage message) {
201189

202-
if (message.Contains(@"""RECONNECT""")) {
190+
if (message is ReconnectReceiveMessage) {
203191

204192
_Reconnect = true;
205193

@@ -210,26 +198,12 @@ private bool HandleReconnectMessage(string message) {
210198

211199
}
212200

213-
private bool HandleChannelPointsMessage(string message) {
214-
215-
var jDoc = JObject.Parse(message);
216-
217-
if (jDoc["type"].Value<string>() == "MESSAGE" && jDoc["data"]["topic"].Value<string>().StartsWith("channel-points-channel-v1") ) {
201+
private bool HandleChannelPointsMessage(IPubSubReceiveMessage message) {
218202

219-
var innerMessage = jDoc["data"]["message"].Value<string>();
220-
221-
PubSubRedemptionMessage messageObj = null;
222-
try
223-
{
224-
messageObj = JsonConvert.DeserializeObject<PubSubRedemptionMessage>(innerMessage);
225-
} catch (Exception e) {
226-
_Logger.LogError(e, "Error while deserializing the message");
227-
_Logger.LogInformation("Message contents: " + innerMessage);
228-
}
229-
_Logger.LogWarning($"Channel Points redeemed: {innerMessage}");
230-
OnChannelPointsRedeemed?.Invoke(null, messageObj?.data);
203+
if (message is ChannelPointsReceiveMessage channelPointsMessage) {
204+
_Logger.LogWarning($"Channel Points redeemed: {channelPointsMessage.Data.Message}");
205+
OnChannelPointsRedeemed?.Invoke(null, channelPointsMessage?.Data?.Message);
231206
return true;
232-
233207
}
234208

235209
return false;
@@ -239,35 +213,30 @@ private bool HandleChannelPointsMessage(string message) {
239213
#endregion
240214

241215
#region IDisposable Support
242-
private bool disposedValue = false; // To detect redundant calls
216+
private bool _DisposedValue = false; // To detect redundant calls
243217
private bool _PingAcknowledged;
244218
private IEnumerable<TwitchTopic> _Topics;
245219

246-
protected virtual void Dispose(bool disposing)
247-
{
248-
if (!disposedValue)
249-
{
250-
if (disposing)
251-
{
220+
protected virtual void Dispose(bool disposing) {
221+
if (!_DisposedValue) {
222+
if (disposing) {
252223
_PingTimer.Dispose();
253224
_PongTimer.Dispose();
254225
}
255226

256227
_Socket.Dispose();
257228

258-
disposedValue = true;
229+
_DisposedValue = true;
259230
}
260231
}
261232

262-
~Proxy()
263-
{
233+
~Proxy() {
264234
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
265235
Dispose(false);
266236
}
267237

268238
// This code added to correctly implement the disposable pattern.
269-
public void Dispose()
270-
{
239+
public void Dispose() {
271240
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
272241
Dispose(true);
273242
// TODO: uncomment the following line if the finalizer is overridden above.

0 commit comments

Comments
 (0)