Skip to content

Commit a0992c1

Browse files
committed
发送压缩功能重构完成。
1 parent 7515bb4 commit a0992c1

File tree

1 file changed

+42
-26
lines changed

1 file changed

+42
-26
lines changed

Quick.Protocol/QpChannel.cs

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -275,29 +275,36 @@ protected virtual void OnReadError(Exception exception)
275275
Disconnect();
276276
}
277277

278+
private Pipe writeCompressPipe = new Pipe();
279+
278280
private async Task writePackageBuffer(Stream stream, QpPackageType packageType, ReadOnlySequence<byte> packageBodyBuffer, Action afterSendHandler)
279281
{
280-
/*
282+
Pipe currentPipe = null;
283+
var packageTotalLength = 0;
284+
Memory<byte> packageHeadMemory = default;
281285
//如果压缩或者加密
282286
if (options.InternalCompress || options.InternalEncrypt)
283287
{
284288
//如果压缩
285289
if (options.InternalCompress)
286290
{
287-
var currentBuffer = getFreeBuffer(packageBuffer.Array, sendBuffer, sendBuffer2);
288-
using (var ms = new MemoryStream(currentBuffer))
291+
using (var inStream = packageBodyBuffer.AsStream())
292+
using (var outStream = writeCompressPipe.Writer.AsStream(true))
293+
using (var gzStream = new GZipStream(outStream, CompressionMode.Compress, true))
289294
{
290-
//写入包长度
291-
for (var i = 0; i < PACKAGE_TOTAL_LENGTH_LENGTH; i++)
292-
ms.WriteByte(0);
293-
using (var gzStream = new GZipStream(ms, CompressionMode.Compress, true))
294-
await gzStream.WriteAsync(packageBuffer.Array, packageBuffer.Offset + PACKAGE_TOTAL_LENGTH_LENGTH, packageBuffer.Count - PACKAGE_TOTAL_LENGTH_LENGTH)
295-
.ConfigureAwait(false);
296-
var packageTotalLength = Convert.ToInt32(ms.Position);
297-
writePackageTotalLengthToBuffer(currentBuffer, 0, packageTotalLength);
298-
packageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
295+
gzStream.WriteByte((byte)packageType);
296+
await inStream.CopyToAsync(gzStream).ConfigureAwait(false);
299297
}
298+
currentPipe = writeCompressPipe;
299+
var readRet = await writeCompressPipe.Reader.ReadAsync().ConfigureAwait(false);
300+
packageBodyBuffer = readRet.Buffer;
301+
var packageBodyLength = Convert.ToInt32(packageBodyBuffer.Length);
302+
//准备包头
303+
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + packageBodyLength;
304+
writePackageTotalLengthToBuffer(sendHeadBuffer, 0, packageTotalLength);
305+
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_TOTAL_LENGTH_LENGTH);
300306
}
307+
/*
301308
//如果加密
302309
if (options.InternalEncrypt)
303310
{
@@ -309,18 +316,20 @@ await gzStream.WriteAsync(packageBuffer.Array, packageBuffer.Offset + PACKAGE_TO
309316
Array.Copy(retBuffer, 0, currentBuffer, PACKAGE_TOTAL_LENGTH_LENGTH, retBuffer.Length);
310317
packageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
311318
}
319+
*/
320+
}
321+
else
322+
{
323+
//准备包头
324+
packageTotalLength = PACKAGE_HEAD_LENGTH + Convert.ToInt32(packageBodyBuffer.Length);
325+
writePackageTotalLengthToBuffer(sendHeadBuffer, 0, packageTotalLength);
326+
sendHeadBuffer[PACKAGE_TOTAL_LENGTH_LENGTH] = (byte)packageType;
327+
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_HEAD_LENGTH);
312328
}
313-
*/
314-
315329
//执行AfterSendHandler
316330
afterSendHandler?.Invoke();
317-
318331
//写入包头
319-
var packageTotalLength = PACKAGE_HEAD_LENGTH + Convert.ToInt32(packageBodyBuffer.Length);
320-
writePackageTotalLengthToBuffer(sendHeadBuffer, 0, packageTotalLength);
321-
sendHeadBuffer[PACKAGE_TOTAL_LENGTH_LENGTH] = (byte)packageType;
322-
await stream.WriteAsync(sendHeadBuffer).ConfigureAwait(false);
323-
332+
await stream.WriteAsync(packageHeadMemory).ConfigureAwait(false);
324333
//如果有包内容,写入包内容
325334
if (packageBodyBuffer.Length > 0)
326335
{
@@ -337,7 +346,7 @@ await writeTask
337346

338347
if (options.EnableNetstat)
339348
{
340-
BytesSent += packageBodyBuffer.Length;
349+
BytesSent += packageHeadMemory.Length + packageBodyBuffer.Length;
341350
if (BytesSent > LONG_HALF_MAX_VALUE)
342351
BytesSent = 0;
343352
}
@@ -350,6 +359,8 @@ await writeTask
350359
LogUtils.LogContent ?
351360
BitConverter.ToString(sendHeadBuffer.Concat(packageBodyBuffer.ToArray()).ToArray())
352361
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
362+
if (currentPipe != null)
363+
currentPipe.Reader.AdvanceTo(packageBodyBuffer.End);
353364
await stream.FlushAsync().ConfigureAwait(false);
354365
}
355366

@@ -911,13 +922,18 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
911922
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH;
912923

913924
//开始解压
914-
var compressedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
925+
var compressedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
915926
using (var readMs = compressedBuffer.AsStream())
916927
using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
917-
{
918-
var count = await gzStream.ReadAsync(decompressPipe.Writer.GetMemory(minimumBufferSize),token).ConfigureAwait(false);
919-
decompressPipe.Writer.Advance(count);
920-
packageTotalLength += count;
928+
{
929+
while (true)
930+
{
931+
var count = await gzStream.ReadAsync(decompressPipe.Writer.GetMemory(minimumBufferSize), token).ConfigureAwait(false);
932+
if (count <= 0)
933+
break;
934+
decompressPipe.Writer.Advance(count);
935+
packageTotalLength += count;
936+
}
921937
}
922938
await decompressPipe.Writer.FlushAsync().ConfigureAwait(false);
923939
//解压完成,释放缓存

0 commit comments

Comments
 (0)