Skip to content

Commit f24fe59

Browse files
authored
Merge pull request #34 from jacqueskang/cancellation-token
support cancellation token in IpcReader/IpcWriter
2 parents bff256b + 6b1e4f8 commit f24fe59

File tree

8 files changed

+101
-45
lines changed

8 files changed

+101
-45
lines changed

src/JKang.IpcServiceFramework.Client/IpcServiceClient.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.IO;
66
using System.Linq.Expressions;
7+
using System.Threading;
78
using System.Threading.Tasks;
89

910
namespace JKang.IpcServiceFramework
@@ -23,10 +24,11 @@ protected IpcServiceClient(
2324
_converter = converter;
2425
}
2526

26-
public async Task InvokeAsync(Expression<Action<TInterface>> exp)
27+
public async Task InvokeAsync(Expression<Action<TInterface>> exp,
28+
CancellationToken cancellationToken = default(CancellationToken))
2729
{
2830
IpcRequest request = GetRequest(exp, new MyInterceptor());
29-
IpcResponse response = await GetResponseAsync(request);
31+
IpcResponse response = await GetResponseAsync(request, cancellationToken);
3032

3133
if (response.Succeed)
3234
{
@@ -38,10 +40,11 @@ public async Task InvokeAsync(Expression<Action<TInterface>> exp)
3840
}
3941
}
4042

41-
public async Task<TResult> InvokeAsync<TResult>(Expression<Func<TInterface, TResult>> exp)
43+
public async Task<TResult> InvokeAsync<TResult>(Expression<Func<TInterface, TResult>> exp,
44+
CancellationToken cancellationToken = default(CancellationToken))
4245
{
4346
IpcRequest request = GetRequest(exp, new MyInterceptor<TResult>());
44-
IpcResponse response = await GetResponseAsync(request);
47+
IpcResponse response = await GetResponseAsync(request, cancellationToken);
4548

4649
if (response.Succeed)
4750
{
@@ -83,19 +86,19 @@ private static IpcRequest GetRequest(Expression exp, MyInterceptor interceptor)
8386
};
8487
}
8588

86-
protected abstract Task<Stream> ConnectToServerAsync();
89+
protected abstract Task<Stream> ConnectToServerAsync(CancellationToken cancellationToken);
8790

88-
private async Task<IpcResponse> GetResponseAsync(IpcRequest request)
91+
private async Task<IpcResponse> GetResponseAsync(IpcRequest request, CancellationToken cancellationToken)
8992
{
90-
using (Stream client = await ConnectToServerAsync())
93+
using (Stream client = await ConnectToServerAsync(cancellationToken))
9194
using (var writer = new IpcWriter(client, _serializer, leaveOpen: true))
9295
using (var reader = new IpcReader(client, _serializer, leaveOpen: true))
9396
{
9497
// send request
95-
writer.Write(request);
98+
await writer.WriteAsync(request, cancellationToken).ConfigureAwait(false);
9699

97100
// receive response
98-
return reader.ReadIpcResponse();
101+
return await reader.ReadIpcResponseAsync(cancellationToken).ConfigureAwait(false);
99102
}
100103
}
101104

src/JKang.IpcServiceFramework.Client/NamedPipe/NamedPipeIpcServiceClient.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using JKang.IpcServiceFramework.Services;
22
using System.IO;
33
using System.IO.Pipes;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace JKang.IpcServiceFramework.NamedPipe
@@ -16,10 +17,10 @@ public NamedPipeIpcServiceClient(IIpcMessageSerializer serializer, IValueConvert
1617
_pipeName = pipeName;
1718
}
1819

19-
protected override async Task<Stream> ConnectToServerAsync()
20+
protected override async Task<Stream> ConnectToServerAsync(CancellationToken cancellationToken)
2021
{
2122
var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.None);
22-
await stream.ConnectAsync();
23+
await stream.ConnectAsync(cancellationToken);
2324
return stream;
2425
}
2526
}

src/JKang.IpcServiceFramework.Client/Tcp/TcpIpcServiceClient.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Net;
55
using System.Net.Sockets;
66
using System.Threading.Tasks;
7+
using System.Threading;
78

89
namespace JKang.IpcServiceFramework.Tcp
910
{
@@ -20,13 +21,28 @@ public TcpIpcServiceClient(IIpcMessageSerializer serializer, IValueConverter con
2021
_serverPort = serverPort;
2122
}
2223

23-
protected override async Task<Stream> ConnectToServerAsync()
24+
protected override async Task<Stream> ConnectToServerAsync(CancellationToken cancellationToken)
2425
{
26+
cancellationToken.ThrowIfCancellationRequested();
27+
2528
var client = new TcpClient();
26-
await client.ConnectAsync(_serverIp, _serverPort);
27-
var stream = client.GetStream();
29+
IAsyncResult result = client.BeginConnect(_serverIp, _serverPort, null, null);
30+
31+
await Task.Run(() =>
32+
{
33+
// poll every 1 second to check cancellation request
34+
while (!result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), false))
35+
{
36+
if (cancellationToken.IsCancellationRequested)
37+
{
38+
client.Close();
39+
cancellationToken.ThrowIfCancellationRequested();
40+
}
41+
}
42+
client.EndConnect(result);
43+
});
2844

