Skip to content

Commit bff256b

Browse files
authored
Merge pull request #33 from jacqueskang/cancellation-token
support graceful shutdown
2 parents ef30725 + 3965985 commit bff256b

File tree

6 files changed

+81
-34
lines changed

6 files changed

+81
-34
lines changed

src/IpcServiceSample.ConsoleServer/Program.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
using JKang.IpcServiceFramework;
33
using Microsoft.Extensions.DependencyInjection;
44
using Microsoft.Extensions.Logging;
5+
using System;
56
using System.Net;
7+
using System.Threading;
8+
using System.Threading.Tasks;
69

710
namespace IpcServiceSample.ConsoleServer
811
{
@@ -14,11 +17,18 @@ static void Main(string[] args)
1417
IServiceCollection services = ConfigureServices(new ServiceCollection());
1518

1619
// build and run service host
17-
new IpcServiceHostBuilder(services.BuildServiceProvider())
20+
IIpcServiceHost host = new IpcServiceHostBuilder(services.BuildServiceProvider())
1821
.AddNamedPipeEndpoint<IComputingService>("computingEndpoint", "pipeName")
1922
.AddTcpEndpoint<ISystemService>("systemEndpoint", IPAddress.Loopback, 45684)
20-
.Build()
21-
.Run();
23+
.Build();
24+
25+
var source = new CancellationTokenSource();
26+
Task.WaitAll(host.RunAsync(source.Token), Task.Run(() =>
27+
{
28+
Console.WriteLine("Press any key to shutdown.");
29+
Console.ReadKey();
30+
source.Cancel();
31+
}));
2232
}
2333

2434
private static IServiceCollection ConfigureServices(IServiceCollection services)
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
namespace JKang.IpcServiceFramework
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace JKang.IpcServiceFramework
25
{
36
public interface IIpcServiceHost
47
{
58
void Run();
9+
10+
Task RunAsync(CancellationToken cancellationToken = default(CancellationToken));
611
}
712
}

src/JKang.IpcServiceFramework.Server/IpcServiceEndpoint.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ protected IpcServiceEndpoint(string name, IServiceProvider serviceProvider)
2121
public string Name { get; }
2222
public IServiceProvider ServiceProvider { get; }
2323

24-
public abstract void Listen();
24+
public abstract Task ListenAsync(CancellationToken cancellationToken = default(CancellationToken));
2525
}
2626

2727
public abstract class IpcServiceEndpoint<TContract>: IpcServiceEndpoint

src/JKang.IpcServiceFramework.Server/IpcServiceHost.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78

89
namespace JKang.IpcServiceFramework
@@ -22,12 +23,15 @@ public IpcServiceHost(IEnumerable<IpcServiceEndpoint> endpoints, IServiceProvide
2223

2324
public void Run()
2425
{
25-
Parallel.ForEach(_endpoints, endpoint =>
26-
{
27-
_logger?.LogDebug($"Starting endpoint '{endpoint.Name}'...");
28-
endpoint.Listen();
29-
_logger?.LogDebug($"Endpoint '{endpoint.Name}' stopped.");
30-
});
26+
RunAsync().Wait();
27+
}
28+
29+
public Task RunAsync(CancellationToken cancellationToken = default(CancellationToken))
30+
{
31+
Task[] tasks = _endpoints
32+
.Select(endpoint => endpoint.ListenAsync(cancellationToken))
33+
.ToArray();
34+
return Task.WhenAll(tasks);
3135
}
3236
}
3337
}

src/JKang.IpcServiceFramework.Server/NamedPipe/NamedPipeIpcServiceEndpoint.cs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
using System;
44
using System.IO.Pipes;
55
using System.Threading;
6+
using System.Threading.Tasks;
67

