Skip to content

Commit b9a8c77

Browse files
Merge pull request #11 from Lercher/inputhandler-refactoring
Inputhandler refactoring for issue #4 and #8
2 parents acb5c7f + f5804ec commit b9a8c77

File tree

6 files changed

+339
-85
lines changed

6 files changed

+339
-85
lines changed

src/JsonRpc/IScheduler.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace JsonRpc
5+
{
6+
public interface IScheduler : IDisposable
7+
{
8+
void Start();
9+
void Add(RequestProcessType type, Func<Task> request);
10+
}
11+
}

src/JsonRpc/InputHandler.cs

Lines changed: 34 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
using System;
2-
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
42
using System.IO;
53
using System.Linq;
6-
using System.Text;
74
using System.Threading;
85
using System.Threading.Tasks;
9-
using JsonRpc.Server;
106
using JsonRpc.Server.Messages;
117
using Newtonsoft.Json.Linq;
128

139
namespace JsonRpc
1410
{
1511
public class InputHandler : IInputHandler
1612
{
17-
private readonly TimeSpan _sleepTime = TimeSpan.FromMilliseconds(50);
1813
public const char CR = '\r';
1914
public const char LF = '\n';
2015
public static char[] CRLF = { CR, LF };
@@ -28,8 +23,7 @@ public class InputHandler : IInputHandler
2823
private Thread _inputThread;
2924
private readonly IRequestRouter _requestRouter;
3025
private readonly IResponseRouter _responseRouter;
31-
private readonly ConcurrentQueue<(RequestProcessType type, Func<Task> request)> _queue;
32-
private Thread _queueThread;
26+
private readonly IScheduler _scheduler;
3327

3428
public InputHandler(
3529
TextReader input,
@@ -46,31 +40,16 @@ IResponseRouter responseRouter
4640
_requestProcessIdentifier = requestProcessIdentifier;
4741
_requestRouter = requestRouter;
4842
_responseRouter = responseRouter;
49-
_queue = new ConcurrentQueue<(RequestProcessType type, Func<Task> request)>();
5043

51-
_inputThread = new Thread(ProcessInputStream) { IsBackground = true };
52-
53-
_queueThread = new Thread(ProcessRequestQueue) { IsBackground = true };
54-
}
55-
56-
internal InputHandler(
57-
TextReader input,
58-
IOutputHandler outputHandler,
59-
IReciever reciever,
60-
IRequestProcessIdentifier requestProcessIdentifier,
61-
IRequestRouter requestRouter,
62-
IResponseRouter responseRouter,
63-
TimeSpan sleepTime
64-
) : this(input, outputHandler, reciever, requestProcessIdentifier, requestRouter, responseRouter)
65-
{
66-
_sleepTime = sleepTime;
67-
}
44+
_scheduler = new ProcessScheduler();
45+
_inputThread = new Thread(ProcessInputStream) { IsBackground = true, Name = "ProcessInputStream" };
46+
}
6847

6948
public void Start()
7049
{
7150
_outputHandler.Start();
7251
_inputThread.Start();
73-
_queueThread.Start();
52+
_scheduler.Start();
7453
}
7554

7655
private async void ProcessInputStream()
@@ -81,10 +60,14 @@ private async void ProcessInputStream()
8160

8261
var buffer = new char[300];
8362
var current = await _input.ReadBlockAsync(buffer, 0, MinBuffer);
63+
if (current == 0) return; // no more _input
64+
8465
while (current < MinBuffer || buffer[current - 4] != CR || buffer[current - 3] != LF ||
8566
buffer[current - 2] != CR || buffer[current - 1] != LF)
8667
{
87-
current += await _input.ReadBlockAsync(buffer, current, 1);
68+
var n = await _input.ReadBlockAsync(buffer, current, 1);
69+
if (n == 0) return; // no more _input, mitigates endless loop here.
70+
current += n;
8871
}
8972

9073
var headersContent = new string(buffer, 0, current);
@@ -97,17 +80,28 @@ private async void ProcessInputStream()
9780
var value = headers[i].Trim();
9881
if (header.Equals("Content-Length", StringComparison.OrdinalIgnoreCase))
9982
{
100-
length = long.Parse(value);
83+
length = 0;
84+
long.TryParse(value, out length);
10185
}
10286
}
10387

