Skip to content

Commit 954496d

Browse files
Rework ProcessScheduler (#228)
* Added failing test * fixed an issue where if cancellation token was throw and not caught the scheduling thread would die (and potentially the entire process if the exception went unhandled) * Deal with unhandled exceptions as well, and report down through logging * Added generic exception handling * Removed commented code * Try to make tests work * rework tests to hopefully be more stable during CI
1 parent 4a22daa commit 954496d

14 files changed

+371
-235
lines changed

Directory.Build.targets

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
<PackageReference Update="coverlet.collector" Version="1.2.0" />
3434
<PackageReference Update="coverlet.msbuild" Version="2.8.0" />
3535
<PackageReference Update="System.Reactive" Version="4.3.2" />
36+
<PackageReference Update="Microsoft.Reactive.Testing" Version="4.3.2" />
3637
<PackageReference Update="MediatR" Version="8.0.1" />
3738
<PackageReference Update="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
3839
</ItemGroup>

src/JsonRpc/Connection.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ namespace OmniSharp.Extensions.JsonRpc
77
public class Connection : IDisposable
88
{
99
private readonly IInputHandler _inputHandler;
10-
private readonly IRequestRouter<IHandlerDescriptor> _requestRouter;
1110

1211
public Connection(
1312
Stream input,
@@ -17,10 +16,9 @@ public Connection(
1716
IRequestRouter<IHandlerDescriptor> requestRouter,
1817
IResponseRouter responseRouter,
1918
ILoggerFactory loggerFactory,
20-
ISerializer serializer)
19+
ISerializer serializer,
20+
int? concurrency)
2121
{
22-
_requestRouter = requestRouter;
23-
2422
_inputHandler = new InputHandler(
2523
input,
2624
outputHandler,
@@ -29,7 +27,8 @@ public Connection(
2927
requestRouter,
3028
responseRouter,
3129
loggerFactory,
32-
serializer
30+
serializer,
31+
concurrency
3332
);
3433
}
3534

src/JsonRpc/IScheduler.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
using System;
2+
using System.Reactive;
3+
using System.Reactive.Linq;
24
using System.Threading.Tasks;
35

46
namespace OmniSharp.Extensions.JsonRpc
57
{
68
public interface IScheduler : IDisposable
79
{
810
void Start();
9-
void Add(RequestProcessType type, string name, Func<Task> request);
11+
void Add(RequestProcessType type, string name, IObservable<Unit> request);
12+
}
13+
14+
public static class SchedulerExtensions
15+
{
16+
public static void Add(this IScheduler scheduler, RequestProcessType type, string name, Func<Task> request)
17+
{
18+
scheduler.Add(type, name, Observable.FromAsync(request));
19+
}
1020
}
1121
}

src/JsonRpc/InputHandler.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public InputHandler(
3737
IRequestRouter<IHandlerDescriptor> requestRouter,
3838
IResponseRouter responseRouter,
3939
ILoggerFactory loggerFactory,
40-
ISerializer serializer
40+
ISerializer serializer,
41+
int? concurrency
4142
)
4243
{
4344
if (!input.CanRead) throw new ArgumentException($"must provide a readable stream for {nameof(input)}", nameof(input));
@@ -49,15 +50,15 @@ ISerializer serializer
4950
_responseRouter = responseRouter;
5051
_serializer = serializer;
5152
_logger = loggerFactory.CreateLogger<InputHandler>();
52-
_scheduler = new ProcessScheduler(loggerFactory);
53+
_scheduler = new ProcessScheduler(loggerFactory, concurrency);
5354
_inputThread = new Thread(ProcessInputStream) { IsBackground = true, Name = "ProcessInputStream" };
5455
}
5556

5657
public void Start()
5758
{
59+
_scheduler.Start();
5860
_outputHandler.Start();
5961
_inputThread.Start();
60-
_scheduler.Start();
6162
}
6263

6364
// don't be async: We already allocated a seperate thread for this.

src/JsonRpc/ProcessScheduler.cs

Lines changed: 76 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
4+
using System.ComponentModel;
5+
using System.Diagnostics;
6+
using System.Linq;
7+
using System.Reactive;
8+
using System.Reactive.Concurrency;
9+
using System.Reactive.Disposables;
10+
using System.Reactive.Linq;
11+
using System.Reactive.Subjects;
412
using System.Threading;
513
using System.Threading.Tasks;
614
using Microsoft.Extensions.Logging;
@@ -9,122 +17,97 @@ namespace OmniSharp.Extensions.JsonRpc
917
{
1018
public class ProcessScheduler : IScheduler
1119
{
20+
private readonly int? _concurrency;
1221
private readonly ILogger<ProcessScheduler> _logger;
13-
private readonly BlockingCollection<(RequestProcessType type, string name, Func<Task> request)> _queue;
14-
private readonly CancellationTokenSource _cancel;
15-
private readonly Thread _thread;
22+
private readonly IObserver<(RequestProcessType type, string name, IObservable<Unit> request)> _enqueue;
23+
private readonly IObservable<(RequestProcessType type, string name, IObservable<Unit> request)> _queue;
24+
private bool _disposed = false;
25+
private readonly CompositeDisposable _disposable = new CompositeDisposable();
26+
private readonly System.Reactive.Concurrency.IScheduler _scheduler;
1627

17-
public ProcessScheduler(ILoggerFactory loggerFactory)
28+
public ProcessScheduler(ILoggerFactory loggerFactory, int? concurrency) : this(loggerFactory, concurrency,
29+
new EventLoopScheduler(
30+
_ => new Thread(_) {IsBackground = true, Name = "ProcessRequestQueue"}))
1831
{
19-
_logger = loggerFactory.CreateLogger<ProcessScheduler>();
20-
_queue = new BlockingCollection<(RequestProcessType type, string name, Func<Task> request)>();
21-
_cancel = new CancellationTokenSource();
22-
_thread = new Thread(ProcessRequestQueue) { IsBackground = true, Name = "ProcessRequestQueue" };
2332
}
2433

25-
public void Start()
34+
internal ProcessScheduler(ILoggerFactory loggerFactory, int? concurrency,
35+
System.Reactive.Concurrency.IScheduler scheduler)
2636
{
27-
_thread.Start();
28-
}
37+
_concurrency = concurrency;
38+
_logger = loggerFactory.CreateLogger<ProcessScheduler>();
2939

30-
public void Add(RequestProcessType type, string name, Func<Task> request)
31-
{
32-
_queue.Add((type, name, request));
40+
var subject = new Subject<(RequestProcessType type, string name, IObservable<Unit> request)>();
41+
_disposable.Add(subject);
42+
_enqueue = subject;
43+
_scheduler = scheduler;
44+
_queue = subject;
3345
}
3446

35-
private Task Start(Func<Task> request)
47+
public void Start()
3648
{
37-
var t = request();
38-
if (t.Status == TaskStatus.Created) // || t.Status = TaskStatus.WaitingForActivation ?
39-
t.Start();
40-
return t;
41-
}
49+
var obs = Observable.Create<Unit>(observer => {
50+
var cd = new CompositeDisposable();
4251

43-
private List<Task> RemoveCompleteTasks(List<Task> list)
44-
{
45-
if (list.Count == 0) return list;
52+
var observableQueue =
53+
new BehaviorSubject<(RequestProcessType type, ReplaySubject<IObservable<Unit>> observer)>((
54+
RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue)));
55+
56+
cd.Add(_queue.Subscribe(item => {
57+
if (observableQueue.Value.type != item.type)
58+
{
59+
observableQueue.Value.observer.OnCompleted();
60+
observableQueue.OnNext((item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue)));
61+
}
62+
63+
observableQueue.Value.observer.OnNext(HandleRequest(item.name, item.request));
64+
}));
4665

47-
var result = new List<Task>();
48-
foreach (var t in list)
66+
cd.Add(observableQueue
67+
.Select(item => {
68+
var (type, replay) = item;
69+
70+
if (type == RequestProcessType.Serial)
71+
return replay.Concat();
72+
73+
return _concurrency.HasValue
74+
? replay.Merge(_concurrency.Value)
75+
: replay.Merge();
76+
})
77+
.Concat()
78+
.Subscribe(observer)
79+
);
80+
81+
return cd;
82+
});
83+
84+
_disposable.Add(obs
85+
// .ObserveOn(_scheduler)
86+
.Subscribe(_ => { })
87+
);
88+
89+
IObservable<Unit> HandleRequest(string name, IObservable<Unit> request)
4990
{
50-
if (t.IsFaulted)
51-
{
52-
// TODO: Handle Fault
53-
}
54-
else if (!t.IsCompleted)
55-
{
56-
result.Add(t);
57-
}
91+
return request
92+
.Catch<Unit, OperationCanceledException>(ex => Observable.Empty<Unit>())
93+
.Catch<Unit, Exception>(ex => {
94+
_logger.LogCritical(Events.UnhandledException, ex, "Unhandled exception executing {Name}",
95+
name);
96+
return Observable.Empty<Unit>();
97+
});
5898
}
59-
return result;
6099
}
61100

62-
public long _TestOnly_NonCompleteTaskCount = 0;
63-
private void ProcessRequestQueue()
101+
public void Add(RequestProcessType type, string name, IObservable<Unit> request)
64102
{
65-
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
66-
// no need to be async, because this thing already allocated a thread on it's own.
67-
var token = _cancel.Token;
68-
var waitables = new List<Task>();
69-
try
70-
{
71-
while (!token.IsCancellationRequested)
72-
{
73-
if (_queue.TryTake(out var item, Timeout.Infinite, token))
74-
{
75-
var (type, name, request) = item;
76-
try
77-
{
78-
if (type == RequestProcessType.Serial)
79-
{
80-
Task.WaitAll(waitables.ToArray(), token);
81-
Start(request).Wait(token);
82-
}
83-
else if (type == RequestProcessType.Parallel)
84-
{
85-
waitables.Add(Start(request));
86-
}
87-
else
88-
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
89-
waitables = RemoveCompleteTasks(waitables);
90-
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, waitables.Count);
91-
}
92-
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
93-
{
94-
throw;
95-
}
96-
catch (Exception e)
97-
{
98-
// TODO: Should we rethrow or swallow?
99-
// If an exception happens... the whole system could be in a bad state, hence this throwing currently.
100-
_logger.LogCritical(Events.UnhandledException, e, "Unhandled exception executing {Name}", name);
101-
throw;
102-
}
103-
}
104-
}
105-
}
106-
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
107-
{
108-
// OperationCanceledException - The CancellationToken has been canceled.
109-
Task.WaitAll(waitables.ToArray(), TimeSpan.FromMilliseconds(1000));
110-
var keeponrunning = RemoveCompleteTasks(waitables);
111-
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, keeponrunning.Count);
112-
keeponrunning.ForEach((t) =>
113-
{
114-
// TODO: There is no way to abort a Task. As we don't construct the tasks, we can do nothing here
115-
// Option is: change the task factory "Func<Task> request" to a "Func<CancellationToken, Task> request"
116-
});
117-
}
103+
_enqueue.OnNext((type, name, request));
118104
}
119105

120-
private bool _disposed = false;
121106
public void Dispose()
122107
{
123108
if (_disposed) return;
124109
_disposed = true;
125-
_cancel.Cancel();
126-
_thread.Join();
127-
_cancel.Dispose();
110+
_disposable.Dispose();
128111
}
129112
}
130113
}

