Skip to content

Commit 8f86aef

Browse files
authored
Merge pull request #164 from dotnet-campus/t/lindexi/Ipc
修复线程安全
2 parents 9705009 + 096c9fb commit 8f86aef

File tree

9 files changed

+115
-43
lines changed

9 files changed

+115
-43
lines changed

.github/workflows/dotnet-core.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
run: dotnet build --configuration Release
2424

2525
- name: Test
26-
run: dotnet test --configuration Release -v:n --tl:off --no-build
26+
run: dotnet test --configuration Release -v:n --tl:off --no-build --logger:"console;verbosity=detailed"
2727

2828
- name: Pack
2929
run: dotnet pack --configuration Release --no-build

src/dotnetCampus.Ipc/Context/ConnectExistsPeerResult.cs renamed to src/dotnetCampus.Ipc/Context/ConnectToExistingPeerResult.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ namespace dotnetCampus.Ipc.Context;
1313
/// <summary>
1414
/// 连接已经存在的 Peer 的结果
1515
/// </summary>
16-
public readonly struct ConnectExistsPeerResult
16+
public readonly struct ConnectToExistingPeerResult
1717
{
18-
internal ConnectExistsPeerResult(PeerProxy? peerProxy, Task peerConnectFinishedTask)
18+
internal ConnectToExistingPeerResult(PeerProxy? peerProxy, Task peerConnectFinishedTask)
1919
{
2020
PeerProxy = peerProxy;
2121
PeerConnectFinishedTask = peerConnectFinishedTask;
@@ -37,5 +37,5 @@ internal ConnectExistsPeerResult(PeerProxy? peerProxy, Task peerConnectFinishedT
3737
[MemberNotNullWhen(true, nameof(PeerProxy))]
3838
public bool IsSuccess => PeerProxy != null;
3939

40-
internal static ConnectExistsPeerResult Fail() => new ConnectExistsPeerResult(null, TaskUtils.CompletedTask);
40+
internal static ConnectToExistingPeerResult Fail() => new ConnectToExistingPeerResult(null, TaskUtils.CompletedTask);
4141
}

src/dotnetCampus.Ipc/Context/LoggingContext/LoggerEventIds.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ namespace dotnetCampus.Ipc.Context.LoggingContext;
44

55
internal static class LoggerEventIds
66
{
7+
public static EventId CommonDebugEventId => new EventId(0, "Debug");
8+
79
/// <summary>
810
/// 发送消息
911
/// </summary>

src/dotnetCampus.Ipc/Internals/PeerReConnector.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private async Task<bool> TryReconnectAsync(IpcClientService ipcClientService)
6363
{
6464
try
6565
{
66-
return await ipcClientService.StartInternalAsync(isReConnect: true, shouldRegisterToPeer: true, onlyConnectExistsPeer: false);
66+
return await ipcClientService.StartInternalAsync(isReConnect: true, shouldRegisterToPeer: true, onlyConnectToExistingPeer: false);
6767
}
6868
// ## 此异常有两种
6969
catch (FileNotFoundException)

src/dotnetCampus.Ipc/Pipes/IpcClientService.cs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public NamedPipeClientStreamResult(Exception exception)
107107
/// <returns></returns>
108108
public async Task Start(bool shouldRegisterToPeer = true)
109109
{
110-
var result = await StartInternalAsync(isReConnect: false, shouldRegisterToPeer, onlyConnectExistsPeer: false/*不是只连接存在的对方,如果对方还不存在,则进行等待*/);
110+
var result = await StartInternalAsync(isReConnect: false, shouldRegisterToPeer, onlyConnectToExistingPeer: false/*不是只连接存在的对方,如果对方还不存在,则进行等待*/);
111111

112112
if (!result)
113113
{
@@ -121,22 +121,22 @@ public async Task Start(bool shouldRegisterToPeer = true)
121121
/// <returns></returns>
122122
internal Task<bool> TryConnectToExistingPeerAsync()
123123
{
124-
return StartInternalAsync(isReConnect: false, shouldRegisterToPeer: true, onlyConnectExistsPeer: true);
124+
return StartInternalAsync(isReConnect: false, shouldRegisterToPeer: false, onlyConnectToExistingPeer: true);
125125
}
126126

127127
/// <inheritdoc cref="Start"/>
128128
/// <param name="isReConnect">是否属于重新连接</param>
129129
/// <param name="shouldRegisterToPeer">是否需要向对方注册</param>
130-
/// <param name="onlyConnectExistsPeer">只连接存在的对方</param>
130+
/// <param name="onlyConnectToExistingPeer">只连接存在的对方</param>
131131
/// <returns>True:启动成功</returns>
132-
internal async Task<bool> StartInternalAsync(bool isReConnect, bool shouldRegisterToPeer, bool onlyConnectExistsPeer)
132+
internal async Task<bool> StartInternalAsync(bool isReConnect, bool shouldRegisterToPeer, bool onlyConnectToExistingPeer)
133133
{
134134
var localClient = IpcContext.PipeName;
135135
var remoteServer = PeerName;
136136

137137
Logger.Trace($"StartInternalAsync Connecting NamedPipe. LocalClient:'{localClient}';RemoteServer:'{remoteServer}'");
138138

139-
if (onlyConnectExistsPeer)
139+
if (onlyConnectToExistingPeer)
140140
{
141141
if (!PipeHelper.IsPipeExists(remoteServer))
142142
{
@@ -151,7 +151,7 @@ internal async Task<bool> StartInternalAsync(bool isReConnect, bool shouldRegist
151151

152152
try
153153
{
154-
var result = await ConnectNamedPipeAsync(isReConnect, namedPipeClientStream, onlyConnectExistsPeer);
154+
var result = await ConnectNamedPipeAsync(isReConnect, namedPipeClientStream, onlyConnectToExistingPeer);
155155
if (!result)
156156
{
157157
_namedPipeClientStreamTaskCompletionSource.TrySetResult(new NamedPipeClientStreamResult(namedPipeClientStream: null));
@@ -175,7 +175,7 @@ internal async Task<bool> StartInternalAsync(bool isReConnect, bool shouldRegist
175175
if (shouldRegisterToPeer)
176176
{
177177
// 启动之后,向对方注册,此时对方是服务器
178-
await RegisterToPeer();
178+
await RegisterToPeerAsync();
179179
}
180180

181181
return true;
@@ -186,17 +186,17 @@ internal async Task<bool> StartInternalAsync(bool isReConnect, bool shouldRegist
186186
/// </summary>
187187
/// <param name="isReConnect">是否属于重新连接</param>
188188
/// <param name="namedPipeClientStream"></param>
189-
/// <param name="onlyConnectExistsPeer">只连接存在的对方</param>
189+
/// <param name="onlyConnectToExistingPeer">只连接存在的对方</param>
190190
/// <returns>True 连接成功</returns>
191191
/// 独立方法,方便 dnspy 调试
192-
private async Task<bool> ConnectNamedPipeAsync(bool isReConnect, NamedPipeClientStream namedPipeClientStream, bool onlyConnectExistsPeer)
192+
private async Task<bool> ConnectNamedPipeAsync(bool isReConnect, NamedPipeClientStream namedPipeClientStream, bool onlyConnectToExistingPeer)
193193
{
194194
var connector = IpcContext.IpcClientPipeConnector;
195195

196196
if (connector == null)
197197
{
198198
var timeout = Timeout.Infinite;
199-
if (onlyConnectExistsPeer)
199+
if (onlyConnectToExistingPeer)
200200
{
201201
// 如果是只连接存在的对方的情况,即使对方存在,也需要设置一个短暂的超时时间
202202
// 为什么这里还需要设置超时时间,这是因为可能上一步判断 IsPipeExists 时,对方还是存在的,然而当前准备连接的时候,对方已经挂了,因此不能无限等待,需要设置一个短暂的时间
@@ -209,10 +209,11 @@ private async Task<bool> ConnectNamedPipeAsync(bool isReConnect, NamedPipeClient
209209
}
210210
catch (Exception e)
211211
{
212-
if (onlyConnectExistsPeer && e is IpcPipeConnectionException ipcPipeConnectionException)
212+
if (onlyConnectToExistingPeer && e is IpcPipeConnectionException ipcPipeConnectionException)
213213
{
214214
if (ipcPipeConnectionException.InnerException is TimeoutException)
215215
{
216+
// 如果是只连接存在的对方,且连接超时了,则直接返回 false 证明对方现在无法被连接上,正常就是对方不存在
216217
return false;
217218
}
218219
}
@@ -223,7 +224,7 @@ private async Task<bool> ConnectNamedPipeAsync(bool isReConnect, NamedPipeClient
223224
}
224225
else
225226
{
226-
return await CustomConnectNamedPipeAsync(connector, isReConnect, namedPipeClientStream, onlyConnectExistsPeer);
227+
return await CustomConnectNamedPipeAsync(connector, isReConnect, namedPipeClientStream, onlyConnectToExistingPeer);
227228
}
228229
}
229230

@@ -233,16 +234,16 @@ private async Task<bool> ConnectNamedPipeAsync(bool isReConnect, NamedPipeClient
233234
/// <param name="ipcClientPipeConnector"></param>
234235
/// <param name="isReConnect">是否属于重新连接</param>
235236
/// <param name="namedPipeClientStream"></param>
236-
/// <param name="onlyConnectExistsPeer"></param>
237+
/// <param name="onlyConnectToExistingPeer"></param>
237238
/// <returns></returns>
238239
private async Task<bool> CustomConnectNamedPipeAsync(IIpcClientPipeConnector ipcClientPipeConnector,
239240
bool isReConnect,
240-
NamedPipeClientStream namedPipeClientStream, bool onlyConnectExistsPeer)
241+
NamedPipeClientStream namedPipeClientStream, bool onlyConnectToExistingPeer)
241242
{
242243
Logger.Trace($"Connecting NamedPipe by {nameof(CustomConnectNamedPipeAsync)}. LocalClient:'{IpcContext.PipeName}';RemoteServer:'{PeerName}'");
243244
var cancellationToken = CancellationToken.None;
244245

245-
if (onlyConnectExistsPeer)
246+
if (onlyConnectToExistingPeer)
246247
{
247248
cancellationToken = new CancellationTokenSource(TimeSpan.FromMilliseconds(10)).Token;
248249
}
@@ -292,7 +293,7 @@ void ConnectNamedPipe()
292293
}
293294
}
294295

295-
private async Task RegisterToPeer()
296+
internal async Task RegisterToPeerAsync()
296297
{
297298
Logger.Trace($"[{nameof(IpcClientService)}] StartRegisterToPeer PipeName={IpcContext.PipeName}");
298299

src/dotnetCampus.Ipc/Pipes/IpcProvider.cs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
using dotnetCampus.Ipc.CompilerServices.GeneratedProxies;
77
using dotnetCampus.Ipc.Context;
8+
using dotnetCampus.Ipc.Context.LoggingContext;
89
using dotnetCampus.Ipc.Exceptions;
910
using dotnetCampus.Ipc.Internals;
1011
using dotnetCampus.Ipc.Utils;
1112
using dotnetCampus.Ipc.Utils.Extensions;
13+
using dotnetCampus.Ipc.Utils.Logging;
1214

1315
namespace dotnetCampus.Ipc.Pipes
1416
{
@@ -80,7 +82,7 @@ public async void StartServer()
8082
var ipcServerService = new IpcServerService(IpcContext);
8183
_ipcServerService = ipcServerService;
8284

83-
ipcServerService.PeerConnected += NamedPipeServerStreamPoolPeerConnected;
85+
ipcServerService.PeerConnected += IpcServerService_OnPeerConnected;
8486

8587
// 以下的 Start 是一个循环,不会返回的
8688
await ipcServerService.Start().ConfigureAwait(false);
@@ -100,13 +102,17 @@ public async void StartServer()
100102
/// </summary>
101103
/// <param name="sender"></param>
102104
/// <param name="e"></param>
103-
private async void NamedPipeServerStreamPoolPeerConnected(object? sender, IpcInternalPeerConnectedArgs e)
105+
private async void IpcServerService_OnPeerConnected(object? sender, IpcInternalPeerConnectedArgs e)
104106
{
105107
try
106108
{
109+
IpcContext.Logger.Debug($"[OnPeerConnected]IpcProvider.OnPeerConnected PeerName={e.PeerName};CurrentName={IpcContext.PipeName}");
110+
107111
// 也许是对方反过来连接
108112
if (PeerManager.TryGetValue(e.PeerName, out var peerProxy))
109113
{
114+
IpcContext.Logger.Debug($"[OnPeerConnected]PeerManager.TryGetValue Success. PeerName={e.PeerName};CurrentName={IpcContext.PipeName}");
115+
110116
// 如果当前的 Peer 已断开且不需要重新连接,那么重新创建 Peer 反过来连接对方的服务器端
111117
if (peerProxy.IsBroken && !IpcContext.IpcConfiguration.AutoReconnectPeers)
112118
{
@@ -121,6 +127,8 @@ private async void NamedPipeServerStreamPoolPeerConnected(object? sender, IpcInt
121127
}
122128
else
123129
{
130+
IpcContext.Logger.Debug($"[OnPeerConnected]PeerManager.TryGetValue Fail. ConnectBackToPeer. PeerName={e.PeerName};CurrentName={IpcContext.PipeName}");
131+
124132
// 其他客户端连接,需要反过来连接对方的服务器端
125133
await ConnectBackToPeer(e);
126134
}
@@ -136,6 +144,8 @@ private async Task ConnectBackToPeer(IpcInternalPeerConnectedArgs e)
136144
{
137145
try
138146
{
147+
IpcContext.Logger.Debug($"[OnPeerConnected] ConnectBackToPeer. PeerName={e.PeerName};CurrentName={IpcContext.PipeName}");
148+
139149
await ConnectBackToPeerCore(e);
140150
}
141151
catch (ObjectDisposedException)
@@ -318,10 +328,10 @@ public async Task<PeerProxy> GetAndConnectToPeerAsync(string peerName)
318328
/// 尝试获取或连接到已经存在的 Peer 上。如果当前的 Peer 还没起来,则不等待连接,直接返回失败
319329
/// </summary>
320330
/// <param name="peerName">对方</param>
321-
/// <param name="shouldWaitPeerConnectFinished">是否应该等待对方连接回来完成,完全完成双向连接。如设置为 false 则需要自己通过 <see cref="ConnectExistsPeerResult.PeerConnectFinishedTask"/> 进行等待。默认为 true 表示等待所有准备完成再返回</param>
331+
/// <param name="shouldWaitPeerConnectFinished">是否应该等待对方连接回来完成,完全完成双向连接。如设置为 false 则需要自己通过 <see cref="ConnectToExistingPeerResult.PeerConnectFinishedTask"/> 进行等待。默认为 true 表示等待所有准备完成再返回</param>
322332
/// <returns></returns>
323333
/// 为什么会存在 <paramref name="shouldWaitPeerConnectFinished"/> 参数,这是为了解决极端情况下,刚好本进程能连接到对方,连接完成瞬间,对方挂了,无法反向连接回来的情况。正常不需要设置此参数
324-
public async Task<ConnectExistsPeerResult> TryConnectToExistingPeerAsync(string peerName, bool shouldWaitPeerConnectFinished = true)
334+
public async Task<ConnectToExistingPeerResult> TryConnectToExistingPeerAsync(string peerName, bool shouldWaitPeerConnectFinished = true)
325335
{
326336
if (PeerManager.TryGetValue(peerName, out var peerProxy))
327337
{
@@ -336,16 +346,22 @@ public async Task<ConnectExistsPeerResult> TryConnectToExistingPeerAsync(string
336346

337347
var ipcClientService = CreateIpcClientService(peerName);
338348

349+
// 尝试连接对方
350+
// 连接的时候不会立刻向对方注册自己,只是建立连接关系而已
351+
// 这样是因为一旦向对方注册自己,那对方将会反过来向自己注册。然而此时存在多线程安全问题,此时的 PeerManager 还没加入对方。导致以下代码里面的 PeerManager.WaitForPeerConnectFinishedAsync 无法完成等待,导致单元测试失败
339352
var result = await ipcClientService.TryConnectToExistingPeerAsync().ConfigureAwait(false);
340353
if (!result)
341354
{
342355
// 对方不存在
343-
return ConnectExistsPeerResult.Fail();
356+
return ConnectToExistingPeerResult.Fail();
344357
}
345358

346359
// 需要确定能连接上对方了,才能加入到 PeerManager 里面。确保不会在下次进来的时候,拿到了一个无法建立连接的 Peer 对象。这里的添加顺序是先确保连接再添加,这就意味着在并行的时候,可能会多次尝试连接。这是符合预期的,本身连接也没有多少损耗,最多只会多创建一个管道而已
347360
peerProxy = new PeerProxy(peerName, ipcClientService, IpcContext);
348361
PeerManager.TryAdd(peerProxy);
362+
363+
// 在 PeerProxy 加入到管理之后,才能向对方注册自己,确保对方收到注册之后,反过来向自己注册时,可以从管理里面拿到注册的对方信息,从而让 PeerManager.WaitForPeerConnectFinishedAsync 能够完成
364+
await ipcClientService.RegisterToPeerAsync();
349365
}
350366

351367
// 等待对方回连,建立双向连接
@@ -364,11 +380,11 @@ public async Task<ConnectExistsPeerResult> TryConnectToExistingPeerAsync(string
364380
catch (IpcPeerConnectionBrokenException e)
365381
{
366382
// 对方连接断开了
367-
return ConnectExistsPeerResult.Fail();
383+
return ConnectToExistingPeerResult.Fail();
368384
}
369385
}
370386

371-
return new ConnectExistsPeerResult(peerProxy, peerConnectFinishedTask);
387+
return new ConnectToExistingPeerResult(peerProxy, peerConnectFinishedTask);
372388
}
373389

374390
/// <summary>

src/dotnetCampus.Ipc/Utils/Extensions/LoggerExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22

3+
using dotnetCampus.Ipc.Context.LoggingContext;
34
using dotnetCampus.Ipc.Utils.Logging;
45

56
namespace dotnetCampus.Ipc.Utils.Extensions
@@ -13,7 +14,7 @@ public static void Trace(this ILogger? logger, string message)
1314

1415
public static void Debug(this ILogger? logger, string message)
1516
{
16-
logger?.Log(LogLevel.Debug, default, message, null, FormatOnlyMessage);
17+
logger?.Log(LogLevel.Debug, LoggerEventIds.CommonDebugEventId, message, null, FormatOnlyMessage);
1718
}
1819

1920
public static void Information(this ILogger? logger, string message)

0 commit comments

Comments
 (0)