1010using System . Threading ;
1111using System . Threading . Tasks ;
1212using Nerdbank . Streams ;
13+ using Quick . Protocol . Streams ;
1314
1415namespace Quick . Protocol
1516{
@@ -39,6 +40,8 @@ protected virtual void OnWriteError(Exception exception)
3940 private async Task writePackageBuffer ( PipeReader currentReader , QpPackageType packageType , int packageBodyLength , bool ignoreCompressAndEncrypt = false )
4041 {
4142 var stream = QpPackageHandler_Stream ;
43+ if ( stream == null )
44+ throw new IOException ( "Not connected." ) ;
4245
4346 var readRet = await currentReader . ReadAtLeastAsync ( packageBodyLength ) ;
4447 var packageBodyBuffer = readRet . Buffer ;
@@ -55,16 +58,20 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
5558 if ( writeCompressPipe == null )
5659 writeCompressPipe = new Pipe ( ) ;
5760 using ( var inStream = packageBodyBuffer . AsStream ( ) )
58- using ( var outStream = writeCompressPipe . Writer . AsStream ( true ) )
59- using ( var gzStream = new GZipStream ( outStream , CompressionMode . Compress , true ) )
61+ using ( var outStream = new PipeWriterStream ( writeCompressPipe . Writer , true ) )
6062 {
61- await inStream . CopyToAsync ( gzStream ) . ConfigureAwait ( false ) ;
63+ using ( var gzStream = new GZipStream ( outStream , CompressionMode . Compress , true ) )
64+ {
65+ await inStream . CopyToAsync ( gzStream ) . ConfigureAwait ( false ) ;
66+ }
67+ packageTotalLength = Convert . ToInt32 ( outStream . Length ) ;
68+ _ = writeCompressPipe . Writer . FlushAsync ( ) ;
6269 }
6370 currentReader . AdvanceTo ( packageBodyBuffer . End ) ;
71+
72+ readRet = await writeCompressPipe . Reader . ReadAtLeastAsync ( packageTotalLength ) . ConfigureAwait ( false ) ;
6473
65- readRet = await writeCompressPipe . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
6674 packageBodyBuffer = readRet . Buffer ;
67- packageTotalLength += Convert . ToInt32 ( packageBodyBuffer . Length ) ;
6875
6976 //包总长度
7077 packageTotalLength += PACKAGE_TOTAL_LENGTH_LENGTH ;
@@ -194,7 +201,7 @@ private async Task UseSendPipe(Func<Pipe, Task> handler)
194201 {
195202 if ( options . EnableNetstat )
196203 Interlocked . Increment ( ref PackageSendQueueCount ) ;
197- await sendLock . WaitAsync ( 5000 ) . ConfigureAwait ( false ) ;
204+ await sendLock . WaitAsync ( ) . ConfigureAwait ( false ) ;
198205 try
199206 {
200207 await handler ( sendPipe ) ;
0 commit comments