1111using Newtonsoft . Json . Linq ;
1212using PuppeteerSharp . Helpers ;
1313using PuppeteerSharp . Messaging ;
14+ using PuppeteerSharp . Transport ;
1415
1516namespace PuppeteerSharp
1617{
@@ -21,28 +22,26 @@ public class Connection : IDisposable, IConnection
2122 {
2223 private readonly ILogger _logger ;
2324
24- internal Connection ( string url , int delay , WebSocket ws , ILoggerFactory loggerFactory = null )
25+ internal Connection ( string url , int delay , IConnectionTransport transport , ILoggerFactory loggerFactory = null )
2526 {
2627 LoggerFactory = loggerFactory ?? new LoggerFactory ( ) ;
2728 Url = url ;
2829 Delay = delay ;
29- WebSocket = ws ;
30+ Transport = transport ;
3031
3132 _logger = LoggerFactory . CreateLogger < Connection > ( ) ;
32- _socketQueue = new TaskQueue ( ) ;
33- _callbacks = new ConcurrentDictionary < int , MessageTask > ( ) ;
33+
34+ Transport . MessageReceived += Transport_MessageReceived ;
35+ Transport . Closed += Transport_Closed ;
36+ _callbacks = new ConcurrentDictionary < int , MessageTask > ( ) ;
3437 _sessions = new ConcurrentDictionary < string , CDPSession > ( ) ;
35- _websocketReaderCancellationSource = new CancellationTokenSource ( ) ;
3638
37- Task . Factory . StartNew ( GetResponseAsync ) ;
3839 }
3940
4041 #region Private Members
4142 private int _lastId ;
4243 private readonly ConcurrentDictionary < int , MessageTask > _callbacks ;
4344 private readonly ConcurrentDictionary < string , CDPSession > _sessions ;
44- private readonly TaskQueue _socketQueue ;
45- private readonly CancellationTokenSource _websocketReaderCancellationSource ;
4645 #endregion
4746
4847 #region Properties
@@ -57,10 +56,10 @@ internal Connection(string url, int delay, WebSocket ws, ILoggerFactory loggerFa
5756 /// <value>The delay.</value>
5857 public int Delay { get ; }
5958 /// <summary>
60- /// Gets the WebSocket .
59+ /// Gets the Connection transport .
6160 /// </summary>
62- /// <value>The web socket .</value>
63- public WebSocket WebSocket { get ; }
61+ /// <value>Connection transport .</value>
62+ public IConnectionTransport Transport { get ; }
6463 /// <summary>
6564 /// Occurs when the connection is closed.
6665 /// </summary>
@@ -114,10 +113,7 @@ internal async Task<JObject> SendAsync(string method, dynamic args = null, bool
114113 _callbacks [ id ] = callback ;
115114 }
116115
117- var encoded = Encoding . UTF8 . GetBytes ( message ) ;
118- var buffer = new ArraySegment < byte > ( encoded , 0 , encoded . Length ) ;
119- await _socketQueue . Enqueue ( ( ) => WebSocket . SendAsync ( buffer , WebSocketMessageType . Text , true , default ) ) . ConfigureAwait ( false ) ;
120-
116+ await Transport . SendAsync ( message ) . ConfigureAwait ( false ) ;
121117 return waitForCallback ? await callback . TaskWrapper . Task . ConfigureAwait ( false ) : null ;
122118 }
123119
@@ -149,7 +145,7 @@ private void OnClose()
149145 }
150146 IsClosed = true ;
151147
152- _websocketReaderCancellationSource . Cancel ( ) ;
148+ Transport . StopReading ( ) ;
153149 Closed ? . Invoke ( this , new EventArgs ( ) ) ;
154150
155151 foreach ( var session in _sessions . Values . ToArray ( ) )
@@ -178,73 +174,15 @@ internal static IConnection FromSession(CDPSession session)
178174 }
179175 #region Private Methods
180176
181- /// <summary>
182- /// Starts listening the socket
183- /// </summary>
184- /// <returns>The start.</returns>
185- private async Task < object > GetResponseAsync ( )
177+ private async void Transport_MessageReceived ( object sender , MessageReceivedEventArgs e )
186178 {
187- var buffer = new byte [ 2048 ] ;
179+ var response = e . Message ;
180+ JObject obj = null ;
188181
189- //If it's not in the list we wait for it
190- while ( true )
182+ if ( response . Length > 0 && Delay > 0 )
191183 {
192- if ( IsClosed )
193- {
194- OnClose ( ) ;
195- return null ;
196- }
197-
198- var endOfMessage = false ;
199- var response = new StringBuilder ( ) ;
200-
201- while ( ! endOfMessage )
202- {
203- WebSocketReceiveResult result ;
204- try
205- {
206- result = await WebSocket . ReceiveAsync (
207- new ArraySegment < byte > ( buffer ) ,
208- _websocketReaderCancellationSource . Token ) . ConfigureAwait ( false ) ;
209- }
210- catch ( OperationCanceledException )
211- {
212- return null ;
213- }
214- catch ( Exception )
215- {
216- OnClose ( ) ;
217- return null ;
218- }
219-
220- endOfMessage = result . EndOfMessage ;
221-
222- if ( result . MessageType == WebSocketMessageType . Text )
223- {
224- response . Append ( Encoding . UTF8 . GetString ( buffer , 0 , result . Count ) ) ;
225- }
226- else if ( result . MessageType == WebSocketMessageType . Close )
227- {
228- OnClose ( ) ;
229- return null ;
230- }
231- }
232-
233- if ( response . Length > 0 )
234- {
235- if ( Delay > 0 )
236- {
237- await Task . Delay ( Delay ) . ConfigureAwait ( false ) ;
238- }
239-
240- ProcessResponse ( response . ToString ( ) ) ;
241- }
184+ await Task . Delay ( Delay ) . ConfigureAwait ( false ) ;
242185 }
243- }
244-
245- private void ProcessResponse ( string response )
246- {
247- JObject obj = null ;
248186
249187 try
250188 {
@@ -307,6 +245,9 @@ private void ProcessResponse(string response)
307245 }
308246 }
309247 }
248+
249+ void Transport_Closed ( object sender , EventArgs e ) => OnClose ( ) ;
250+
310251 #endregion
311252
312253 #region Static Methods
@@ -324,16 +265,23 @@ private void ProcessResponse(string response)
324265
325266 internal static async Task < Connection > Create ( string url , IConnectionOptions connectionOptions , ILoggerFactory loggerFactory = null )
326267 {
327- var ws = await ( connectionOptions . WebSocketFactory ?? DefaultWebSocketFactory ) (
328- new Uri ( url ) ,
329- connectionOptions ,
330- default ) . ConfigureAwait ( false ) ;
331- return new Connection ( url , connectionOptions . SlowMo , ws , loggerFactory ) ;
268+ var transport = connectionOptions . Transport ;
269+
270+ if ( transport == null )
271+ {
272+ var ws = await ( connectionOptions . WebSocketFactory ?? DefaultWebSocketFactory ) (
273+ new Uri ( url ) ,
274+ connectionOptions ,
275+ default ) . ConfigureAwait ( false ) ;
276+ transport = new WebSocketTransport ( ws , connectionOptions . EnqueueTransportMessages ) ;
277+ }
278+
279+ return new Connection ( url , connectionOptions . SlowMo , transport , loggerFactory ) ;
332280 }
333281
334282 /// <summary>
335283 /// Releases all resource used by the <see cref="Connection"/> object.
336- /// It will raise the <see cref="Closed"/> event and dispose <see cref="WebSocket "/>.
284+ /// It will raise the <see cref="Closed"/> event and dispose <see cref="Transport "/>.
337285 /// </summary>
338286 /// <remarks>Call <see cref="Dispose"/> when you are finished using the <see cref="Connection"/>. The
339287 /// <see cref="Dispose"/> method leaves the <see cref="Connection"/> in an unusable state.
@@ -343,7 +291,7 @@ internal static async Task<Connection> Create(string url, IConnectionOptions con
343291 public void Dispose ( )
344292 {
345293 OnClose ( ) ;
346- WebSocket . Dispose ( ) ;
294+ Transport . Dispose ( ) ;
347295 }
348296 #endregion
349297
0 commit comments