104-
var requestBuffer = new char[length];
105-
106-
await _input.ReadBlockAsync(requestBuffer, 0, requestBuffer.Length);
107-
108-
var payload = new string(requestBuffer);
109-
110-
HandleRequest(payload);
88+
if (length == 0 || length >= int.MaxValue)
89+
{
90+
HandleRequest(string.Empty);
91+
}
92+
else
93+
{
94+
var requestBuffer = new char[length];
95+
var received = 0;
96+
while (received < length)
97+
{
98+
var n = await _input.ReadBlockAsync(requestBuffer, received, requestBuffer.Length - received);
99+
if (n == 0) return; // no more _input
100+
received += n;
101+
}
102+
var payload = new string(requestBuffer);
103+
HandleRequest(payload);
104+
}
111105
}
112106
}
113107

@@ -158,24 +152,23 @@ private void HandleRequest(string request)
158152
{
159153
if (item.IsRequest)
160154
{
161-
_queue.Enqueue((
155+
_scheduler.Add(
162156
type,
163157
async () => {
164158
var result = await _requestRouter.RouteRequest(item.Request);
165-
166159
_outputHandler.Send(result.Value);
167160
}
168-
));
161+
);
169162
}
170163
else if (item.IsNotification)
171164
{
172-
_queue.Enqueue((
165+
_scheduler.Add(
173166
type,
174167
() => {
175168
_requestRouter.RouteNotification(item.Notification);
176169
return Task.CompletedTask;
177170
}
178-
));
171+
);
179172
}
180173
else if (item.IsError)
181174
{
@@ -185,42 +178,12 @@ private void HandleRequest(string request)
185178
}
186179
}
187180

188-
private bool IsNextSerial()
189-
{
190-
return _queue.TryPeek(out var queueResult) && queueResult.type == RequestProcessType.Serial;
191-
}
192-
193-
private async void ProcessRequestQueue()
194-
{
195-
while (true)
196-
{
197-
if (_queueThread == null) return;
198-
var items = new List<Func<Task>>();
199-
while (!_queue.IsEmpty)
200-
{
201-
if (IsNextSerial() && items.Count > 0)
202-
{
203-
break;
204-
}
205-
206-
if (_queue.TryDequeue(out var queueResult))
207-
items.Add(queueResult.request);
208-
}
209-
210-
await Task.WhenAll(items.Select(x => x()));
211-
212-
if (_queue.IsEmpty)
213-
{
214-
await Task.Delay(_sleepTime);
215-
}
216-
}
217-
}
218181

219182
public void Dispose()
220183
{
221184
_outputHandler.Dispose();
222185
_inputThread = null;
223-
_queueThread = null;
186+
_scheduler.Dispose();
224187
}
225188
}
226189
}

src/JsonRpc/OutputHandler.cs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace JsonRpc
1010
public class OutputHandler : IOutputHandler
1111
{
1212
private readonly TextWriter _output;
13-
private Thread _thread;
13+
private readonly Thread _thread;
1414
private readonly BlockingCollection<object> _queue;
1515
private readonly CancellationTokenSource _cancel;
1616

@@ -19,9 +19,7 @@ public OutputHandler(TextWriter output)
1919
_output = output;
2020
_queue = new BlockingCollection<object>();
2121
_cancel = new CancellationTokenSource();
22-
_thread = new Thread(ProcessOutputQueue) {
23-
IsBackground = true
24-
};
22+
_thread = new Thread(ProcessOutputQueue) { IsBackground = true, Name = "ProcessOutputQueue" };
2523
}
2624

2725
public void Start()
@@ -37,10 +35,9 @@ public void Send(object value)
3735
private void ProcessOutputQueue()
3836
{
3937
var token = _cancel.Token;
40-
while (true)
38+
try
4139
{
42-
if (_thread == null) return;
43-
try
40+
while (true)
4441
{
4542
if (_queue.TryTake(out var value, Timeout.Infinite, token))
4643
{
@@ -51,19 +48,24 @@ private void ProcessOutputQueue()
5148
sb.Append($"Content-Length: {content.Length}\r\n");
5249
sb.Append($"\r\n");
5350
sb.Append(content);
54-
5551
_output.Write(sb.ToString());
5652
}
5753
}
58-
catch (OperationCanceledException) { }
54+
}
55+
catch (OperationCanceledException ex)
56+
{
57+
if (ex.CancellationToken != token)
58+
throw;
59+
// else ignore. Exceptions: OperationCanceledException - The CancellationToken has been canceled.
5960
}
6061
}
6162

