@@ -21,7 +21,8 @@ public class Proxy : IDisposable
21
21
{
22
22
23
23
private ClientWebSocket _Socket ;
24
- private readonly System . Timers . Timer _PingTimer ;
24
+ private System . Timers . Timer _PingTimer ;
25
+ private System . Timers . Timer _PongTimer ;
25
26
private ConfigurationSettings _Configuration ;
26
27
private ILogger _Logger ;
27
28
private static bool _Reconnect ;
@@ -34,29 +35,52 @@ public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFact
34
35
_Configuration = settings . Value ;
35
36
_Logger = loggerFactory . CreateLogger ( "TwitchPubSub" ) ;
36
37
37
- _Socket = new ClientWebSocket ( ) ;
38
-
39
- // Start a timer to manage the connection over the websocket
40
- _PingTimer = new System . Timers . Timer ( TimeSpan . FromMinutes ( 4 ) . TotalMilliseconds ) ;
41
- _PingTimer . Elapsed += _PingTimer_Elapsed ;
42
- _PingTimer . Start ( ) ;
43
-
44
38
}
45
39
46
40
public async Task StartAsync ( IEnumerable < TwitchTopic > topics , CancellationToken token )
47
41
{
48
42
49
43
_Topics = topics ;
50
- await StartListening ( topics ) ;
51
- var messageBuffer = new Memory < byte > ( ) ;
52
44
45
+ // Start a timer to manage the connection over the websocket
46
+ _PingTimer = new System . Timers . Timer ( TimeSpan . FromSeconds ( 30 ) . TotalMilliseconds ) ;
47
+ _PingTimer . Elapsed += _PingTimer_Elapsed ;
48
+ _PingTimer . Start ( ) ;
49
+
50
+ await StartListening ( topics ) ;
53
51
54
52
while ( ! token . IsCancellationRequested )
55
53
{
56
54
57
- await _Socket . ReceiveAsync ( messageBuffer , token ) ;
55
+ var buffer = new byte [ 1024 ] ;
56
+ var messageBuffer = new ArraySegment < byte > ( buffer ) ;
57
+ var completeMessage = new StringBuilder ( ) ;
58
+
59
+ var result = await _Socket . ReceiveAsync ( messageBuffer , token ) ;
60
+ completeMessage . Append ( Encoding . UTF8 . GetString ( messageBuffer ) ) ;
61
+ while ( ! result . EndOfMessage )
62
+ {
63
+ buffer = new byte [ 1024 ] ;
64
+ messageBuffer = new ArraySegment < byte > ( buffer ) ;
65
+ result = await _Socket . ReceiveAsync ( messageBuffer , token ) ;
66
+ completeMessage . Append ( Encoding . UTF8 . GetString ( messageBuffer ) ) ;
67
+ }
68
+
69
+ if ( result . MessageType == WebSocketMessageType . Close ) {
70
+ _Reconnect = true ;
71
+ break ;
72
+ }
58
73
59
- HandleMessage ( UTF8Encoding . UTF8 . GetString ( messageBuffer . Span ) ) ;
74
+ try
75
+ {
76
+ HandleMessage ( completeMessage . ToString ( ) ) ;
77
+ } catch ( UnhandledPubSubMessageException ) {
78
+ // do nothing
79
+ } catch ( Exception e ) {
80
+ _Logger . LogError ( e , "Error while parsing message from Twitch: " + completeMessage . ToString ( ) ) ;
81
+ _Logger . LogError ( "Reconnecting..." ) ;
82
+ _Reconnect = true ;
83
+ }
60
84
61
85
if ( _Reconnect ) {
62
86
break ;
@@ -75,6 +99,15 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken
75
99
private void HandleMessage ( string receivedMessage )
76
100
{
77
101
102
+ var jDoc = JObject . Parse ( receivedMessage ) ;
103
+ var messageType = jDoc [ "type" ] . Value < string > ( ) ;
104
+ if ( messageType == "RESPONSE" && jDoc [ "error" ] . Value < string > ( ) != "" )
105
+ {
106
+ throw new Exception ( "Unable to connect" ) ;
107
+ } else if ( messageType == "RESPONSE" ) {
108
+ return ;
109
+ }
110
+
78
111
foreach ( var strategy in _Strategies )
79
112
{
80
113
if ( strategy ( receivedMessage ) ) return ;
@@ -87,7 +120,7 @@ private void HandleMessage(string receivedMessage)
87
120
private async Task StartListening ( IEnumerable < TwitchTopic > topics )
88
121
{
89
122
90
- await _Socket . ConnectAsync ( new Uri ( "wss://pubsub-edge.twitch.tv" ) , CancellationToken . None ) ;
123
+ _Socket = new ClientWebSocket ( ) ;
91
124
92
125
var message = new PubSubListen
93
126
{
@@ -98,25 +131,39 @@ private async Task StartListening(IEnumerable<TwitchTopic> topics)
98
131
}
99
132
} ;
100
133
101
- await SendMessageOnSocket ( JsonConvert . SerializeObject ( message ) ) ;
134
+ await _Socket . ConnectAsync ( new Uri ( "wss://pubsub-edge.twitch.tv:443" ) , CancellationToken . None )
135
+ . ContinueWith ( t => SendMessageOnSocket ( JsonConvert . SerializeObject ( message ) ) ) ;
102
136
103
137
}
104
138
105
139
private void _PingTimer_Elapsed ( object sender , ElapsedEventArgs e )
106
140
{
107
141
var message = @"{ ""type"": ""PING"" }" ;
108
142
SendMessageOnSocket ( message ) . GetAwaiter ( ) . GetResult ( ) ;
143
+ _PongTimer = new System . Timers . Timer ( TimeSpan . FromSeconds ( 10 ) . TotalMilliseconds ) ;
144
+ _PongTimer . Elapsed += _PongTimer_Elapsed ;
145
+ _PongTimer . Start ( ) ;
109
146
_PingAcknowledged = false ;
110
147
111
148
// TODO: handle the lack of returned PONG message
112
149
113
150
}
114
151
115
- private async Task SendMessageOnSocket ( string message )
152
+ private void _PongTimer_Elapsed ( object sender , ElapsedEventArgs e )
116
153
{
154
+ if ( ! _PingAcknowledged ) {
155
+ _Reconnect = true ;
156
+ _PongTimer . Dispose ( ) ;
157
+ }
158
+ }
159
+
160
+ private Task SendMessageOnSocket ( string message )
161
+ {
162
+
163
+ if ( _Socket . State != WebSocketState . Open ) return Task . CompletedTask ;
117
164
118
- var byteArray = Encoding . UTF8 . GetBytes ( message ) ;
119
- await _Socket . SendAsync ( byteArray , WebSocketMessageType . Text , false , CancellationToken . None ) ;
165
+ var byteArray = Encoding . ASCII . GetBytes ( message ) ;
166
+ return _Socket . SendAsync ( byteArray , WebSocketMessageType . Text , true , CancellationToken . None ) ;
120
167
121
168
}
122
169
@@ -140,6 +187,8 @@ private bool HandlePongMessage(string message) {
140
187
141
188
if ( message . Contains ( @"""PONG""" ) ) {
142
189
_PingAcknowledged = true ;
190
+ _PongTimer . Stop ( ) ;
191
+ _PongTimer . Dispose ( ) ;
143
192
return true ;
144
193
}
145
194
@@ -164,11 +213,19 @@ private bool HandleChannelPointsMessage(string message) {
164
213
165
214
var jDoc = JObject . Parse ( message ) ;
166
215
167
- if ( jDoc [ "type" ] . Value < string > ( ) == "reward-redeemed" ) {
216
+ if ( jDoc [ "type" ] . Value < string > ( ) == "MESSAGE" && jDoc [ "data" ] [ "topic" ] . Value < string > ( ) . StartsWith ( "channel-points-channel-v1" ) ) {
168
217
169
- var messageObj = JsonConvert . DeserializeObject < PubSubMessage < ChannelRedemption > > ( message ) ;
218
+ var innerMessage = jDoc [ "data" ] [ "message" ] . Value < string > ( ) ;
170
219
171
- OnChannelPointsRedeemed ? . BeginInvoke ( null , messageObj . data , null , null ) ;
220
+ PubSubRedemptionMessage messageObj = null ;
221
+ try
222
+ {
223
+ messageObj = JsonConvert . DeserializeObject < PubSubRedemptionMessage > ( innerMessage ) ;
224
+ } catch ( Exception e ) {
225
+ _Logger . LogError ( e , "Error while deserializing the message" ) ;
226
+ _Logger . LogInformation ( "Message contents: " + innerMessage ) ;
227
+ }
228
+ OnChannelPointsRedeemed ? . Invoke ( null , messageObj ? . data ) ;
172
229
return true ;
173
230
174
231
}
@@ -191,6 +248,7 @@ protected virtual void Dispose(bool disposing)
191
248
if ( disposing )
192
249
{
193
250
_PingTimer . Dispose ( ) ;
251
+ _PongTimer . Dispose ( ) ;
194
252
}
195
253
196
254
_Socket . Dispose ( ) ;
0 commit comments