Skip to content

Commit c29b5c9

Browse files
Martin LercherMartin Lercher
authored andcommitted
Refactored the process scheduling component of Input handler into it's own class.
1 parent e06dcde commit c29b5c9

File tree

2 files changed

+78
-51
lines changed

2 files changed

+78
-51
lines changed

src/JsonRpc/InputHandler.cs

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
using System;
2-
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
42
using System.IO;
53
using System.Linq;
6-
using System.Text;
74
using System.Threading;
85
using System.Threading.Tasks;
9-
using JsonRpc.Server;
106
using JsonRpc.Server.Messages;
117
using Newtonsoft.Json.Linq;
128

@@ -27,9 +23,7 @@ public class InputHandler : IInputHandler
2723
private Thread _inputThread;
2824
private readonly IRequestRouter _requestRouter;
2925
private readonly IResponseRouter _responseRouter;
30-
private readonly BlockingCollection<(RequestProcessType type, Func<Task> request)> _queue;
31-
private readonly CancellationTokenSource _cancelQueue;
32-
private Thread _queueThread;
26+
private readonly ProcessScheduler _scheduler;
3327

3428
public InputHandler(
3529
TextReader input,
@@ -46,19 +40,16 @@ IResponseRouter responseRouter
4640
_requestProcessIdentifier = requestProcessIdentifier;
4741
_requestRouter = requestRouter;
4842
_responseRouter = responseRouter;
49-
_queue = new BlockingCollection<(RequestProcessType type, Func<Task> request)>();
50-
_cancelQueue = new CancellationTokenSource();
5143

44+
_scheduler = new ProcessScheduler();
5245
_inputThread = new Thread(ProcessInputStream) { IsBackground = true };
53-
54-
_queueThread = new Thread(ProcessRequestQueue) { IsBackground = true };
5546
}
5647

5748
public void Start()
5849
{
5950
_outputHandler.Start();
6051
_inputThread.Start();
61-
_queueThread.Start();
52+
_scheduler.Start();
6253
}
6354

6455
private async void ProcessInputStream()
@@ -145,24 +136,24 @@ private void HandleRequest(string request)
145136
{
146137
if (item.IsRequest)
147138
{
148-
_queue.Add((
139+
_scheduler.Add(
149140
type,
150141
async () => {
151142
var result = await _requestRouter.RouteRequest(item.Request);
152143

153144
_outputHandler.Send(result.Value);
154145
}
155-
));
146+
);
156147
}
157148
else if (item.IsNotification)
158149
{
159-
_queue.Add((
150+
_scheduler.Add(
160151
type,
161152
() => {
162153
_requestRouter.RouteNotification(item.Notification);
163154
return Task.CompletedTask;
164155
}
165-
));
156+
);
166157
}
167158
else if (item.IsError)
168159
{
@@ -172,46 +163,12 @@ private void HandleRequest(string request)
172163
}
173164
}
174165

175-
private Task Start(Func<Task> request)
176-
{
177-
var t = request();
178-
t.Start();
179-
return t;
180-
}
181-
182-
private async void ProcessRequestQueue()
183-
{
184-
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
185-
var token = _cancelQueue.Token;
186-
var waitables = new List<Task>();
187-
while(true)
188-
{
189-
if (_queueThread == null) return;
190-
if (_queue.TryTake(out var item, Timeout.Infinite, token))
191-
{
192-
var (type, request) = item;
193-
if (type == RequestProcessType.Serial)
194-
{
195-
await Task.WhenAll(waitables);
196-
waitables.Clear();
197-
await Start(request);
198-
}
199-
else if (type == RequestProcessType.Parallel)
200-
{
201-
waitables.Add(Start(request));
202-
}
203-
else
204-
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
205-
}
206-
}
207-
}
208166

209167
public void Dispose()
210168
{
211169
_outputHandler.Dispose();
212170
_inputThread = null;
213-
_queueThread = null;
214-
_cancelQueue.Cancel();
171+
_scheduler?.Dispose();
215172
}
216173
}
217174
}

src/JsonRpc/ProcessScheduler.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.Concurrent;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace JsonRpc
8+
{
9+
public class ProcessScheduler : IDisposable
10+
{
11+
private readonly BlockingCollection<(RequestProcessType type, Func<Task> request)> _queue;
12+
private readonly CancellationTokenSource _cancel;
13+
private Thread _queueThread;
14+
15+
public ProcessScheduler() {
16+
_queue = new BlockingCollection<(RequestProcessType type, Func<Task> request)>();
17+
_cancel = new CancellationTokenSource();
18+
_queueThread = new Thread(ProcessRequestQueue) { IsBackground = true };
19+
}
20+
21+
public void Start() {
22+
_queueThread.Start();
23+
}
24+
25+
public void Add(RequestProcessType type, Func<Task> request) {
26+
_queue.Add((type, request));
27+
}
28+
29+
private Task Start(Func<Task> request)
30+
{
31+
var t = request();
32+
t.Start();
33+
return t;
34+
}
35+
36+
private void ProcessRequestQueue()
37+
{
38+
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
39+
// no need to be async, because this thing already allocated a thread on it's own.
40+
var token = _cancel.Token;
41+
var waitables = new List<Task>();
42+
while(true)
43+
{
44+
if (_queueThread == null) return;
45+
if (_queue.TryTake(out var item, Timeout.Infinite, token))
46+
{
47+
var (type, request) = item;
48+
if (type == RequestProcessType.Serial)
49+
{
50+
Task.WaitAll(waitables.ToArray(), token);
51+
waitables.Clear();
52+
Start(request).Wait(token);
53+
}
54+
else if (type == RequestProcessType.Parallel)
55+
{
56+
waitables.Add(Start(request));
57+
}
58+
else
59+
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
60+
}
61+
}
62+
}
63+
64+
public void Dispose()
65+
{
66+
_queueThread = null;
67+
_cancel.Cancel();
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)