@@ -36,13 +36,18 @@ protected virtual void OnWriteError(Exception exception)
3636 byte [ ] encryptBuffer1 = null ;
3737 byte [ ] encryptBuffer2 = null ;
3838
39- private async Task writePackageBuffer ( Stream stream , PipeReader currentReader , QpPackageType packageType , ReadOnlySequence < byte > packageBodyBuffer , Action afterSendHandler )
39+ private async Task writePackageBuffer ( PipeReader currentReader , QpPackageType packageType , int packageBodyLength , bool ignoreCompressAndEncrypt = false )
4040 {
41+ var stream = QpPackageHandler_Stream ;
42+
43+ var readRet = await currentReader . ReadAtLeastAsync ( packageBodyLength ) ;
44+ var packageBodyBuffer = readRet . Buffer ;
45+
4146 var packageTotalLength = 0 ;
4247 Memory < byte > packageHeadMemory = default ;
4348
4449 //如果压缩或者加密
45- if ( options . InternalCompress || options . InternalEncrypt )
50+ if ( ! ignoreCompressAndEncrypt && ( options . InternalCompress || options . InternalEncrypt ) )
4651 {
4752 //如果压缩
4853 if ( options . InternalCompress )
@@ -51,12 +56,12 @@ private async Task writePackageBuffer(Stream stream,PipeReader currentReader, Qp
5156 writeCompressPipe = new Pipe ( ) ;
5257 using ( var inStream = packageBodyBuffer . AsStream ( ) )
5358 using ( var outStream = writeCompressPipe . Writer . AsStream ( true ) )
54- using ( var gzStream = new GZipStream ( outStream , CompressionMode . Compress , true ) )
59+ using ( var gzStream = new GZipStream ( outStream , CompressionMode . Compress , true ) )
5560 await inStream . CopyToAsync ( gzStream ) . ConfigureAwait ( false ) ;
5661
5762 currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
5863
59- var readRet = await writeCompressPipe . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
64+ readRet = await writeCompressPipe . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
6065 packageBodyBuffer = readRet . Buffer ;
6166
6267 //包总长度
@@ -114,10 +119,10 @@ private async Task writePackageBuffer(Stream stream,PipeReader currentReader, Qp
114119 }
115120 _ = Task . Run ( async ( ) =>
116121 {
117- await encryptPipe . Writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
122+ await encryptPipe . Writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
118123 } ) ;
119124
120- var readRet = await encryptPipe . Reader . ReadAtLeastAsync ( packageTotalLength ) . ConfigureAwait ( false ) ;
125+ readRet = await encryptPipe . Reader . ReadAtLeastAsync ( packageTotalLength ) . ConfigureAwait ( false ) ;
121126 currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
122127
123128 packageBodyBuffer = readRet . Buffer ;
@@ -128,7 +133,7 @@ private async Task writePackageBuffer(Stream stream,PipeReader currentReader, Qp
128133 packageHeadMemory = new Memory < byte > ( sendHeadBuffer , 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) ;
129134 currentReader = encryptPipe . Reader ;
130135 }
131- catch ( Exception ex )
136+ catch ( Exception ex )
132137 {
133138 throw new IOException ( "发送数据加密时出错" , ex ) ;
134139 }
@@ -142,8 +147,6 @@ private async Task writePackageBuffer(Stream stream,PipeReader currentReader, Qp
142147 writePackageTotalLengthToBuffer ( sendHeadBuffer , 0 , packageTotalLength ) ;
143148 packageHeadMemory = new Memory < byte > ( sendHeadBuffer , 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) ;
144149 }
145- //执行AfterSendHandler
146- afterSendHandler ? . Invoke ( ) ;
147150 //写入包头
148151 await stream . WriteAsync ( packageHeadMemory ) . ConfigureAwait ( false ) ;
149152
@@ -188,38 +191,19 @@ private static void writePackageTotalLengthToBuffer(byte[] buffer, int offset, i
188191 Array . Reverse ( buffer , offset , sizeof ( int ) ) ;
189192 }
190193
191- private async Task writePackageAsync ( Func < PipeWriter , Task < Tuple < QpPackageType , int > > > getPackagePayloadFunc , Action afterSendHandler )
194+ private async Task UseSendPipe ( Func < Pipe , Task > handler )
192195 {
196+ if ( options . EnableNetstat )
197+ Interlocked . Increment ( ref PackageSendQueueCount ) ;
198+ await sendLock . WaitAsync ( 5000 ) . ConfigureAwait ( false ) ;
193199 try
194200 {
195- if ( options . EnableNetstat )
196- Interlocked . Increment ( ref PackageSendQueueCount ) ;
197- await sendLock . WaitAsync ( 5000 ) . ConfigureAwait ( false ) ;
198- var stream = QpPackageHandler_Stream ;
199- if ( stream == null )
200- throw new IOException ( "Connection is disconnected." ) ;
201-
202- var ret = await getPackagePayloadFunc ( sendPipe . Writer ) . ConfigureAwait ( false ) ;
203- var packageType = ret . Item1 ;
204- var packageBodyLength = ret . Item2 ;
205- var packageTotalLength = packageBodyLength + PACKAGE_TOTAL_LENGTH_LENGTH ;
206-
207- if ( packageTotalLength < PACKAGE_HEAD_LENGTH )
208- throw new IOException ( $ "包大小[{ packageTotalLength } ]小于包头长度[{ PACKAGE_HEAD_LENGTH } ]") ;
209-
210- var readRet = await sendPipe . Reader . ReadAtLeastAsync ( packageBodyLength ) . ConfigureAwait ( false ) ;
211- var packageBuffer = readRet . Buffer ;
212- await writePackageBuffer ( stream ,
213- sendPipe . Reader ,
214- packageType ,
215- packageBuffer ,
216- afterSendHandler ) . ConfigureAwait ( false ) ;
217- lastSendPackageTime = DateTime . Now ;
201+ await handler ( sendPipe ) ;
218202 }
219203 catch ( Exception ex )
220204 {
221205 OnWriteError ( ex ) ;
222- throw ;
206+ throw new Exception ( "发送数据时出错" , ex ) ;
223207 }
224208 finally
225209 {
@@ -229,26 +213,29 @@ await writePackageBuffer(stream,
229213 }
230214 }
231215
232-
233216 /// <summary>
234217 /// 发送心跳包
235218 /// </summary>
236- public Task SendHeartbeatPackage ( )
219+ public async Task SendHeartbeatPackage ( )
237220 {
238- return writePackageAsync ( async writer =>
221+ await UseSendPipe ( async pipe =>
239222 {
223+ var writer = pipe . Writer ;
240224 //写入包类型
241225 writer . GetSpan ( 1 ) [ 0 ] = ( byte ) QpPackageType . Heartbeat ;
242226 writer . Advance ( 1 ) ;
243- await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
244- return new Tuple < QpPackageType , int > ( QpPackageType . Heartbeat , 1 ) ;
245- } , null ) ;
227+ _ = writer . FlushAsync ( ) ;
228+ await writePackageBuffer ( pipe . Reader ,
229+ QpPackageType . Heartbeat ,
230+ 1 ) . ConfigureAwait ( false ) ;
231+ } ) ;
246232 }
247233
248- public Task SendNoticePackage ( string noticePackageTypeName , string noticePackageContent )
234+ public async Task SendNoticePackage ( string noticePackageTypeName , string noticePackageContent )
249235 {
250- return writePackageAsync ( async writer =>
236+ await UseSendPipe ( async pipe =>
251237 {
238+ var writer = pipe . Writer ;
252239 //写入包类型
253240 writer . GetSpan ( 1 ) [ 0 ] = ( byte ) QpPackageType . Notice ;
254241 writer . Advance ( 1 ) ;
@@ -282,21 +269,28 @@ public Task SendNoticePackage(string noticePackageTypeName, string noticePackage
282269 bodyLength += byteCount ;
283270 }
284271 }
285- await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
272+ _ = writer . FlushAsync ( ) ;
286273 if ( LogUtils . LogNotice )
287274 LogUtils . Log ( "{0}: [Send-NoticePackage]Type:{1},Content:{2}" , DateTime . Now , typeName , LogUtils . LogContent ? content : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
275+ await writePackageBuffer ( pipe . Reader ,
276+ QpPackageType . Heartbeat ,
277+ 1 ) . ConfigureAwait ( false ) ;
278+ } ) ;
279+ }
288280
289- return new Tuple < QpPackageType , int > ( QpPackageType . Notice , bodyLength ) ;
290- } , null ) ;
281+ public Task SendCommandRequestPackage ( string commandId , string typeName , string content )
282+ {
283+ return SendCommandRequestPackage ( commandId , typeName , content , false ) ;
291284 }
292285
293286 /// <summary>
294287 /// 发送命令请求包
295288 /// </summary>
296- public Task SendCommandRequestPackage ( string commandId , string typeName , string content , Action afterSendHandler = null )
289+ private async Task SendCommandRequestPackage ( string commandId , string typeName , string content , bool ignoreCompressAndEncrypt )
297290 {
298- return writePackageAsync ( async writer =>
291+ await UseSendPipe ( async pipe =>
299292 {
293+ var writer = pipe . Writer ;
300294 //写入包类型
301295 writer . GetSpan ( 1 ) [ 0 ] = ( byte ) QpPackageType . CommandRequest ;
302296 writer . Advance ( 1 ) ;
@@ -327,22 +321,26 @@ public Task SendCommandRequestPackage(string commandId, string typeName, string
327321 writer . Advance ( contentLength ) ;
328322 bodyLength += contentLength ;
329323 }
330- await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
324+ _ = writer . FlushAsync ( ) ;
331325
332326 if ( LogUtils . LogCommand )
333327 LogUtils . Log ( "{0}: [Send-CommandRequestPackage]CommandId:{1},Type:{2},Content:{3}" , DateTime . Now , commandId , typeName , LogUtils . LogContent ? content : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
334328
335- return new Tuple < QpPackageType , int > ( QpPackageType . CommandRequest , bodyLength ) ;
336- } , afterSendHandler ) ;
329+ await writePackageBuffer ( pipe . Reader ,
330+ QpPackageType . Heartbeat ,
331+ 1 ,
332+ ignoreCompressAndEncrypt ) . ConfigureAwait ( false ) ;
333+ } ) ;
337334 }
338335
339336 /// <summary>
340337 /// 发送命令响应包
341338 /// </summary>
342- public Task SendCommandResponsePackage ( string commandId , byte code , string message , string typeName , string content )
339+ public async Task SendCommandResponsePackage ( string commandId , byte code , string message , string typeName , string content )
343340 {
344- return writePackageAsync ( async writer =>
341+ await UseSendPipe ( async pipe =>
345342 {
343+ var writer = pipe . Writer ;
346344 //写入包类型
347345 writer . GetSpan ( 1 ) [ 0 ] = ( byte ) QpPackageType . CommandResponse ;
348346 writer . Advance ( 1 ) ;
@@ -400,9 +398,11 @@ public Task SendCommandResponsePackage(string commandId, byte code, string messa
400398 if ( LogUtils . LogNotice )
401399 LogUtils . Log ( "{0}: [Send-CommandResponsePackage]CommandId:{1},Code:{2},Message:{3}" , DateTime . Now , commandId , code , message ) ;
402400 }
403- await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
404- return new Tuple < QpPackageType , int > ( QpPackageType . CommandResponse , bodyLength ) ;
405- } , null ) ;
401+ _ = writer . FlushAsync ( ) ;
402+ await writePackageBuffer ( pipe . Reader ,
403+ QpPackageType . Heartbeat ,
404+ 1 ) . ConfigureAwait ( false ) ;
405+ } ) ;
406406 }
407407 }
408408}
0 commit comments