1414using System . Text . Json . Serialization ;
1515using System . IO . Pipelines ;
1616using System . Buffers ;
17+ using System . Collections . ObjectModel ;
1718
1819namespace Quick . Protocol
1920{
@@ -36,7 +37,6 @@ public abstract class QpChannel
3637 /// 心跳包
3738 /// </summary>
3839 private static readonly byte [ ] HEARTBEAT_PACKAGHE = new byte [ ] { 0 , 0 , 0 , 5 , 0 } ;
39- private static readonly ArraySegment < byte > nullArraySegment = new ArraySegment < byte > ( ) ;
4040 private const int minimumBufferSize = 1024 ;
4141
4242 //发送缓存
@@ -854,115 +854,159 @@ protected void BeginReadPackage(CancellationToken token)
854854
855855 private async Task FillRecvPipeAsync ( Stream stream , PipeWriter writer , CancellationToken token )
856856 {
857- while ( ! token . IsCancellationRequested )
857+ try
858858 {
859- Memory < byte > memory = writer . GetMemory ( minimumBufferSize ) ;
860- try
859+ while ( ! token . IsCancellationRequested )
861860 {
861+ Memory < byte > memory = writer . GetMemory ( minimumBufferSize ) ;
862862 int bytesRead = await stream . ReadAsync ( memory , token ) ;
863863 if ( bytesRead == 0 )
864864 break ;
865865 writer . Advance ( bytesRead ) ;
866+ await writer . FlushAsync ( token ) ;
866867 }
867- catch ( Exception ex )
868- {
869- OnReadError ( ex ) ;
870- break ;
871- }
872- FlushResult result = await writer . FlushAsync ( token ) ;
873- if ( result . IsCompleted )
874- break ;
875868 }
869+ catch ( Exception ex )
870+ {
871+ OnReadError ( ex ) ;
872+ }
873+ }
874+
875+ //解析包总长度
876+ private int parsePackageTotalLength ( ReadOnlySequence < byte > sequence , byte [ ] buffer )
877+ {
878+ sequence . Slice ( 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) . CopyTo ( buffer ) ;
879+ var packageTotalLength = ByteUtils . B2I_BE ( buffer , 0 ) ;
880+ if ( packageTotalLength < PACKAGE_HEAD_LENGTH )
881+ throw new ProtocolException ( new ReadOnlySequence < byte > ( buffer ) , $ "包长度[{ packageTotalLength } ]必须大于等于{ PACKAGE_HEAD_LENGTH } !") ;
882+ if ( packageTotalLength > BufferSize )
883+ throw new ProtocolException ( new ReadOnlySequence < byte > ( buffer ) , $ "数据包总长度[{ packageTotalLength } ]大于缓存大小[{ BufferSize } ]") ;
884+ return packageTotalLength ;
876885 }
877886
878887 private async Task ReadRecvPipeAsync ( PipeReader recvReader , CancellationToken token )
879888 {
880889 //暂存包头缓存
881890 var packageHeadBuffer = new byte [ PACKAGE_HEAD_LENGTH ] ;
891+ //包总长度
892+ var packageTotalLength = 0 ;
893+ //解密相关变量
882894 Pipe decryptPipe = null ;
883- Pipe decompressPipe = null ;
884-
885- while ( ! token . IsCancellationRequested )
895+ byte [ ] decryptBuffer1 = null ;
896+ byte [ ] decryptBuffer2 = null ;
897+ //Pipe decompressPipe = null;
898+ try
886899 {
887- var currentReader = recvReader ;
888- var ret = await currentReader . ReadAtLeastAsync ( PACKAGE_HEAD_LENGTH , token ) . ConfigureAwait ( false ) ;
889- if ( ret . IsCanceled )
890- return ;
891-
892- ret . Buffer . Slice ( 0 , PACKAGE_TOTAL_LENGTH_LENGTH ) . CopyTo ( packageHeadBuffer ) ;
893- currentReader . AdvanceTo ( ret . Buffer . Start ) ;
900+ while ( ! token . IsCancellationRequested )
901+ {
902+ var currentReader = recvReader ;
903+ var ret = await currentReader . ReadAtLeastAsync ( PACKAGE_HEAD_LENGTH , token ) . ConfigureAwait ( false ) ;
904+ if ( ret . IsCanceled )
905+ return ;
906+ //解析包总长度
907+ packageTotalLength = parsePackageTotalLength ( ret . Buffer , packageHeadBuffer ) ;
908+ currentReader . AdvanceTo ( ret . Buffer . Start ) ;
894909
895- //包总长度
896- var packageTotalLength = ByteUtils . B2I_BE ( packageHeadBuffer , 0 ) ;
897- if ( packageTotalLength < PACKAGE_HEAD_LENGTH )
898- throw new ProtocolException ( new ReadOnlySequence < byte > ( packageHeadBuffer ) , $ "包长度[{ packageTotalLength } ]必须大于等于{ PACKAGE_HEAD_LENGTH } !") ;
899- if ( packageTotalLength > BufferSize )
900- throw new ProtocolException ( new ReadOnlySequence < byte > ( packageHeadBuffer ) , $ "数据包总长度[{ packageTotalLength } ]大于缓存大小[{ BufferSize } ]") ;
901-
902- //读取完整包
903- ret = await recvReader . ReadAtLeastAsync ( packageTotalLength , token ) . ConfigureAwait ( false ) ;
904- if ( ret . IsCanceled )
905- return ;
906- var packageBuffer = ret . Buffer . Slice ( 0 , packageTotalLength ) ;
910+ //读取完整包
911+ ret = await recvReader . ReadAtLeastAsync ( packageTotalLength , token ) . ConfigureAwait ( false ) ;
912+ if ( ret . IsCanceled )
913+ return ;
914+ if ( ret . Buffer . Length < packageTotalLength )
915+ throw new ProtocolException ( ret . Buffer , $ "包读取错误!包总长度:{ packageTotalLength } ,读取数据长度:{ ret . Buffer . Length } ") ;
916+ var packageBuffer = ret . Buffer . Slice ( 0 , packageTotalLength ) ;
907917
908- //如果设置了压缩或者加密
909- if ( options . InternalCompress || options . InternalEncrypt )
910- {
911- /*
912- //如果设置了加密,则先解密
913- if (options.InternalEncrypt)
918+ //如果设置了压缩或者加密
919+ if ( options . InternalCompress || options . InternalEncrypt )
914920 {
915- if(decryptPipe==null)
916- decryptPipe=new Pipe();
921+ //如果设置了加密,则先解密
922+ if ( options . InternalEncrypt )
923+ {
924+ if ( decryptPipe == null )
925+ {
926+ decryptPipe = new Pipe ( ) ;
927+ decryptBuffer1 = new byte [ 1024 ] ;
928+ decryptBuffer2 = new byte [ 1024 ] ;
929+ }
930+
931+ //写入包头
932+ decryptPipe . Writer . GetMemory ( PACKAGE_TOTAL_LENGTH_LENGTH ) ;
933+ decryptPipe . Writer . Advance ( PACKAGE_TOTAL_LENGTH_LENGTH ) ;
934+ packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH ;
935+
936+ //开始解密
937+ var encryptedBuffer = packageBuffer . Slice ( PACKAGE_TOTAL_LENGTH_LENGTH ) ;
938+ while ( encryptedBuffer . Length > 0 )
939+ {
940+ var inLength = Math . Min ( decryptBuffer1 . Length , ( int ) encryptedBuffer . Length ) ;
941+ encryptedBuffer . Slice ( 0 , inLength ) . CopyTo ( decryptBuffer1 ) ;
942+ var outLength = dec . TransformBlock ( decryptBuffer1 , 0 , inLength , decryptBuffer2 , 0 ) ;
943+ decryptBuffer2 . CopyTo ( decryptPipe . Writer . GetMemory ( outLength ) ) ;
944+ decryptPipe . Writer . Advance ( outLength ) ;
945+ encryptedBuffer = encryptedBuffer . Slice ( inLength ) ;
946+ packageTotalLength += outLength ;
947+ }
948+ {
949+ var finalData = dec . TransformFinalBlock ( decryptBuffer1 , 0 , 0 ) ;
950+ finalData . CopyTo ( decryptPipe . Writer . GetMemory ( finalData . Length ) ) ;
951+ decryptPipe . Writer . Advance ( finalData . Length ) ;
952+ packageTotalLength += finalData . Length ;
953+ }
954+ await decryptPipe . Writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
955+ //解密完成,释放缓存
956+ currentReader . AdvanceTo ( packageBuffer . End ) ;
957+
958+ ret = await decryptPipe . Reader . ReadAtLeastAsync ( packageTotalLength , token ) . ConfigureAwait ( false ) ;
959+ packageBuffer = ret . Buffer ;
960+ currentReader = decryptPipe . Reader ;
961+ }
917962
918- var retBuffer = dec.TransformFinalBlock(currentPackageBuffer.Array, PACKAGE_TOTAL_LENGTH_LENGTH + currentPackageBuffer.Offset, currentPackageBuffer.Count - PACKAGE_TOTAL_LENGTH_LENGTH);
919- var currentBuffer = getFreeBuffer(currentPackageBuffer.Array, recvBuffer, recvBuffer2);
920- packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + retBuffer.Length;
921- writePackageTotalLengthToBuffer(currentBuffer, 0, packageTotalLength);
922- Array.Copy(retBuffer, 0, currentBuffer, PACKAGE_TOTAL_LENGTH_LENGTH, retBuffer.Length);
923- currentPackageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
924- }
925- //如果设置了压缩,则先解压
926- if (options.InternalCompress)
927- {
928- var retBuffer = getFreeBuffer(currentPackageBuffer.Array, recvBuffer, recvBuffer2);
929- var count = 0;
930- using (var readMs = new MemoryStream(currentPackageBuffer.Array, PACKAGE_TOTAL_LENGTH_LENGTH + currentPackageBuffer.Offset, currentPackageBuffer.Count - PACKAGE_TOTAL_LENGTH_LENGTH, false))
931- using (var writeMs = new MemoryStream(retBuffer, 0, retBuffer.Length))
963+ /*
964+ //如果设置了压缩,则先解压
965+ if (options.InternalCompress)
932966 {
933- using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
934- gzStream.CopyTo(writeMs);
935- count = Convert.ToInt32(writeMs.Position);
967+ var retBuffer = getFreeBuffer(currentPackageBuffer.Array, recvBuffer, recvBuffer2);
968+ var count = 0;
969+ using (var readMs = new MemoryStream(currentPackageBuffer.Array, PACKAGE_TOTAL_LENGTH_LENGTH + currentPackageBuffer.Offset, currentPackageBuffer.Count - PACKAGE_TOTAL_LENGTH_LENGTH, false))
970+ using (var writeMs = new MemoryStream(retBuffer, 0, retBuffer.Length))
971+ {
972+ using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
973+ gzStream.CopyTo(writeMs);
974+ count = Convert.ToInt32(writeMs.Position);
975+ }
976+ var currentBuffer = getFreeBuffer(retBuffer, recvBuffer, recvBuffer2);
977+ packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + count;
978+ writePackageTotalLengthToBuffer(currentBuffer, 0, packageTotalLength);
979+ Array.Copy(retBuffer, 0, currentBuffer, PACKAGE_TOTAL_LENGTH_LENGTH, count);
980+ currentPackageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
936981 }
937- var currentBuffer = getFreeBuffer(retBuffer, recvBuffer, recvBuffer2);
938- packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + count;
939- writePackageTotalLengthToBuffer(currentBuffer, 0, packageTotalLength);
940- Array.Copy(retBuffer, 0, currentBuffer, PACKAGE_TOTAL_LENGTH_LENGTH, count);
941- currentPackageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
982+ */
942983 }
943- */
944- }
945- else
946- {
947-
948- }
984+ else
985+ {
949986
950- //包类型
951- packageBuffer . Slice ( 0 , PACKAGE_HEAD_LENGTH ) . CopyTo ( packageHeadBuffer ) ;
952- var packageType = ( QpPackageType ) packageHeadBuffer [ PACKAGE_TOTAL_LENGTH_LENGTH ] ;
987+ }
953988
954- if ( LogUtils . LogPackage )
955- LogUtils . Log (
956- "{0}: [Recv-Package]Length:{1},Type:{2},Content:{3}" ,
957- DateTime . Now ,
958- packageTotalLength ,
959- packageType ,
960- LogUtils . LogContent ?
961- BitConverter . ToString ( packageBuffer . ToArray ( ) )
962- : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
963- HandlePackage ( packageType , packageBuffer ) ;
964- currentReader . AdvanceTo ( packageBuffer . End ) ;
965- }
989+ //包类型
990+ packageBuffer . Slice ( 0 , PACKAGE_HEAD_LENGTH ) . CopyTo ( packageHeadBuffer ) ;
991+ var packageType = ( QpPackageType ) packageHeadBuffer [ PACKAGE_TOTAL_LENGTH_LENGTH ] ;
992+
993+ if ( LogUtils . LogPackage )
994+ LogUtils . Log (
995+ "{0}: [Recv-Package]Length:{1},Type:{2},Content:{3}" ,
996+ DateTime . Now ,
997+ packageTotalLength ,
998+ packageType ,
999+ LogUtils . LogContent ?
1000+ BitConverter . ToString ( packageBuffer . ToArray ( ) )
1001+ : LogUtils . NOT_SHOW_CONTENT_MESSAGE ) ;
1002+ HandlePackage ( packageType , packageBuffer ) ;
1003+ currentReader . AdvanceTo ( packageBuffer . End ) ;
1004+ }
1005+ }
1006+ catch ( Exception ex )
1007+ {
1008+ OnReadError ( ex ) ;
1009+ }
9661010 }
9671011
9681012 protected void HandlePackage ( QpPackageType packageType , ReadOnlySequence < byte > packageBuffer )
@@ -1000,7 +1044,10 @@ protected void HandlePackage(QpPackageType packageType, ReadOnlySequence<byte> p
10001044
10011045 var typeNameLength = bodyBuffer . First . Span [ 0 ] ;
10021046 bodyBuffer = bodyBuffer . Slice ( 1 ) ;
1003-
1047+ if ( bodyBuffer . Length < typeNameLength )
1048+ {
1049+ throw new IOException ( $ "bodyBuffer.Length:{ bodyBuffer . Length } < TypeNameLength: { typeNameLength } ,Content:{ encoding . GetString ( bodyBuffer ) } ") ;
1050+ }
10041051 var typeName = encoding . GetString ( bodyBuffer . Slice ( 0 , typeNameLength ) ) ;
10051052 bodyBuffer = bodyBuffer . Slice ( typeNameLength ) ;
10061053
@@ -1030,6 +1077,10 @@ protected void HandlePackage(QpPackageType packageType, ReadOnlySequence<byte> p
10301077 var typeNameLength = bodyBuffer . First . Span [ 0 ] ;
10311078 bodyBuffer = bodyBuffer . Slice ( 1 ) ;
10321079
1080+ if ( bodyBuffer . Length < typeNameLength )
1081+ {
1082+ throw new IOException ( $ "bodyBuffer.Length:{ bodyBuffer . Length } < TypeNameLength: { typeNameLength } ,Content:{ encoding . GetString ( bodyBuffer ) } ") ;
1083+ }
10331084 typeName = encoding . GetString ( bodyBuffer . Slice ( 0 , typeNameLength ) ) ;
10341085 bodyBuffer = bodyBuffer . Slice ( typeNameLength ) ;
10351086
0 commit comments