Skip to content

Commit a004474

Browse files
Added cancellation to all async methods that didn't already have it. (#225)
* Added cancellation tokens everywhere +semver:major * Added new methods for sending requests when using Mediatr typed request objects. Simplified extenison methods to use these new methods. Added method name to each params type. Added unit tests to validate that all param classes and handler interfaces have all the appropriate pieces (method, abstract handler, delegating handler, etc) * Added small logging change to show more information over state * Added xml confguration support
1 parent 6d014ba commit a004474

File tree

117 files changed

+1006
-257
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

117 files changed

+1006
-257
lines changed

sample/SampleServer/Program.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ static async Task MainAsync(string[] args)
7676
Section = "terminal",
7777
});
7878
})
79-
.OnInitialize(new InitializeDelegate(async (server, request) => {
79+
.OnInitialize(async (server, request, token) => {
8080
var manager = server.ProgressManager.WorkDone(request, new WorkDoneProgressBegin() {
8181
Title = "Server is starting...",
8282
Percentage = 10,
@@ -89,8 +89,8 @@ static async Task MainAsync(string[] args)
8989
Percentage = 20,
9090
Message = "loading in progress"
9191
});
92-
}))
93-
.OnInitialized(new InitializedDelegate(async (server, request, response) => {
92+
})
93+
.OnInitialized(async (server, request, response, token) => {
9494
workDone.OnNext(new WorkDoneProgressReport() {
9595
Percentage = 40,
9696
Message = "loading almost done",
@@ -102,9 +102,10 @@ static async Task MainAsync(string[] args)
102102
Message = "loading done",
103103
Percentage = 100,
104104
});
105-
}))
106-
.OnStarted(async (languageServer, result) => {
107-
using var manager = await languageServer.ProgressManager.Create(new WorkDoneProgressBegin() { Title = "Doing some work..." });
105+
workDone.OnCompleted();
106+
})
107+
.OnStarted(async (languageServer, result, token) => {
108+
using var manager = languageServer.ProgressManager.Create(new WorkDoneProgressBegin() { Title = "Doing some work..." });
108109

109110
manager.OnNext(new WorkDoneProgressReport() { Message = "doing things..." });
110111
await Task.Delay(10000);

src/Dap.Protocol/Requests/RunInTerminalExtensions.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
using System.Threading;
12
using System.Threading.Tasks;
23

34
namespace OmniSharp.Extensions.DebugAdapter.Protocol.Requests
45
{
56
public static class RunInTerminalExtensions
67
{
7-
public static Task<RunInTerminalResponse> RunInTerminal(this IDebugAdapterClient mediator, RunInTerminalArguments @params)
8+
public static Task<RunInTerminalResponse> RunInTerminal(this IDebugAdapterClient mediator, RunInTerminalArguments @params, CancellationToken cancellationToken = default)
89
{
9-
return mediator.SendRequest<RunInTerminalArguments, RunInTerminalResponse>(RequestNames.RunInTerminal, @params);
10+
return mediator.SendRequest(@params, cancellationToken);
1011
}
1112
}
1213

src/JsonRpc/CancelParams.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace OmniSharp.Extensions.JsonRpc
66
{
7+
[Method(JsonRpcNames.CancelRequest)]
78
public class CancelParams : IRequest
89
{
910
/// <summary>

src/JsonRpc/IOutputHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace OmniSharp.Extensions.JsonRpc
56
{
67
public interface IOutputHandler : IDisposable
78
{
89
void Start();
9-
void Send(object value);
10+
void Send(object value, CancellationToken cancellationToken);
1011
Task WaitForShutdown();
1112
}
1213
}

src/JsonRpc/IResponseRouter.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.Threading;
12
using System.Threading.Tasks;
3+
using MediatR;
24
using Newtonsoft.Json.Linq;
35

46
namespace OmniSharp.Extensions.JsonRpc
@@ -7,9 +9,12 @@ public interface IResponseRouter
79
{
810
void SendNotification(string method);
911
void SendNotification<T>(string method, T @params);
10-
Task<TResponse> SendRequest<T, TResponse>(string method, T @params);
11-
Task<TResponse> SendRequest<TResponse>(string method);
12-
Task SendRequest<T>(string method, T @params);
12+
void SendNotification(IRequest @params);
13+
Task<TResponse> SendRequest<T, TResponse>(string method, T @params, CancellationToken cancellationToken);
14+
Task<TResponse> SendRequest<TResponse>(IRequest<TResponse> @params, CancellationToken cancellationToken);
15+
Task SendRequest(IRequest @params, CancellationToken cancellationToken);
16+
Task<TResponse> SendRequest<TResponse>(string method, CancellationToken cancellationToken);
17+
Task SendRequest<T>(string method, T @params, CancellationToken cancellationToken);
1318
TaskCompletionSource<JToken> GetRequest(long id);
1419
}
1520
}

src/JsonRpc/InputHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private void ProcessInputStream()
104104

105105
if (length == 0 || length >= int.MaxValue)
106106
{
107-
HandleRequest(string.Empty);
107+
HandleRequest(string.Empty, CancellationToken.None);
108108
}
109109
else
110110
{
@@ -118,7 +118,7 @@ private void ProcessInputStream()
118118
}
119119
// TODO sometimes: encoding should be based on the respective header (including the wrong "utf8" value)
120120
var payload = System.Text.Encoding.UTF8.GetString(requestBuffer);
121-
HandleRequest(payload);
121+
HandleRequest(payload, CancellationToken.None);
122122
}
123123
}
124124
catch (IOException)
@@ -129,7 +129,7 @@ private void ProcessInputStream()
129129
}
130130
}
131131

132-
private void HandleRequest(string request)
132+
private void HandleRequest(string request, CancellationToken cancellationToken)
133133
{
134134
JToken payload;
135135
try
@@ -138,13 +138,13 @@ private void HandleRequest(string request)
138138
}
139139
catch
140140
{
141-
_outputHandler.Send(new ParseError());
141+
_outputHandler.Send(new ParseError(), cancellationToken);
142142
return;
143143
}
144144

145145
if (!_receiver.IsValid(payload))
146146
{
147-
_outputHandler.Send(new InvalidRequest());
147+
_outputHandler.Send(new InvalidRequest(), cancellationToken);
148148
return;
149149
}
150150

@@ -186,12 +186,12 @@ private void HandleRequest(string request)
186186
async () => {
187187
try
188188
{
189-
var result = await _requestRouter.RouteRequest(descriptor, item.Request, CancellationToken.None);
189+
var result = await _requestRouter.RouteRequest(descriptor, item.Request, cancellationToken);
190190
if (result.IsError && result.Error is RequestCancelled)
191191
{
192192
return;
193193
}
194-
_outputHandler.Send(result.Value);
194+
_outputHandler.Send(result.Value, cancellationToken);
195195
}
196196
catch (Exception e)
197197
{
@@ -230,7 +230,7 @@ private void HandleRequest(string request)
230230
if (item.IsError)
231231
{
232232
// TODO:
233-
_outputHandler.Send(item.Error);
233+
_outputHandler.Send(item.Error, cancellationToken);
234234
}
235235
}
236236

src/JsonRpc/JsonRpcServer.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,19 +180,34 @@ public void SendNotification<T>(string method, T @params)
180180
_responseRouter.SendNotification(method, @params);
181181
}
182182

183-
public Task<TResponse> SendRequest<T, TResponse>(string method, T @params)
183+
public void SendNotification(IRequest @params)
184184
{
185-
return _responseRouter.SendRequest<T, TResponse>(method, @params);
185+
_responseRouter.SendNotification(@params);
186186
}
187187

188-
public Task<TResponse> SendRequest<TResponse>(string method)
188+
public Task<TResponse> SendRequest<T, TResponse>(string method, T @params, CancellationToken cancellationToken)
189189
{
190-
return _responseRouter.SendRequest<TResponse>(method);
190+
return _responseRouter.SendRequest<T, TResponse>(method, @params, cancellationToken);
191191
}
192192

193-
public Task SendRequest<T>(string method, T @params)
193+
public Task<TResponse> SendRequest<TResponse>(IRequest<TResponse> @params, CancellationToken cancellationToken)
194194
{
195-
return _responseRouter.SendRequest(method, @params);
195+
return _responseRouter.SendRequest(@params, cancellationToken);
196+
}
197+
198+
public Task SendRequest(IRequest @params, CancellationToken cancellationToken)
199+
{
200+
return _responseRouter.SendRequest(@params, cancellationToken);
201+
}
202+
203+
public Task<TResponse> SendRequest<TResponse>(string method, CancellationToken cancellationToken)
204+
{
205+
return _responseRouter.SendRequest<TResponse>(method, cancellationToken);
206+
}
207+
208+
public Task SendRequest<T>(string method, T @params, CancellationToken cancellationToken)
209+
{
210+
return _responseRouter.SendRequest(method, @params, cancellationToken);
196211
}
197212

198213
public TaskCompletionSource<JToken> GetRequest(long id)

src/JsonRpc/OutputHandler.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,25 @@ public class OutputHandler : IOutputHandler
1818

1919
public OutputHandler(Stream output, ISerializer serializer)
2020
{
21-
if (!output.CanWrite) throw new ArgumentException($"must provide a writable stream for {nameof(output)}", nameof(output));
21+
if (!output.CanWrite)
22+
throw new ArgumentException($"must provide a writable stream for {nameof(output)}", nameof(output));
2223
_output = output;
2324
_serializer = serializer;
2425
_queue = new BlockingCollection<object>();
2526
_cancel = new CancellationTokenSource();
2627
_outputIsFinished = new TaskCompletionSource<object>();
27-
_thread = new Thread(ProcessOutputQueue) { IsBackground = true, Name = "ProcessOutputQueue" };
28+
_thread = new Thread(ProcessOutputQueue) {IsBackground = true, Name = "ProcessOutputQueue"};
2829
}
2930

3031
public void Start()
3132
{
3233
_thread.Start();
3334
}
3435

35-
public void Send(object value)
36+
public void Send(object value, CancellationToken cancellationToken)
3637
{
37-
_queue.Add(value);
38+
if (!cancellationToken.IsCancellationRequested)
39+
_queue.Add(value);
3840
}
3941

4042
private void ProcessOutputQueue()
@@ -62,7 +64,7 @@ private void ProcessOutputQueue()
6264
ms.Write(contentBytes, 0, contentBytes.Length);
6365
if (!token.IsCancellationRequested)
6466
{
65-
_output.Write(ms.ToArray(), 0, (int)ms.Position);
67+
_output.Write(ms.ToArray(), 0, (int) ms.Position);
6668
}
6769
}
6870

src/JsonRpc/ResponseRouter.cs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
using System;
12
using System.Collections.Concurrent;
3+
using System.Reflection;
4+
using System.Threading;
25
using System.Threading.Tasks;
6+
using MediatR;
37
using Newtonsoft.Json.Linq;
48

59
namespace OmniSharp.Extensions.JsonRpc
@@ -10,6 +14,7 @@ public class ResponseRouter : IResponseRouter
1014
private readonly ISerializer _serializer;
1115
private readonly object _lock = new object();
1216
private readonly ConcurrentDictionary<long, TaskCompletionSource<JToken>> _requests = new ConcurrentDictionary<long, TaskCompletionSource<JToken>>();
17+
private static readonly ConcurrentDictionary<Type, string> _methodCache = new ConcurrentDictionary<Type, string>();
1318

1419
public ResponseRouter(IOutputHandler outputHandler, ISerializer serializer)
1520
{
@@ -21,41 +26,61 @@ public void SendNotification(string method)
2126
{
2227
_outputHandler.Send(new Client.Notification() {
2328
Method = method
24-
});
29+
}, CancellationToken.None);
2530
}
2631

