@@ -36,9 +36,8 @@ protected virtual void OnWriteError(Exception exception)
3636 byte [ ] encryptBuffer1 = null ;
3737 byte [ ] encryptBuffer2 = null ;
3838
39- private async Task writePackageBuffer ( Stream stream , QpPackageType packageType , ReadOnlySequence < byte > packageBodyBuffer , Action afterSendHandler )
39+ private async Task writePackageBuffer ( Stream stream , PipeReader currentReader , QpPackageType packageType , ReadOnlySequence < byte > packageBodyBuffer , Action afterSendHandler )
4040 {
41- Pipe currentPipe = null ;
4241 var packageTotalLength = 0 ;
4342 Memory < byte > packageHeadMemory = default ;
4443
@@ -55,6 +54,8 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
5554 using ( var gzStream = new GZipStream ( outStream , CompressionMode . Compress , true ) )
5655 await inStream . CopyToAsync ( gzStream ) . ConfigureAwait ( false ) ;
5756
57+ currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
58+
5859 var readRet = await writeCompressPipe . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
5960 packageBodyBuffer = readRet . Buffer ;
6061 var packageBodyLength = Convert . ToInt32 ( packageBodyBuffer . Length ) ;
@@ -63,7 +64,7 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
6364 //准备包头
6465 writePackageTotalLengthToBuffer ( sendHeadBuffer , 0 , packageTotalLength ) ;
6566 packageHeadMemory = new Memory < byte > ( sendHeadBuffer , 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) ;
66- currentPipe = writeCompressPipe ;
67+ currentReader = writeCompressPipe . Reader ;
6768 }
6869 //如果加密
6970 if ( options . InternalEncrypt )
@@ -75,7 +76,7 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
7576 {
7677 encryptPipe = new Pipe ( ) ;
7778 encryptBuffer1 = new byte [ enc . InputBlockSize ] ;
78- encryptBuffer2 = new byte [ enc . InputBlockSize * 2 ] ;
79+ encryptBuffer2 = new byte [ enc . OutputBlockSize ] ;
7980 }
8081 //开始加密
8182 var toEncryptedBuffer = packageBodyBuffer ;
@@ -111,6 +112,7 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
111112 _ = Task . Run ( async ( ) =>
112113 {
113114 await encryptPipe . Writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
115+ currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
114116 } ) ;
115117
116118 var readRet = await encryptPipe . Reader . ReadAtLeastAsync ( packageTotalLength ) . ConfigureAwait ( false ) ;
@@ -121,7 +123,7 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
121123 //准备包头
122124 writePackageTotalLengthToBuffer ( sendHeadBuffer , 0 , packageTotalLength ) ;
123125 packageHeadMemory = new Memory < byte > ( sendHeadBuffer , 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) ;
124- currentPipe = encryptPipe ;
126+ currentReader = encryptPipe . Reader ;
125127 }
126128 catch ( Exception ex )
127129 {
@@ -171,8 +173,7 @@ await writeTask
171173 LogUtils . LogContent ?
172174 BitConverter . ToString ( sendHeadBuffer . Concat ( packageBodyBuffer . ToArray ( ) ) . ToArray ( ) )
173175 : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
174- if ( currentPipe != null )
175- currentPipe . Reader . AdvanceTo ( packageBodyBuffer . End ) ;
176+ currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
176177 await stream . FlushAsync ( ) . ConfigureAwait ( false ) ;
177178 }
178179
@@ -198,26 +199,19 @@ private async Task writePackageAsync(Func<PipeWriter, Task<Tuple<QpPackageType,
198199 var ret = await getPackagePayloadFunc ( sendPipe . Writer ) . ConfigureAwait ( false ) ;
199200 var packageType = ret . Item1 ;
200201 var packageBodyLength = ret . Item2 ;
201- var packageTotalLength = packageBodyLength + PACKAGE_HEAD_LENGTH ;
202+ var packageTotalLength = packageBodyLength + PACKAGE_TOTAL_LENGTH_LENGTH ;
202203
203204 if ( packageTotalLength < PACKAGE_HEAD_LENGTH )
204205 throw new IOException ( $ "包大小[{ packageTotalLength } ]小于包头长度[{ PACKAGE_HEAD_LENGTH } ]") ;
205206 try
206207 {
207- ReadOnlySequence < byte > packageBuffer = default ;
208- if ( packageBodyLength > 0 )
209- {
210- var readRet = await sendPipe . Reader . ReadAtLeastAsync ( packageBodyLength ) . ConfigureAwait ( false ) ;
211- packageBuffer = readRet . Buffer ;
212- }
208+ var readRet = await sendPipe . Reader . ReadAtLeastAsync ( packageBodyLength ) . ConfigureAwait ( false ) ;
209+ var packageBuffer = readRet . Buffer ;
213210 await writePackageBuffer ( stream ,
211+ sendPipe . Reader ,
214212 packageType ,
215213 packageBuffer ,
216214 afterSendHandler ) . ConfigureAwait ( false ) ;
217- if ( packageBodyLength > 0 )
218- {
219- sendPipe . Reader . AdvanceTo ( packageBuffer . End ) ;
220- }
221215 lastSendPackageTime = DateTime . Now ;
222216 }
223217 catch ( Exception ex )
@@ -240,12 +234,13 @@ await writePackageBuffer(stream,
240234 /// </summary>
241235 public Task SendHeartbeatPackage ( )
242236 {
243- return writePackageAsync ( writer =>
237+ return writePackageAsync ( async writer =>
244238 {
245239 //写入包类型
246240 writer . GetSpan ( 1 ) [ 0 ] = ( byte ) QpPackageType . Heartbeat ;
247241 writer . Advance ( 1 ) ;
248- return Task . FromResult ( new Tuple < QpPackageType , int > ( QpPackageType . Heartbeat , 1 ) ) ;
242+ await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
243+ return new Tuple < QpPackageType , int > ( QpPackageType . Heartbeat , 1 ) ;
249244 } , null ) ;
250245 }
251246
0 commit comments