Skip to content

Commit f06cfe6

Browse files
committed
修复解压和解密时内容太大卡住的问题。
1 parent 78826a6 commit f06cfe6

File tree

2 files changed

+38
-17
lines changed

2 files changed

+38
-17
lines changed

Quick.Protocol/QpChannel_Recv.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,14 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
235235
decryptPipe.Writer.Advance(finalData.Length);
236236
packageTotalLength += finalData.Length;
237237
}
238-
await decryptPipe.Writer.FlushAsync().ConfigureAwait(false);
238+
_ = Task.Run(async () =>
239+
{
240+
await decryptPipe.Writer.FlushAsync().ConfigureAwait(false);
241+
});
242+
ret = await decryptPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
239243
//解密完成,释放缓存
240244
currentReader.AdvanceTo(packageBuffer.End);
241245

242-
ret = await decryptPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
243246
packageBuffer = ret.Buffer;
244247
currentReader = decryptPipe.Reader;
245248
}
@@ -270,11 +273,13 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
270273
packageTotalLength += count;
271274
}
272275
}
273-
await decompressPipe.Writer.FlushAsync().ConfigureAwait(false);
276+
_ = Task.Run(async () =>
277+
{
278+
await decompressPipe.Writer.FlushAsync().ConfigureAwait(false);
279+
});
280+
ret = await decompressPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
274281
//解压完成,释放缓存
275282
currentReader.AdvanceTo(packageBuffer.End);
276-
277-
ret = await decompressPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
278283
packageBuffer = ret.Buffer;
279284
currentReader = decompressPipe.Reader;
280285
}

Quick.Protocol/QpChannel_Send.cs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
3737
Pipe currentPipe = null;
3838
var packageTotalLength = 0;
3939
Memory<byte> packageHeadMemory = default;
40+
4041
//如果压缩或者加密
4142
if (options.InternalCompress || options.InternalEncrypt)
4243
{
@@ -45,17 +46,16 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
4546
{
4647
using (var inStream = packageBodyBuffer.AsStream())
4748
using (var outStream = writeCompressPipe.Writer.AsStream(true))
48-
using (var gzStream = new GZipStream(outStream, CompressionMode.Compress, true))
49-
{
50-
gzStream.WriteByte((byte)packageType);
49+
using (var gzStream = new GZipStream(outStream, CompressionMode.Compress, true))
5150
await inStream.CopyToAsync(gzStream).ConfigureAwait(false);
52-
}
51+
5352
currentPipe = writeCompressPipe;
5453
var readRet = await writeCompressPipe.Reader.ReadAsync().ConfigureAwait(false);
5554
packageBodyBuffer = readRet.Buffer;
5655
var packageBodyLength = Convert.ToInt32(packageBodyBuffer.Length);
57-
//准备包头
56+
//包总长度
5857
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + packageBodyLength;
58+
//准备包头
5959
writePackageTotalLengthToBuffer(sendHeadBuffer, 0, packageTotalLength);
6060
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_TOTAL_LENGTH_LENGTH);
6161
}
@@ -75,16 +75,17 @@ private async Task writePackageBuffer(Stream stream, QpPackageType packageType,
7575
}
7676
else
7777
{
78+
//包总长度
79+
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH + Convert.ToInt32(packageBodyBuffer.Length);
7880
//准备包头
79-
packageTotalLength = PACKAGE_HEAD_LENGTH + Convert.ToInt32(packageBodyBuffer.Length);
8081
writePackageTotalLengthToBuffer(sendHeadBuffer, 0, packageTotalLength);
81-
sendHeadBuffer[PACKAGE_TOTAL_LENGTH_LENGTH] = (byte)packageType;
82-
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_HEAD_LENGTH);
82+
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_TOTAL_LENGTH_LENGTH);
8383
}
8484
//执行AfterSendHandler
8585
afterSendHandler?.Invoke();
8686
//写入包头
8787
await stream.WriteAsync(packageHeadMemory).ConfigureAwait(false);
88+
8889
//如果有包内容,写入包内容
8990
if (packageBodyBuffer.Length > 0)
9091
{
@@ -185,17 +186,24 @@ public Task SendHeartbeatPackage()
185186
{
186187
return writePackageAsync(writer =>
187188
{
188-
return Task.FromResult(new Tuple<QpPackageType, int>(QpPackageType.Heartbeat, 0));
189+
//写入包类型
190+
writer.GetSpan(1)[0] = (byte)QpPackageType.Heartbeat;
191+
writer.Advance(1);
192+
return Task.FromResult(new Tuple<QpPackageType, int>(QpPackageType.Heartbeat, 1));
189193
}, null);
190194
}
191195

192196
public Task SendNoticePackage(string noticePackageTypeName, string noticePackageContent)
193197
{
194198
return writePackageAsync(async writer =>
195199
{
200+
//写入包类型
201+
writer.GetSpan(1)[0] = (byte)QpPackageType.Notice;
202+
writer.Advance(1);
203+
196204
var typeName = noticePackageTypeName;
197205
var content = noticePackageContent;
198-
var bodyLength = 0;
206+
var bodyLength = 1;
199207
//写入类名和长度
200208
{
201209
var typeNameByteLength = encoding.GetByteCount(typeName);
@@ -237,7 +245,11 @@ public Task SendCommandRequestPackage(string commandId, string typeName, string
237245
{
238246
return writePackageAsync(async writer =>
239247
{
240-
var bodyLength = 0;
248+
//写入包类型
249+
writer.GetSpan(1)[0] = (byte)QpPackageType.CommandRequest;
250+
writer.Advance(1);
251+
252+
var bodyLength = 1;
241253
//写入指令编号
242254
{
243255
var commandIdLength = commandId.Length / 2;
@@ -279,7 +291,11 @@ public Task SendCommandResponsePackage(string commandId, byte code, string messa
279291
{
280292
return writePackageAsync(async writer =>
281293
{
282-
var bodyLength = 0;
294+
//写入包类型
295+
writer.GetSpan(1)[0] = (byte)QpPackageType.CommandResponse;
296+
writer.Advance(1);
297+
298+
var bodyLength = 1;
283299
//写入指令编号
284300
{
285301
var commandIdLength = commandId.Length / 2;

0 commit comments

Comments
 (0)