Skip to content

Commit 2f7bbe8

Browse files
author
Jacques Kang
committed
support multi-threading
1 parent b5a8d9c commit 2f7bbe8

File tree

4 files changed

+81
-32
lines changed

4 files changed

+81
-32
lines changed

src/IpcServiceSample.ConsoleServer/Program.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ static void Main(string[] args)
1414
IServiceCollection services = ConfigureServices(new ServiceCollection());
1515
ServiceProvider serviceProvider = services.BuildServiceProvider();
1616

17-
// configure console logging
18-
serviceProvider.GetRequiredService<ILoggerFactory>()
19-
.AddConsole(LogLevel.Debug);
20-
2117
// TODO start IPC service host
2218
IpcServiceHostBuilder
2319
.Buid("pipeName", serviceProvider as IServiceProvider)
@@ -28,10 +24,17 @@ static void Main(string[] args)
2824
private static IServiceCollection ConfigureServices(IServiceCollection services)
2925
{
3026
services
31-
.AddLogging();
27+
.AddLogging(builder =>
28+
{
29+
builder.AddConsole();
30+
builder.SetMinimumLevel(LogLevel.Debug);
31+
});
3232

3333
services
34-
.AddIpc()
34+
.AddIpc(options =>
35+
{
36+
options.ThreadCount = 2;
37+
})
3538
.AddService<IComputingService, ComputingService>()
3639
;
3740

src/JKang.IpcServiceFramework.Server/IpcServiceHost.cs

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using System.IO.Pipes;
77
using System.Reflection;
8+
using System.Threading;
89
using System.Threading.Tasks;
910

1011
namespace JKang.IpcServiceFramework
@@ -14,6 +15,7 @@ public class IpcServiceHost : IIpcServiceHost
1415
private readonly string _pipeName;
1516
private readonly IServiceProvider _serviceProvider;
1617
private readonly ILogger<IpcServiceHost> _logger;
18+
private readonly IpcServiceOptions _options;
1719
private readonly IIpcMessageSerializer _serializer;
1820
private readonly IValueConverter _converter;
1921

@@ -22,45 +24,68 @@ public IpcServiceHost(string pipeName, IServiceProvider serviceProvider)
2224
_pipeName = pipeName;
2325
_serviceProvider = serviceProvider;
2426
_logger = _serviceProvider.GetService<ILogger<IpcServiceHost>>();
27+
_options = _serviceProvider.GetRequiredService<IpcServiceOptions>();
2528
_serializer = _serviceProvider.GetRequiredService<IIpcMessageSerializer>();
2629
_converter = _serviceProvider.GetRequiredService<IValueConverter>();
2730
}
2831

2932
public void Start()
3033
{
31-
using (var server = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1))
32-
using (var writer = new IpcWriter(server, _serializer))
33-
using (var reader = new IpcReader(server, _serializer))
34+
Thread[] threads = new Thread[_options.ThreadCount];
35+
for (int i = 0; i < threads.Length; i++)
3436
{
35-
_logger?.LogInformation("IPC server started.");
36-
while (true)
37-
{
38-
_logger?.LogDebug("Waiting for connection...");
39-
server.WaitForConnection();
37+
threads[i] = new Thread(StartServerThread);
38+
threads[i].Start();
39+
}
40+
_logger?.LogInformation("IPC server started.");
4041

41-
try
42+
while (true)
43+
{
44+
Thread.Sleep(100);
45+
for (int i = 0; i < threads.Length; i++)
46+
{
47+
if (threads[i].Join(250))
4248
{
43-
_logger?.LogDebug("client connected, reading request...");
44-
IpcRequest request = reader.ReadIpcRequest();
49+
// thread is finished, starting a new thread
50+
threads[i] = new Thread(StartServerThread);
51+
threads[i].Start();
52+
}
53+
}
54+
}
55+
}
4556

46-
_logger?.LogDebug("request received, invoking corresponding method...");
47-
IpcResponse response;
48-
using (IServiceScope scope = _serviceProvider.CreateScope())
49-
{
50-
response = GetReponse(request, scope);
51-
}
57+
private void StartServerThread(object obj)
58+
{
59+
using (var server = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, _options.ThreadCount))
60+
using (var writer = new IpcWriter(server, _serializer))
61+
using (var reader = new IpcReader(server, _serializer))
62+
{
63+
server.WaitForConnection();
5264

53-
_logger?.LogDebug("sending response...");
54-
writer.Write(response);
65+
try
66+
{
67+
_logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] client connected, reading request...");
68+
IpcRequest request = reader.ReadIpcRequest();
5569

56-
// disconnect client
57-
server.Disconnect();
58-
}
59-
catch (Exception ex)
70+
_logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] request received, invoking corresponding method...");
71+
IpcResponse response;
72+
using (IServiceScope scope = _serviceProvider.CreateScope())
6073
{
61-
_logger?.LogError(ex, ex.Message);
62-
server.Disconnect();
74+
response = GetReponse(request, scope);
6375
}
76+
77+
_logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] sending response...");
78+
writer.Write(response);
79+
80+
_logger?.LogDebug($"[thread {Thread.CurrentThread.ManagedThreadId}] done.");
81+
}
82+
catch (Exception ex)
83+
{
84+
_logger?.LogError(ex, ex.Message);
85+
}
86+
finally
87+
{
88+
server.Close();
6489
}
6590
}
6691
}
@@ -115,7 +140,7 @@ private IpcResponse GetReponse(IpcRequest request, IServiceScope scope)
115140
}
116141
else
117142
{
118-
return IpcResponse.Success(@return);
143+
return IpcResponse.Success(@return);
119144
}
120145
}
121146
catch (Exception ex)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace JKang.IpcServiceFramework
2+
{
3+
public class IpcServiceOptions
4+
{
5+
public int ThreadCount { get; set; } = 4;
6+
}
7+
}

src/JKang.IpcServiceFramework.Server/IpcServiceServiceCollectionExtensions.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
11
using JKang.IpcServiceFramework.Services;
22
using Microsoft.Extensions.DependencyInjection;
3+
using System;
34

45
namespace JKang.IpcServiceFramework
56
{
67
public static class IpcServiceServiceCollectionExtensions
78
{
89
public static IIpcServiceCollection AddIpc(this IServiceCollection services)
10+
{
11+
return services.AddIpc(new IpcServiceOptions());
12+
}
13+
14+
public static IIpcServiceCollection AddIpc(this IServiceCollection services, Action<IpcServiceOptions> configure)
15+
{
16+
var options = new IpcServiceOptions();
17+
configure?.Invoke(options);
18+
return services.AddIpc(options);
19+
}
20+
21+
public static IIpcServiceCollection AddIpc(this IServiceCollection services, IpcServiceOptions options)
922
{
1023
services
24+
.AddSingleton(options)
1125
.AddScoped<IValueConverter, DefaultValueConverter>()
1226
.AddScoped<IIpcMessageSerializer, DefaultIpcMessageSerializer>()
1327
;

0 commit comments

Comments
 (0)