Skip to content

Commit 1aa06a2

Browse files
committed
Very naive WebSocket implementation #5
1 parent b458de3 commit 1aa06a2

File tree

2 files changed

+125
-3
lines changed

2 files changed

+125
-3
lines changed

AngleSharp.Io/Dom/WebSocket.cs

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@
22
{
33
using AngleSharp.Attributes;
44
using AngleSharp.Dom;
5+
using AngleSharp.Dom.Events;
6+
using AngleSharp.Io.Extensions;
57
using System;
68
using System.IO;
79
using System.Linq;
10+
using System.Net.WebSockets;
11+
using System.Text;
12+
using System.Threading;
13+
using System.Threading.Tasks;
814

915
/// <summary>
1016
/// Represents the WebSocket interface. For more information see:
@@ -15,8 +21,14 @@ public class WebSocket : EventTarget, IDisposable
1521
{
1622
#region Fields
1723

24+
const Int32 ReceiveChunkSize = 2048;
25+
const Int32 SendChunkSize = 1024;
26+
1827
readonly Url _url;
1928
readonly MemoryStream _buffered;
29+
readonly CancellationTokenSource _cts;
30+
readonly ClientWebSocket _ws;
31+
2032
String _protocol;
2133
WebSocketReadyState _state;
2234

@@ -77,6 +89,7 @@ public WebSocket(String url, params String[] protocols)
7789
_protocol = String.Empty;
7890
_state = WebSocketReadyState.Connecting;
7991
_buffered = new MemoryStream();
92+
_cts = new CancellationTokenSource();
8093

8194
if (_url.IsInvalid || _url.IsRelative)
8295
throw new DomException(DomError.Syntax);
@@ -85,6 +98,17 @@ public WebSocket(String url, params String[] protocols)
8598

8699
if (invalid > 0)
87100
throw new DomException(DomError.Syntax);
101+
102+
_ws = new ClientWebSocket();
103+
_ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(20);
104+
ConnectAsync(url).Forget();
105+
}
106+
107+
async Task ConnectAsync(String url)
108+
{
109+
await _ws.ConnectAsync(new Uri(url), _cts.Token).ConfigureAwait(false);
110+
OnConnected();
111+
StartListen().Forget();
88112
}
89113

90114
#endregion
@@ -140,7 +164,7 @@ public String Protocol
140164
[DomName("send")]
141165
public void Send(String data)
142166
{
143-
//TODO
167+
SendAsync(data).Wait();
144168
}
145169

146170
/// <summary>
@@ -149,12 +173,14 @@ public void Send(String data)
149173
[DomName("close")]
150174
public void Close()
151175
{
152-
//TODO
176+
_state = WebSocketReadyState.Closing;
177+
StopListen();
178+
OnDisconnected();
153179
}
154180

155181
void IDisposable.Dispose()
156182
{
157-
Close();
183+
StopListen();
158184
}
159185

160186
#endregion
@@ -172,6 +198,92 @@ static Boolean IsValid(String protocol)
172198
return true;
173199
}
174200

201+
async Task SendAsync(String message)
202+
{
203+
if (_ws.State != WebSocketState.Open)
204+
throw new Exception("WebSocket is already in CLOSING or CLOSED state.");
205+
206+
var messageBuffer = Encoding.UTF8.GetBytes(message);
207+
var remainder = 0;
208+
var messagesCount = Math.DivRem(messageBuffer.Length, SendChunkSize, out remainder);
209+
210+
if (remainder > 0)
211+
messagesCount++;
212+
else
213+
remainder = SendChunkSize;
214+
215+
for (var i = 0; i < messagesCount; i++)
216+
{
217+
var offset = SendChunkSize * i;
218+
var lastMessage = (i + 1) == messagesCount;
219+
var count = lastMessage ? remainder : SendChunkSize;
220+
var segment = new ArraySegment<Byte>(messageBuffer, offset, count);
221+
await _ws.SendAsync(segment, WebSocketMessageType.Text, lastMessage, _cts.Token).ConfigureAwait(false);
222+
}
223+
}
224+
225+
async Task StartListen()
226+
{
227+
var buffer = new Byte[ReceiveChunkSize];
228+
var stringResult = new StringBuilder();
229+
230+
try
231+
{
232+
while (_ws.State == WebSocketState.Open)
233+
{
234+
var segment = new ArraySegment<Byte>(buffer);
235+
var result = await _ws.ReceiveAsync(segment, _cts.Token).ConfigureAwait(false);
236+
237+
if (result.MessageType == WebSocketMessageType.Close)
238+
{
239+
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, String.Empty, _cts.Token).ConfigureAwait(false);
240+
OnDisconnected();
241+
return;
242+
}
243+
244+
stringResult.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
245+
246+
if (result.EndOfMessage)
247+
{
248+
OnMessage(stringResult.ToString());
249+
stringResult.Clear();
250+
}
251+
}
252+
}
253+
catch
254+
{
255+
OnDisconnected();
256+
}
257+
finally
258+
{
259+
StopListen();
260+
}
261+
}
262+
263+
void StopListen()
264+
{
265+
_cts.Cancel();
266+
_ws.Abort();
267+
_ws.Dispose();
268+
}
269+
270+
void OnMessage(String message)
271+
{
272+
this.Dispatch(new MessageEvent(MessageEvent, data: message, origin: _url.Href));
273+
}
274+
275+
void OnDisconnected()
276+
{
277+
_state = WebSocketReadyState.Closed;
278+
this.Dispatch(new Event(CloseEvent));
279+
}
280+
281+
void OnConnected()
282+
{
283+
_state = WebSocketReadyState.Open;
284+
this.Dispatch(new Event(OpenEvent));
285+
}
286+
175287
#endregion
176288
}
177289
}

AngleSharp.Io/Extensions/GeneralExtensions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{
33
using AngleSharp.Network;
44
using System;
5+
using System.Threading.Tasks;
56

67
/// <summary>
78
/// Some general extension methods.
@@ -29,5 +30,14 @@ public static String Stringify(this HttpMethod method)
2930
return method.ToString().ToUpperInvariant();
3031
}
3132
}
33+
34+
/// <summary>
35+
/// Forgets the given task. Exceptions are ignored and continuations
36+
/// are pointless.
37+
/// </summary>
38+
/// <param name="task">The task to forget after firing.</param>
39+
public static void Forget(this Task task)
40+
{
41+
}
3242
}
3343
}

0 commit comments

Comments
 (0)