@@ -838,15 +838,23 @@ protected void BeginReadPackage(CancellationToken token)
838838 Task . Run ( async ( ) =>
839839 {
840840 var pipe = new Pipe ( ) ;
841- var fillTask = FillRecvPipeAsync ( QpPackageHandler_Stream , pipe . Writer , token ) ;
842- var readTask = ReadRecvPipeAsync ( pipe . Reader , token ) ;
843- await Task . WhenAll ( fillTask , readTask ) ;
841+ var fillTask = FillRecvPipeAsync ( QpPackageHandler_Stream , pipe . Writer , token ) ;
842+ var readTask = ReadRecvPipeAsync ( pipe . Reader , token ) ;
843+ try
844+ {
845+ await Task . WhenAll ( fillTask , readTask ) ;
846+ }
847+ catch
848+ {
849+ pipe . Writer . Complete ( ) ;
850+ pipe . Reader . Complete ( ) ;
851+ }
844852 } ) ;
845853 }
846854
847855 private async Task FillRecvPipeAsync ( Stream stream , PipeWriter writer , CancellationToken token )
848856 {
849- while ( true )
857+ while ( ! token . IsCancellationRequested )
850858 {
851859 Memory < byte > memory = writer . GetMemory ( minimumBufferSize ) ;
852860 try
@@ -865,55 +873,48 @@ private async Task FillRecvPipeAsync(Stream stream, PipeWriter writer, Cancellat
865873 if ( result . IsCompleted )
866874 break ;
867875 }
868- writer . Complete ( ) ;
869876 }
870877
871- private async Task ReadRecvPipeAsync ( PipeReader reader , CancellationToken token )
878+ private async Task ReadRecvPipeAsync ( PipeReader recvReader , CancellationToken token )
872879 {
873- //是否正在读取拆分包
874- bool isReadingSplitPackage = false ;
875- int splitMsCapacity = 0 ;
876- MemoryStream splitMs = null ;
877-
880+ //暂存包头缓存
878881 var packageHeadBuffer = new byte [ PACKAGE_HEAD_LENGTH ] ;
882+ Pipe decryptPipe = null ;
883+ Pipe decompressPipe = null ;
884+
879885 while ( ! token . IsCancellationRequested )
880886 {
881- var ret = await reader . ReadAtLeastAsync ( PACKAGE_HEAD_LENGTH , token ) . ConfigureAwait ( false ) ;
887+ var currentReader = recvReader ;
888+ var ret = await currentReader . ReadAtLeastAsync ( PACKAGE_HEAD_LENGTH , token ) . ConfigureAwait ( false ) ;
882889 if ( ret . IsCanceled )
883890 return ;
884891
885- var headerBuffer = ret . Buffer . Slice ( 0 , PACKAGE_HEAD_LENGTH ) ;
886- headerBuffer . CopyTo ( packageHeadBuffer ) ;
887- reader . AdvanceTo ( headerBuffer . End ) ;
892+ ret . Buffer . Slice ( 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) . CopyTo ( packageHeadBuffer ) ;
893+ currentReader . AdvanceTo ( ret . Buffer . Start ) ;
888894
889895 //包总长度
890896 var packageTotalLength = ByteUtils . B2I_BE ( packageHeadBuffer , 0 ) ;
891897 if ( packageTotalLength < PACKAGE_HEAD_LENGTH )
892- throw new ProtocolException ( headerBuffer , $ "包长度[{ packageTotalLength } ]必须大于等于{ PACKAGE_HEAD_LENGTH } !") ;
898+ throw new ProtocolException ( new ReadOnlySequence < byte > ( packageHeadBuffer ) , $ "包长度[{ packageTotalLength } ]必须大于等于{ PACKAGE_HEAD_LENGTH } !") ;
893899 if ( packageTotalLength > BufferSize )
894- throw new ProtocolException ( headerBuffer , $ "数据包总长度[{ packageTotalLength } ]大于缓存大小[{ BufferSize } ]") ;
900+ throw new ProtocolException ( new ReadOnlySequence < byte > ( packageHeadBuffer ) , $ "数据包总长度[{ packageTotalLength } ]大于缓存大小[{ BufferSize } ]") ;
895901
896- //包体长度
897- var packageBodyLength = packageTotalLength - PACKAGE_HEAD_LENGTH ;
898- //包类型
899- var packageType = ( QpPackageType ) packageHeadBuffer [ PACKAGE_TOTAL_LENGTH_LENGTH ] ;
900-
901- //读取包体
902- ret = await reader . ReadAtLeastAsync ( packageBodyLength , token ) . ConfigureAwait ( false ) ;
902+ //读取完整包
903+ ret = await recvReader . ReadAtLeastAsync ( packageTotalLength , token ) . ConfigureAwait ( false ) ;
903904 if ( ret . IsCanceled )
904905 return ;
905- var bodyBuffer = ret . Buffer . Slice ( 0 , packageBodyLength ) ;
906-
907- if ( bodyBuffer . Length < packageBodyLength )
908- throw new ProtocolException ( bodyBuffer , $ "包体读取错误!包体长度:{ packageBodyLength } ,读取数据长度:{ bodyBuffer . Length } ") ;
906+ var packageBuffer = ret . Buffer . Slice ( 0 , packageTotalLength ) ;
909907
910908 //如果设置了压缩或者加密
911- /*
912909 if ( options . InternalCompress || options . InternalEncrypt )
913910 {
911+ /*
914912 //如果设置了加密,则先解密
915913 if (options.InternalEncrypt)
916914 {
915+ if(decryptPipe==null)
916+ decryptPipe=new Pipe();
917+
917918 var retBuffer = dec.TransformFinalBlock(currentPackageBuffer.Array, PACKAGE_TOTAL_LENGTH_LENGTH + currentPackageBuffer.Offset, currentPackageBuffer.Count - PACKAGE_TOTAL_LENGTH_LENGTH);
918919 var currentBuffer = getFreeBuffer(currentPackageBuffer.Array, recvBuffer, recvBuffer2);
919920 packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + retBuffer.Length;
@@ -939,9 +940,16 @@ private async Task ReadRecvPipeAsync(PipeReader reader, CancellationToken token)
939940 Array.Copy(retBuffer, 0, currentBuffer, PACKAGE_TOTAL_LENGTH_LENGTH, count);
940941 currentPackageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
941942 }
943+ */
944+ }
945+ else
946+ {
947+
942948 }
943- */
944- //buffer.CopyTo(packageHeadBuffer);
949+
950+ //包类型
951+ packageBuffer . Slice ( 0 , PACKAGE_HEAD_LENGTH ) . CopyTo ( packageHeadBuffer ) ;
952+ var packageType = ( QpPackageType ) packageHeadBuffer [ PACKAGE_TOTAL_LENGTH_LENGTH ] ;
945953
946954 if ( LogUtils . LogPackage )
947955 LogUtils . Log (
@@ -950,52 +958,16 @@ private async Task ReadRecvPipeAsync(PipeReader reader, CancellationToken token)
950958 packageTotalLength ,
951959 packageType ,
952960 LogUtils . LogContent ?
953- BitConverter . ToString ( packageHeadBuffer . Concat ( bodyBuffer . ToArray ( ) ) . ToArray ( ) )
961+ BitConverter . ToString ( packageBuffer . ToArray ( ) )
954962 : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
955- HandlePackage ( packageType , bodyBuffer ) ;
956- reader . AdvanceTo ( bodyBuffer . End ) ;
957- //如果当前包是拆分包
958- /*
959- if (packageType == QpPackageType.Split)
960- {
961- if (!isReadingSplitPackage)
962- {
963- var tmpPackageBodyLength = ByteUtils.B2I_BE(currentPackageBuffer.Array, currentPackageBuffer.Offset + PACKAGE_HEAD_LENGTH);
964- splitMsCapacity = tmpPackageBodyLength;
965- if (splitMsCapacity <= 0)
966- throw new IOException($"拆分包中包长度[{splitMsCapacity}]必须为正数!");
967- if (splitMsCapacity > options.MaxPackageSize)
968- throw new IOException($"拆分包中包长度[{splitMsCapacity}]大于最大包大小[{options.MaxPackageSize}]");
969- splitMs = new MemoryStream(splitMsCapacity);
970- isReadingSplitPackage = true;
971- }
972- await splitMs.WriteAsync(currentPackageBuffer.Array, currentPackageBuffer.Offset + PACKAGE_HEAD_LENGTH, currentPackageBuffer.Count - PACKAGE_HEAD_LENGTH).ConfigureAwait(false);
973-
974- //如果拆分包已经读取完成
975- if (splitMs.Position >= splitMsCapacity)
976- {
977- finalPackageBuffer = new ArraySegment<byte>(splitMs.ToArray());
978- splitMs.Dispose();
979- if (LogUtils.LogSplit)
980- LogUtils.Log("{0}: [Recv-SplitPackage]Length:{1}", DateTime.Now, finalPackageBuffer.Count);
981- break;
982- }
983- }
984- else
985- {
986- finalPackageBuffer = currentPackageBuffer;
987- break;
988- }
989- */
990-
991- }
992-
993- // 将PipeReader标记为完成
994- reader . Complete ( ) ;
963+ HandlePackage ( packageType , packageBuffer ) ;
964+ currentReader . AdvanceTo ( packageBuffer . End ) ;
965+ }
995966 }
996967
997- protected void HandlePackage ( QpPackageType packageType , ReadOnlySequence < byte > bodyBuffer )
968+ protected void HandlePackage ( QpPackageType packageType , ReadOnlySequence < byte > packageBuffer )
998969 {
970+ var bodyBuffer = packageBuffer . Slice ( PACKAGE_HEAD_LENGTH ) ;
999971 switch ( packageType )
1000972 {
1001973 case QpPackageType . Heartbeat :
0 commit comments