src/JsonRpc/RequestRouterBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public virtual async Task<ErrorResponse> RouteRequest(TDescriptor descriptor, Re
161161

162162
return new JsonRpc.Client.Response(request.Id, responseValue, request);
163163
}
164-
catch (TaskCanceledException)
164+
catch (OperationCanceledException)
165165
{
166166
_logger.LogDebug("Request {Id} was cancelled", id);
167167
return new RequestCancelled();

src/Protocol/Document/Server/ICompletionHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace OmniSharp.Extensions.LanguageServer.Protocol.Server
1212
[Parallel, Method(DocumentNames.Completion)]
1313
public interface ICompletionHandler : IJsonRpcRequestHandler<CompletionParams, CompletionList>, IRegistration<CompletionRegistrationOptions>, ICapability<CompletionCapability> { }
1414

15-
[Serial, Method(DocumentNames.CompletionResolve)]
15+
[Parallel, Method(DocumentNames.CompletionResolve)]
1616
public interface ICompletionResolveHandler : ICanBeResolvedHandler<CompletionItem> { }
1717

1818
public abstract class CompletionHandler : ICompletionHandler, ICompletionResolveHandler

src/Server/LanguageServer.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class LanguageServer : ILanguageServer, IInitializeHandler, IInitializedH
5454
private readonly SupportedCapabilities _supportedCapabilities;
5555
private Task _initializingTask;
5656
private readonly ILanguageServerConfiguration _configuration;
57+
private readonly int? _concurrency;
5758

5859
public static Task<ILanguageServer> From(Action<LanguageServerOptions> optionsAction)
5960
{
@@ -118,7 +119,8 @@ public static ILanguageServer PreInit(LanguageServerOptions options)
118119
options.AddDefaultLoggingProvider,
119120
options.ProgressManager,
120121
options.ServerInfo,
121-
options.ConfigurationBuilderAction
122+
options.ConfigurationBuilderAction,
123+
options.Concurrency
122124
);
123125
}
124126