6263
public void Dispose()
6364
{
64-
_output?.Dispose();
65-
_thread = null;
6665
_cancel.Cancel();
66+
_thread.Join();
67+
_cancel.Dispose();
68+
_output.Dispose();
6769
}
6870
}
6971
}

src/JsonRpc/ProcessScheduler.cs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.Concurrent;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace JsonRpc
8+
{
9+
public class ProcessScheduler : IScheduler
10+
{
11+
private readonly BlockingCollection<(RequestProcessType type, Func<Task> request)> _queue;
12+
private readonly CancellationTokenSource _cancel;
13+
private readonly Thread _thread;
14+
15+
public ProcessScheduler()
16+
{
17+
_queue = new BlockingCollection<(RequestProcessType type, Func<Task> request)>();
18+
_cancel = new CancellationTokenSource();
19+
_thread = new Thread(ProcessRequestQueue) { IsBackground = true, Name = "ProcessRequestQueue" };
20+
}
21+
22+
public void Start()
23+
{
24+
_thread.Start();
25+
}
26+
27+
public void Add(RequestProcessType type, Func<Task> request)
28+
{
29+
_queue.Add((type, request));
30+
}
31+
32+
private Task Start(Func<Task> request)
33+
{
34+
var t = request();
35+
if (t.Status == TaskStatus.Created) // || t.Status = TaskStatus.WaitingForActivation ?
36+
t.Start();
37+
return t;
38+
}
39+
40+
private List<Task> RemoveCompleteTasks(List<Task> list)
41+
{
42+
if (list.Count == 0) return list;
43+
44+
var result = new List<Task>();
45+
foreach(var t in list)
46+
{
47+
if (t.IsFaulted)
48+
{
49+
// TODO: Handle Fault
50+
}
51+
else if (!t.IsCompleted)
52+
{
53+
result.Add(t);
54+
}
55+
}
56+
return result;
57+
}
58+
59+
public long _TestOnly_NonCompleteTaskCount = 0;
60+
private void ProcessRequestQueue()
61+
{
62+
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
63+
// no need to be async, because this thing already allocated a thread on it's own.
64+
var token = _cancel.Token;
65+
var waitables = new List<Task>();
66+
try
67+
{
68+
while (true)
69+
{
70+
if (_queue.TryTake(out var item, Timeout.Infinite, token))
71+
{
72+
var (type, request) = item;
73+
if (type == RequestProcessType.Serial)
74+
{
75+
Task.WaitAll(waitables.ToArray(), token);
76+
Start(request).Wait(token);
77+
}
78+
else if (type == RequestProcessType.Parallel)
79+
{
80+
waitables.Add(Start(request));
81+
}
82+
else
83+
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
84+
waitables = RemoveCompleteTasks(waitables);
85+
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, waitables.Count);
86+
}
87+
}
88+
}
89+
catch (OperationCanceledException ex)
90+
{
91+
if (ex.CancellationToken != token)
92+
throw;
93+
// OperationCanceledException - The CancellationToken has been canceled.
94+
Task.WaitAll(waitables.ToArray(), TimeSpan.FromMilliseconds(1000));
95+
var keeponrunning = RemoveCompleteTasks(waitables);
96+
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, keeponrunning.Count);
97+
keeponrunning.ForEach((t) =>
98+
{
99+
// TODO: There is no way to abort a Task. As we don't construct the tasks, we can do nothing here
100+
// Option is: change the task factory "Func<Task> request" to a "Func<CancellationToken, Task> request"
101+
});
102+
}
103+
}
104+
105+
private bool _disposed = false;
106+
public void Dispose()
107+
{
108+
if (_disposed) return;
109+
_disposed = true;
110+
_cancel.Cancel();
111+
_thread.Join();
112+
_cancel.Dispose();
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)