29-
return stream;
45+
return client.GetStream();
3046
}
3147
}
3248
}

src/JKang.IpcServiceFramework.Core/IO/IpcReader.cs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,49 @@
11
using System;
22
using System.IO;
3-
using System.Text;
3+
using System.Threading;
4+
using System.Threading.Tasks;
45

56
namespace JKang.IpcServiceFramework.IO
67
{
78
public class IpcReader : IDisposable
89
{
9-
private readonly BinaryReader _reader;
10+
private readonly byte[] _lengthBuffer = new byte[4];
11+
private readonly Stream _stream;
1012
private readonly IIpcMessageSerializer _serializer;
13+
private readonly bool _leaveOpen;
1114

1215
public IpcReader(Stream stream, IIpcMessageSerializer serializer)
1316
: this(stream, serializer, leaveOpen: false)
1417
{ }
1518

1619
public IpcReader(Stream stream, IIpcMessageSerializer serializer, bool leaveOpen)
1720
{
18-
_reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen);
21+
_stream = stream;
1922
_serializer = serializer;
23+
_leaveOpen = leaveOpen;
2024
}
2125

22-
public IpcRequest ReadIpcRequest()
26+
public async Task<IpcRequest> ReadIpcRequestAsync(CancellationToken cancellationToken = default(CancellationToken))
2327
{
24-
byte[] binary = ReadMessage();
28+
byte[] binary = await ReadMessageAsync(cancellationToken);
2529
return _serializer.DeserializeRequest(binary);
2630
}
2731

28-
public IpcResponse ReadIpcResponse()
32+
public async Task<IpcResponse> ReadIpcResponseAsync(CancellationToken cancellationToken = default(CancellationToken))
2933
{
30-
byte[] binary = ReadMessage();
34+
byte[] binary = await ReadMessageAsync(cancellationToken);
3135
return _serializer.DeserializeResponse(binary);
3236
}
3337

34-
private byte[] ReadMessage()
38+
private async Task<byte[]> ReadMessageAsync(CancellationToken cancellationToken)
3539
{
36-
int length = _reader.ReadInt32();
37-
return _reader.ReadBytes(length);
40+
await _stream.ReadAsync(_lengthBuffer, 0, _lengthBuffer.Length, cancellationToken);
41+
int length = _lengthBuffer[0] | _lengthBuffer[1] << 8 | _lengthBuffer[2] << 16 | _lengthBuffer[3] << 24;
42+
43+
byte[] bytes = new byte[length];
44+
await _stream.ReadAsync(bytes, 0, length, cancellationToken);
45+
46+
return bytes;
3847
}
3948

4049
#region IDisposible
@@ -56,7 +65,10 @@ protected virtual void Dispose(bool disposing)
5665

5766
if (disposing)
5867
{
59-
_reader.Dispose();
68+
if (!_leaveOpen)
69+
{
70+
_stream.Dispose();
71+
}
6072
}
6173

6274
_disposed = true;

src/JKang.IpcServiceFramework.Core/IO/IpcWriter.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,52 @@
11
using System;
22
using System.IO;
3-
using System.Text;
3+
using System.Threading;
4+
using System.Threading.Tasks;
45

56
namespace JKang.IpcServiceFramework.IO
67
{
78
public class IpcWriter : IDisposable
89
{
9-
private readonly BinaryWriter _writer;
10+
private readonly byte[] _lengthBuffer = new byte[4];
11+
private readonly Stream _stream;
1012
private readonly IIpcMessageSerializer _serializer;
13+
private readonly bool _leaveOpen;
1114

1215
public IpcWriter(Stream stream, IIpcMessageSerializer serializer)
1316
: this(stream, serializer, leaveOpen: false)
1417
{ }
1518

1619
public IpcWriter(Stream stream, IIpcMessageSerializer serializer, bool leaveOpen)
1720
{
18-
_writer = new BinaryWriter(stream, Encoding.UTF8, leaveOpen);
21+
_stream = stream;
1922
_serializer = serializer;
23+
_leaveOpen = leaveOpen;
2024
}
2125

22-
public void Write(IpcRequest request)
26+
public async Task WriteAsync(IpcRequest request,
27+
CancellationToken cancellationToken = default(CancellationToken))
2328
{
2429
byte[] binary = _serializer.SerializeRequest(request);
25-
WriteMessage(binary);
30+
await WriteMessageAsync(binary, cancellationToken);
2631
}
2732

28-
public void Write(IpcResponse response)
33+
public async Task WriteAsync(IpcResponse response,
34+
CancellationToken cancellationToken = default(CancellationToken))
2935
{
3036
byte[] binary = _serializer.SerializeResponse(response);
31-
WriteMessage(binary);
37+
await WriteMessageAsync(binary, cancellationToken);
3238
}
3339

34-
private void WriteMessage(byte[] binary)
40+
private async Task WriteMessageAsync(byte[] binary, CancellationToken cancellationToken)
3541
{
36-
_writer.Write(binary.Length);
37-
_writer.Write(binary);
42+
int length = binary.Length;
43+
_lengthBuffer[0] = (byte)length;
44+
_lengthBuffer[1] = (byte)(length >> 8);
45+
_lengthBuffer[2] = (byte)(length >> 16);
46+
_lengthBuffer[3] = (byte)(length >> 24);
47+
48+
await _stream.WriteAsync(_lengthBuffer, 0, _lengthBuffer.Length, cancellationToken);
49+
await _stream.WriteAsync(binary, 0, binary.Length, cancellationToken);
3850
}
3951

4052
#region IDisposible
@@ -56,7 +68,10 @@ protected virtual void Dispose(bool disposing)
5668

5769
if (disposing)
5870
{
59-
_writer.Dispose();
71+
if (!_leaveOpen)
72+
{
73+
_stream.Dispose();
74+
}
6075
}
6176

6277
_disposed = true;

src/JKang.IpcServiceFramework.Server/IpcServiceEndpoint.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ protected IpcServiceEndpoint(string name, IServiceProvider serviceProvider)
2424
public abstract Task ListenAsync(CancellationToken cancellationToken = default(CancellationToken));
2525
}
2626

27-
public abstract class IpcServiceEndpoint<TContract>: IpcServiceEndpoint
28-
where TContract: class
27+
public abstract class IpcServiceEndpoint<TContract> : IpcServiceEndpoint
28+
where TContract : class
2929
{
3030
private readonly IValueConverter _converter;
3131
private readonly IIpcMessageSerializer _serializer;
@@ -37,15 +37,22 @@ protected IpcServiceEndpoint(string name, IServiceProvider serviceProvider)
3737
_serializer = serviceProvider.GetRequiredService<IIpcMessageSerializer>();
3838
}
3939

40-
protected void Process(Stream server, ILogger logger)
40+
protected async Task ProcessAsync(Stream server, ILogger logger, CancellationToken cancellationToken)
4141
{
4242
using (var writer = new IpcWriter(server, _serializer, leaveOpen: true))
4343
using (var reader = new IpcReader(server, _serializer, leaveOpen: true))
4444
{
4545
try
4646
{
47+
if (cancellationToken.IsCancellationRequested)
48+
{
49+
return;
50+
}
51+
4752
logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] client connected, reading request...");
48-
IpcRequest request = reader.ReadIpcRequest();
53+
IpcRequest request = await reader.ReadIpcRequestAsync(cancellationToken).ConfigureAwait(false);
54+
55+
cancellationToken.ThrowIfCancellationRequested();
4956

5057
logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] request received, invoking corresponding method...");
5158
IpcResponse response;
@@ -54,8 +61,10 @@ protected void Process(Stream server, ILogger logger)
5461
response = GetReponse(request, scope);
5562
}
5663

64+
cancellationToken.ThrowIfCancellationRequested();
65+
5766
logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] sending response...");
58-
writer.Write(response);
67+
await writer.WriteAsync(response, cancellationToken).ConfigureAwait(false);
5968

6069
logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] done.");
6170
}

src/JKang.IpcServiceFramework.Server/NamedPipe/NamedPipeIpcServiceEndpoint.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ private void StartServerThread(object obj)
6363
{
6464
using (var server = new NamedPipeServerStream(PipeName, PipeDirection.InOut, _options.ThreadCount))
6565
{
66-
server.WaitForConnectionAsync().Wait(token);
67-
Task.Run(() => Process(server, _logger)).Wait(token);
66+
server.WaitForConnectionAsync(token).Wait();
67+
ProcessAsync(server, _logger, token).Wait();
6868
}
6969
}
7070
catch when (token.IsCancellationRequested)

src/JKang.IpcServiceFramework.Server/Tcp/TcpIpcServiceEndpoint.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public TcpIpcServiceEndpoint(String name, IServiceProvider serviceProvider, IPAd
4040
{
4141
TcpClient client = await _listener.AcceptTcpClientAsync();
4242
NetworkStream server = client.GetStream();
43-
Process(server, _logger);
43+
await ProcessAsync(server, _logger, cancellationToken);
4444
}
4545
}
4646
catch when (cancellationToken.IsCancellationRequested)

0 commit comments

Comments
 (0)