2732
public void SendNotification<T>(string method, T @params)
2833
{
2934
_outputHandler.Send(new Client.Notification() {
3035
Method = method,
3136
Params = @params
32-
});
37+
}, CancellationToken.None);
3338
}
3439

35-
public async Task<TResponse> SendRequest<T, TResponse>(string method, T @params)
40+
public void SendNotification(IRequest @params)
41+
{
42+
SendNotification(GetMethodName(@params.GetType()), @params);
43+
}
44+
45+
public async Task<TResponse> SendRequest<T, TResponse>(string method, T @params, CancellationToken cancellationToken)
3646
{
3747
var tcs = new TaskCompletionSource<JToken>();
48+
3849
var nextId = _serializer.GetNextId();
3950
_requests.TryAdd(nextId, tcs);
4051

4152
_outputHandler.Send(new Client.Request() {
4253
Method = method,
4354
Params = @params,
4455
Id = nextId
45-
});
56+
}, cancellationToken);
4657

4758
try
4859
{
4960
var result = await tcs.Task;
61+
if (typeof(TResponse) == typeof(Unit))
62+
{
63+
return (TResponse)(object)Unit.Value;
64+
}
5065
return result.ToObject<TResponse>(_serializer.JsonSerializer);
5166
}
5267
finally
5368
{
54-
_requests.TryRemove(nextId, out var _);
69+
_requests.TryRemove(nextId, out _);
5570
}
5671
}
5772