78
namespace JKang.IpcServiceFramework.NamedPipe
89
{
910
public class NamedPipeIpcServiceEndpoint<TContract> : IpcServiceEndpoint<TContract>
10-
where TContract: class
11+
where TContract : class
1112
{
1213
private readonly ILogger<NamedPipeIpcServiceEndpoint<TContract>> _logger;
1314
private readonly NamedPipeOptions _options;
@@ -23,39 +24,51 @@ public NamedPipeIpcServiceEndpoint(string name, IServiceProvider serviceProvider
2324

2425
public string PipeName { get; }
2526

26-
public override void Listen()
27+
public override Task ListenAsync(CancellationToken cancellationToken = default(CancellationToken))
2728
{
2829
NamedPipeOptions options = ServiceProvider.GetRequiredService<NamedPipeOptions>();
2930

3031
var threads = new Thread[options.ThreadCount];
3132
for (int i = 0; i < threads.Length; i++)
3233
{
3334
threads[i] = new Thread(StartServerThread);
34-
threads[i].Start();
35+
threads[i].Start(cancellationToken);
3536
}
3637

37-
while (true)
38+
return Task.Factory.StartNew(() =>
3839
{
39-
Thread.Sleep(100);
40-
for (int i = 0; i < threads.Length; i++)
40+
_logger.LogDebug($"Listening named pipe endpoint '{Name}'...");
41+
while (!cancellationToken.IsCancellationRequested)
4142
{
42-
if (threads[i].Join(250))
43+
Thread.Sleep(100);
44+
45+
for (int i = 0; i < threads.Length; i++)
4346
{
44-
// thread is finished, starting a new thread
45-
threads[i] = new Thread(StartServerThread);
46-
threads[i].Start();
47+
if (threads[i].Join(250))
48+
{
49+
// thread is finished, starting a new thread
50+
threads[i] = new Thread(StartServerThread);
51+
threads[i].Start(cancellationToken);
52+
}
4753
}
4854
}
49-
}
55+
_logger.LogDebug($"Shutting down named pipe endpoint '{Name}'...");
56+
});
5057
}
5158

5259
private void StartServerThread(object obj)
5360
{
54-
using (var server = new NamedPipeServerStream(PipeName, PipeDirection.InOut, _options.ThreadCount))
61+
var token = (CancellationToken)obj;
62+
try
63+
{
64+
using (var server = new NamedPipeServerStream(PipeName, PipeDirection.InOut, _options.ThreadCount))
65+
{
66+
server.WaitForConnectionAsync().Wait(token);
67+
Task.Run(() => Process(server, _logger)).Wait(token);
68+
}
69+
}
70+
catch when (token.IsCancellationRequested)
5571
{
56-
server.WaitForConnection();
57-
58-
Process(server, _logger);
5972
}
6073
}
6174
}

src/JKang.IpcServiceFramework.Server/Tcp/TcpIpcServiceEndpoint.cs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
using System.Net.Sockets;
44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.DependencyInjection;
6+
using System.Threading;
7+
using System.Threading.Tasks;
68

79
namespace JKang.IpcServiceFramework.Tcp
810
{
911
public class TcpIpcServiceEndpoint<TContract> : IpcServiceEndpoint<TContract>
10-
where TContract: class
12+
where TContract : class
1113
{
1214
private readonly ILogger<TcpIpcServiceEndpoint<TContract>> _logger;
1315
private readonly TcpListener _listener;
@@ -19,18 +21,31 @@ public TcpIpcServiceEndpoint(String name, IServiceProvider serviceProvider, IPAd
1921
_logger = serviceProvider.GetService<ILogger<TcpIpcServiceEndpoint<TContract>>>();
2022
}
2123

22-
public override void Listen()
24+
public override Task ListenAsync(CancellationToken cancellationToken = default(CancellationToken))
2325
{
2426
_listener.Start();
2527

26-
_logger?.LogDebug("TCP listener started.");
28+
cancellationToken.Register(() =>
29+
{
30+
_logger.LogDebug($"Shutting down tcp endpoint '{Name}'...");
31+
_listener.Stop();
32+
});
2733

28-
while (true)
34+
return Task.Run(async () =>
2935
{
30-
TcpClient client = _listener.AcceptTcpClient();
31-
NetworkStream server = client.GetStream();
32-
Process(server, _logger);
33-
}
36+
try
37+
{
38+
_logger.LogDebug($"Listening tcp endpoint '{Name}'...");
39+
while (true)
40+
{
41+
TcpClient client = await _listener.AcceptTcpClientAsync();
42+
NetworkStream server = client.GetStream();
43+
Process(server, _logger);
44+
}
45+
}
46+
catch when (cancellationToken.IsCancellationRequested)
47+
{ }
48+
});
3449
}
3550
}
3651
}

0 commit comments

Comments
 (0)