@@ -63,6 +63,7 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
6363 private IPEndPoint ? _remoteEndPoint ;
6464 private IPEndPoint ? _localEndPoint ;
6565 private CancellationTokenSource ? _receiveCancellationTokenSource ;
66+ private CancellationTokenSource ? _autoConnectTokenSource ;
6667
6768 /// <summary>
6869 /// <inheritdoc/>
@@ -72,39 +73,21 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
7273 /// <returns></returns>
7374 public async ValueTask < bool > ConnectAsync ( IPEndPoint endPoint , CancellationToken token = default )
7475 {
76+ if ( IsConnected )
77+ {
78+ return true ;
79+ }
80+
7581 var ret = false ;
7682 SocketClientProvider = ServiceProvider ? . GetRequiredService < ISocketClientProvider > ( )
7783 ?? throw new InvalidOperationException ( "SocketClientProvider is not registered in the service provider." ) ;
7884
7985 try
8086 {
81- // 释放资源
82- await CloseAsync ( ) ;
83-
84- // 创建新的 TcpClient 实例
85- SocketClientProvider . LocalEndPoint = Options . LocalEndPoint ;
86-
87- _localEndPoint = Options . LocalEndPoint ;
88- _remoteEndPoint = null ;
89-
90- var connectionToken = token ;
91- if ( Options . ConnectTimeout > 0 )
87+ ret = await ConnectCoreAsync ( SocketClientProvider , endPoint , token ) ;
88+ if ( ret == false )
9289 {
93- // 设置连接超时时间
94- var connectTokenSource = new CancellationTokenSource ( options . ConnectTimeout ) ;
95- connectionToken = CancellationTokenSource . CreateLinkedTokenSource ( token , connectTokenSource . Token ) . Token ;
96- }
97- ret = await SocketClientProvider . ConnectAsync ( endPoint , connectionToken ) ;
98-
99- if ( ret )
100- {
101- _localEndPoint = SocketClientProvider . LocalEndPoint ;
102- _remoteEndPoint = endPoint ;
103-
104- if ( options . IsAutoReceive )
105- {
106- _ = Task . Run ( AutoReceiveAsync , token ) ;
107- }
90+ Reconnect ( ) ;
10891 }
10992 }
11093 catch ( OperationCanceledException ex )
@@ -116,15 +99,75 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
11699 else
117100 {
118101 Log ( LogLevel . Warning , ex , $ "TCP Socket connect operation timed out from { LocalEndPoint } to { endPoint } ") ;
102+ Reconnect ( ) ;
119103 }
120104 }
121105 catch ( Exception ex )
122106 {
123107 Log ( LogLevel . Error , ex , $ "TCP Socket connection failed from { LocalEndPoint } to { endPoint } ") ;
108+ Reconnect ( ) ;
109+ }
110+ return ret ;
111+ }
112+
113+ private void Reconnect ( )
114+ {
115+ if ( options . IsAutoReconnect )
116+ {
117+ Task . Run ( async ( ) =>
118+ {
119+ try
120+ {
121+ _autoConnectTokenSource ??= new ( ) ;
122+ await Task . Delay ( options . ReconnectInterval , _autoConnectTokenSource . Token ) . ConfigureAwait ( false ) ;
123+ HandleDisconnection ( ) ;
124+ }
125+ catch { }
126+ } , CancellationToken . None ) . ConfigureAwait ( false ) ;
127+ }
128+ }
129+
130+ private async ValueTask < bool > ConnectCoreAsync ( ISocketClientProvider provider , IPEndPoint endPoint , CancellationToken token )
131+ {
132+ // 释放资源
133+ await CloseAsync ( ) ;
134+
135+ // 创建新的 TcpClient 实例
136+ provider . LocalEndPoint = Options . LocalEndPoint ;
137+
138+ _localEndPoint = Options . LocalEndPoint ;
139+ _remoteEndPoint = endPoint ;
140+
141+ var connectionToken = token ;
142+ if ( Options . ConnectTimeout > 0 )
143+ {
144+ // 设置连接超时时间
145+ var connectTokenSource = new CancellationTokenSource ( options . ConnectTimeout ) ;
146+ connectionToken = CancellationTokenSource . CreateLinkedTokenSource ( token , connectTokenSource . Token ) . Token ;
147+ }
148+ var ret = await provider . ConnectAsync ( endPoint , connectionToken ) ;
149+
150+ if ( ret )
151+ {
152+ _localEndPoint = provider . LocalEndPoint ;
153+
154+ if ( options . IsAutoReceive )
155+ {
156+ _ = Task . Run ( AutoReceiveAsync , CancellationToken . None ) . ConfigureAwait ( false ) ;
157+ }
124158 }
125159 return ret ;
126160 }
127161
162+ private void HandleDisconnection ( )
163+ {
164+ if ( options . IsAutoReconnect && _remoteEndPoint != null )
165+ {
166+ _autoConnectTokenSource ??= new ( ) ;
167+ _ = Task . Run ( ( ) => ConnectAsync ( _remoteEndPoint , _autoConnectTokenSource . Token ) ) . ConfigureAwait ( false ) ;
168+ }
169+ }
170+
128171 /// <summary>
129172 /// <inheritdoc/>
130173 /// </summary>
@@ -149,6 +192,10 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
149192 sendToken = CancellationTokenSource . CreateLinkedTokenSource ( token , sendTokenSource . Token ) . Token ;
150193 }
151194 ret = await SocketClientProvider . SendAsync ( data , sendToken ) ;
195+ if ( ret == false )
196+ {
197+ HandleDisconnection ( ) ;
198+ }
152199 }
153200 catch ( OperationCanceledException ex )
154201 {
@@ -164,6 +211,13 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
164211 catch ( Exception ex )
165212 {
166213 Log ( LogLevel . Error , ex , $ "TCP Socket send failed from { _localEndPoint } to { _remoteEndPoint } ") ;
214+
215+ HandleDisconnection ( ) ;
216+ }
217+
218+ if ( options . EnableLog )
219+ {
220+ Log ( LogLevel . Information , null , $ "Sending data from { _localEndPoint } to { _remoteEndPoint } , Data Length: { data . Length } Data Content: { BitConverter . ToString ( data . ToArray ( ) ) } Result: { ret } ") ;
167221 }
168222 return ret ;
169223 }
@@ -188,14 +242,18 @@ public virtual async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken toke
188242 using var block = MemoryPool < byte > . Shared . Rent ( options . ReceiveBufferSize ) ;
189243 var buffer = block . Memory ;
190244 var len = await ReceiveCoreAsync ( SocketClientProvider , buffer , token ) ;
245+ if ( len == 0 )
246+ {
247+ HandleDisconnection ( ) ;
248+ }
191249 return buffer [ ..len ] ;
192250 }
193251
194252 private async ValueTask AutoReceiveAsync ( )
195253 {
196254 // 自动接收方法
197255 _receiveCancellationTokenSource ??= new ( ) ;
198- while ( true )
256+ while ( _receiveCancellationTokenSource is { IsCancellationRequested : false } )
199257 {
200258 if ( SocketClientProvider is not { IsConnected : true } )
201259 {
@@ -211,6 +269,8 @@ private async ValueTask AutoReceiveAsync()
211269 break ;
212270 }
213271 }
272+
273+ HandleDisconnection ( ) ;
214274 }
215275
216276 private async ValueTask < int > ReceiveCoreAsync ( ISocketClientProvider client , Memory < byte > buffer , CancellationToken token )
@@ -231,16 +291,19 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
231291 {
232292 // 远端主机关闭链路
233293 Log ( LogLevel . Information , null , $ "TCP Socket { _localEndPoint } received 0 data closed by { _remoteEndPoint } ") ;
294+ buffer = Memory < byte > . Empty ;
295+
296+ HandleDisconnection ( ) ;
234297 }
235298 else
236299 {
237300 buffer = buffer [ ..len ] ;
301+ }
238302
239- if ( ReceivedCallBack != null )
240- {
241- // 如果订阅回调则触发回调
242- await ReceivedCallBack ( buffer ) ;
243- }
303+ if ( ReceivedCallBack != null )
304+ {
305+ // 如果订阅回调则触发回调
306+ await ReceivedCallBack ( buffer ) ;
244307 }
245308 }
246309 catch ( OperationCanceledException ex )
@@ -257,6 +320,13 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
257320 catch ( Exception ex )
258321 {
259322 Log ( LogLevel . Error , ex , $ "TCP Socket receive failed from { _localEndPoint } to { _remoteEndPoint } ") ;
323+
324+ HandleDisconnection ( ) ;
325+ }
326+
327+ if ( options . EnableLog )
328+ {
329+ Log ( LogLevel . Information , null , $ "Receiving data from { _localEndPoint } to { _remoteEndPoint } , Data Length: { len } Data Content: { BitConverter . ToString ( buffer . ToArray ( ) ) } ") ;
260330 }
261331 return len ;
262332 }
@@ -273,9 +343,19 @@ protected virtual void Log(LogLevel logLevel, Exception? ex, string? message)
273343 /// <summary>
274344 /// <inheritdoc/>
275345 /// </summary>
276- public virtual ValueTask CloseAsync ( )
346+ public virtual async ValueTask CloseAsync ( )
277347 {
278- return DisposeAsync ( true ) ;
348+ // 取消接收数据的任务
349+ if ( _receiveCancellationTokenSource != null )
350+ {
351+ _receiveCancellationTokenSource . Cancel ( ) ;
352+ _receiveCancellationTokenSource . Dispose ( ) ;
353+ _receiveCancellationTokenSource = null ;
354+ }
355+ if ( SocketClientProvider != null )
356+ {
357+ await SocketClientProvider . CloseAsync ( ) ;
358+ }
279359 }
280360
281361 /// <summary>
@@ -290,18 +370,15 @@ protected virtual async ValueTask DisposeAsync(bool disposing)
290370 {
291371 if ( disposing )
292372 {
293- // 取消接收数据的任务
294- if ( _receiveCancellationTokenSource != null )
373+ // 取消重连任务
374+ if ( _autoConnectTokenSource != null )
295375 {
296- _receiveCancellationTokenSource . Cancel ( ) ;
297- _receiveCancellationTokenSource . Dispose ( ) ;
298- _receiveCancellationTokenSource = null ;
376+ _autoConnectTokenSource . Cancel ( ) ;
377+ _autoConnectTokenSource . Dispose ( ) ;
378+ _autoConnectTokenSource = null ;
299379 }
300380
301- if ( SocketClientProvider != null )
302- {
303- await SocketClientProvider . CloseAsync ( ) ;
304- }
381+ await CloseAsync ( ) ;
305382 }
306383 }
307384
0 commit comments