58-
public async Task<TResponse> SendRequest<TResponse>(string method)
73+
public Task<TResponse> SendRequest<TResponse>(IRequest<TResponse> @params, CancellationToken cancellationToken)
74+
{
75+
return SendRequest<IRequest<TResponse>, TResponse>(GetMethodName(@params.GetType()), @params, cancellationToken);
76+
}
77+
78+
public Task SendRequest(IRequest @params, CancellationToken cancellationToken)
79+
{
80+
return SendRequest(GetMethodName(@params.GetType()), @params, cancellationToken);
81+
}
82+
83+
public async Task<TResponse> SendRequest<TResponse>(string method, CancellationToken cancellationToken)
5984
{
6085
var nextId = _serializer.GetNextId();
6186

@@ -66,7 +91,7 @@ public async Task<TResponse> SendRequest<TResponse>(string method)
6691
Method = method,
6792
Params = null,
6893
Id = nextId
69-
});
94+
}, cancellationToken);
7095

7196
try
7297
{
@@ -79,7 +104,7 @@ public async Task<TResponse> SendRequest<TResponse>(string method)
79104
}
80105
}
81106

82-
public async Task SendRequest<T>(string method, T @params)
107+
public async Task SendRequest<T>(string method, T @params, CancellationToken cancellationToken)
83108
{
84109
var nextId = _serializer.GetNextId();
85110

@@ -90,7 +115,7 @@ public async Task SendRequest<T>(string method, T @params)
90115
Method = method,
91116
Params = @params,
92117
Id = nextId
93-
});
118+
}, cancellationToken);
94119

