Skip to content

Commit ee4224a

Browse files
tommymonkkblok
authored andcommitted
Concurrency improvements (#715)
1 parent 924a7ad commit ee4224a

File tree

3 files changed

+20
-18
lines changed

3 files changed

+20
-18
lines changed

lib/PuppeteerSharp/CDPSession.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Linq;
5+
using System.Threading;
46
using System.Threading.Tasks;
57
using Microsoft.Extensions.Logging;
68
using Newtonsoft.Json;
@@ -45,16 +47,16 @@ internal CDPSession(IConnection connection, TargetType targetType, string sessio
4547
TargetType = targetType;
4648
SessionId = sessionId;
4749

48-
_callbacks = new Dictionary<int, MessageTask>();
50+
_callbacks = new ConcurrentDictionary<int, MessageTask>();
4951
_logger = Connection.LoggerFactory.CreateLogger<CDPSession>();
50-
_sessions = new Dictionary<string, CDPSession>();
52+
_sessions = new ConcurrentDictionary<string, CDPSession>();
5153
}
5254

5355
#region Private Members
5456
private int _lastId;
55-
private readonly Dictionary<int, MessageTask> _callbacks;
57+
private readonly ConcurrentDictionary<int, MessageTask> _callbacks;
5658
private readonly ILogger _logger;
57-
private readonly Dictionary<string, CDPSession> _sessions;
59+
private readonly ConcurrentDictionary<string, CDPSession> _sessions;
5860
#endregion
5961

6062
#region Properties
@@ -133,7 +135,7 @@ public async Task<JObject> SendAsync(string method, dynamic args = null, bool wa
133135
{
134136
throw new PuppeteerException($"Protocol error ({method}): Session closed. Most likely the {TargetType} has been closed.");
135137
}
136-
var id = ++_lastId;
138+
var id = Interlocked.Increment(ref _lastId);
137139
var message = JsonConvert.SerializeObject(new Dictionary<string, object>
138140
{
139141
{ MessageKeys.Id, id },
@@ -165,9 +167,8 @@ await Connection.SendAsync(
165167
}
166168
catch (Exception ex)
167169
{
168-
if (waitForCallback && _callbacks.ContainsKey(id))
170+
if (waitForCallback && _callbacks.TryRemove(id, out _))
169171
{
170-
_callbacks.Remove(id);
171172
callback.TaskWrapper.SetException(new MessageException(ex.Message, ex));
172173
}
173174
}
@@ -213,7 +214,7 @@ internal void OnMessage(string message)
213214

214215
var id = obj[MessageKeys.Id]?.Value<int>();
215216

216-
if (id.HasValue && _callbacks.TryGetValue(id.Value, out var callback) && _callbacks.Remove(id.Value))
217+
if (id.HasValue && _callbacks.TryRemove(id.Value, out var callback))
217218
{
218219
if (obj[MessageKeys.Error] != null)
219220
{
@@ -249,7 +250,7 @@ internal void OnMessage(string message)
249250
{
250251
var sessionId = param[MessageKeys.SessionId].AsString();
251252

252-
if (_sessions.TryGetValue(sessionId, out var session) && _sessions.Remove(sessionId))
253+
if (_sessions.TryRemove(sessionId, out var session))
253254
{
254255
session.OnClosed();
255256
}

lib/PuppeteerSharp/Connection.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Linq;
45
using System.Net.WebSockets;
@@ -29,17 +30,17 @@ internal Connection(string url, int delay, WebSocket ws, ILoggerFactory loggerFa
2930

3031
_logger = LoggerFactory.CreateLogger<Connection>();
3132
_socketQueue = new TaskQueue();
32-
_callbacks = new Dictionary<int, MessageTask>();
33-
_sessions = new Dictionary<string, CDPSession>();
33+
_callbacks = new ConcurrentDictionary<int, MessageTask>( );
34+
_sessions = new ConcurrentDictionary<string, CDPSession>();
3435
_websocketReaderCancellationSource = new CancellationTokenSource();
3536

3637
Task.Factory.StartNew(GetResponseAsync);
3738
}
3839

3940
#region Private Members
4041
private int _lastId;
41-
private readonly Dictionary<int, MessageTask> _callbacks;
42-
private readonly Dictionary<string, CDPSession> _sessions;
42+
private readonly ConcurrentDictionary<int, MessageTask> _callbacks;
43+
private readonly ConcurrentDictionary<string, CDPSession> _sessions;
4344
private readonly TaskQueue _socketQueue;
4445
private readonly CancellationTokenSource _websocketReaderCancellationSource;
4546
#endregion
@@ -91,7 +92,7 @@ internal async Task<JObject> SendAsync(string method, dynamic args = null, bool
9192
throw new TargetClosedException($"Protocol error({method}): Target closed.");
9293
}
9394

94-
var id = ++_lastId;
95+
var id = Interlocked.Increment(ref _lastId);
9596
var message = JsonConvert.SerializeObject(new Dictionary<string, object>
9697
{
9798
{ MessageKeys.Id, id },
@@ -133,7 +134,7 @@ internal async Task<CDPSession> CreateSessionAsync(TargetInfo targetInfo)
133134
targetId = targetInfo.TargetId
134135
}).ConfigureAwait(false))[MessageKeys.SessionId].AsString();
135136
var session = new CDPSession(this, targetInfo.Type, sessionId);
136-
_sessions.Add(sessionId, session);
137+
_sessions.TryAdd(sessionId, session);
137138
return session;
138139
}
139140

@@ -263,7 +264,7 @@ private void ProcessResponse(string response)
263264
{
264265
//If we get the object we are waiting for we return if
265266
//if not we add this to the list, sooner or later some one will come for it
266-
if (_callbacks.TryGetValue(id.Value, out var callback) && _callbacks.Remove(id.Value))
267+
if (_callbacks.TryRemove(id.Value, out var callback))
267268
{
268269
if (obj[MessageKeys.Error] != null)
269270
{
@@ -291,7 +292,7 @@ private void ProcessResponse(string response)
291292
else if (method == "Target.detachedFromTarget")
292293
{
293294
var sessionId = param[MessageKeys.SessionId].AsString();
294-
if (_sessions.TryGetValue(sessionId, out var session) && _sessions.Remove(sessionId) && !session.IsClosed)
295+
if (_sessions.TryRemove(sessionId, out var session) && !session.IsClosed)
295296
{
296297
session.OnClosed();
297298
}

lib/PuppeteerSharp/WaitTask.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ internal WaitTask(
155155

156156
internal async Task Rerun()
157157
{
158-
var runCount = ++_runCount;
158+
var runCount = Interlocked.Increment(ref _runCount);
159159
JSHandle success = null;
160160
Exception exception = null;
161161

0 commit comments

Comments
 (0)