@@ -143,7 +145,8 @@ internal LanguageServer(
143145
bool addDefaultLoggingProvider,
144146
ProgressManager progressManager,
145147
ServerInfo serverInfo,
146-
Action<IConfigurationBuilder> configurationBuilderAction)
148+
Action<IConfigurationBuilder> configurationBuilderAction,
149+
int? concurrency)
147150
{
148151
var outputHandler = new OutputHandler(output, serializer);
149152

@@ -234,7 +237,7 @@ internal LanguageServer(
234237

235238
var requestRouter = _serviceProvider.GetRequiredService<IRequestRouter<ILspHandlerDescriptor>>();
236239
_responseRouter = _serviceProvider.GetRequiredService<IResponseRouter>();
237-
_connection = ActivatorUtilities.CreateInstance<Connection>(_serviceProvider, input);
240+
_connection = ActivatorUtilities.CreateInstance<Connection>(_serviceProvider, input, concurrency);
238241

239242
_exitHandler = new ServerExitHandler(_shutdownHandler);
240243

src/Server/LanguageServerOptions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public LanguageServerOptions()
3838
internal Action<ILoggingBuilder> LoggingBuilderAction { get; set; } = new Action<ILoggingBuilder>(_ => { });
3939
internal Action<IConfigurationBuilder> ConfigurationBuilderAction { get; set; } = new Action<IConfigurationBuilder>(_ => { });
4040
internal bool AddDefaultLoggingProvider { get; set; }
41+
public int? Concurrency { get; set; }
4142

4243
internal readonly List<InitializeDelegate> InitializeDelegates = new List<InitializeDelegate>();
4344
internal readonly List<InitializedDelegate> InitializedDelegates = new List<InitializedDelegate>();

src/Server/LanguageServerOptionsExtensions.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ public static LanguageServerOptions WithServerInfo(this LanguageServerOptions op
8686
return options;
8787
}
8888

89+
/// <summary>
90+
/// Set maximum number of allowed parallel actions
91+
/// </summary>
92+
/// <param name="options"></param>
93+
/// <param name="concurrency"></param>
94+
/// <returns></returns>
95+
public static LanguageServerOptions WithConcurrency(this LanguageServerOptions options, int? concurrency)
96+
{
97+
options.Concurrency = concurrency;
98+
return options;
99+
}
100+
89101
public static LanguageServerOptions OnInitialize(this LanguageServerOptions options, InitializeDelegate @delegate)
90102
{
91103
options.InitializeDelegates.Add(@delegate);

0 commit comments

Comments
 (0)