95120
try
96121
{
@@ -107,5 +132,22 @@ public TaskCompletionSource<JToken> GetRequest(long id)
107132
_requests.TryGetValue(id, out var source);
108133
return source;
109134
}
135+
136+
private string GetMethodName(Type type)
137+
{
138+
if (!_methodCache.TryGetValue(type, out var methodName))
139+
{
140+
var attribute = type.GetCustomAttribute<MethodAttribute>(true);
141+
if (attribute == null)
142+
{
143+
throw new NotSupportedException($"Unable to infer method name for type {type.FullName}");
144+
}
145+
146+
methodName = attribute.Method;
147+
_methodCache.TryAdd(type, methodName);
148+
}
149+
150+
return methodName;
151+
}
110152
}
111153
}

src/Protocol/Client/Server/RegisterCapabilityExtensions.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using OmniSharp.Extensions.LanguageServer.Protocol.Models;
45
using OmniSharp.Extensions.LanguageServer.Protocol.Server;
@@ -9,9 +10,9 @@ namespace OmniSharp.Extensions.LanguageServer.Protocol.Server
910
{
1011
public static class RegisterCapabilityExtensions
1112
{
12-
public static async Task RegisterCapability(this ILanguageServerClient mediator, RegistrationParams @params)
13+
public static Task RegisterCapability(this ILanguageServerClient mediator, RegistrationParams @params, CancellationToken cancellationToken = default)
1314
{
14-
await mediator.SendRequest(ClientNames.RegisterCapability, @params);
15+
return mediator.SendRequest(@params, cancellationToken);
1516
}
1617
}
1718
}

0 commit comments

Comments
 (0)