Skip to content

Commit 364fd4b

Browse files
committed
发送压缩功能重构中。。。
1 parent 1c45aee commit 364fd4b

File tree

4 files changed

+190
-32
lines changed

4 files changed

+190
-32
lines changed

Quick.Protocol/QpChannel_Recv.cs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,26 +219,39 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
219219

220220
//开始解密
221221
var encryptedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
222+
var inLength = 0;
222223
while (encryptedBuffer.Length > 0)
223224
{
224-
var inLength = Math.Min(decryptBuffer1.Length, (int)encryptedBuffer.Length);
225+
inLength = Math.Min(decryptBuffer1.Length, (int)encryptedBuffer.Length);
225226
encryptedBuffer.Slice(0, inLength).CopyTo(decryptBuffer1);
227+
encryptedBuffer = encryptedBuffer.Slice(inLength);
228+
if (inLength < dec.InputBlockSize)
229+
{
230+
var v = dec.InputBlockSize - inLength;
231+
Array.Fill(decryptBuffer1, (byte)v, inLength, v);
232+
}
226233
var outLength = dec.TransformBlock(decryptBuffer1, 0, inLength, decryptBuffer2, 0);
227234
decryptBuffer2.CopyTo(decryptPipe.Writer.GetMemory(outLength));
228235
decryptPipe.Writer.Advance(outLength);
229-
encryptedBuffer = encryptedBuffer.Slice(inLength);
236+
230237
packageTotalLength += outLength;
231238
}
232239
{
233-
var finalData = dec.TransformFinalBlock(decryptBuffer1, 0, 0);
234-
finalData.CopyTo(decryptPipe.Writer.GetMemory(finalData.Length));
235-
decryptPipe.Writer.Advance(finalData.Length);
236-
packageTotalLength += finalData.Length;
240+
var finnalInLength = 0;
241+
if (inLength == dec.InputBlockSize)
242+
{
243+
encryptBuffer1[0] = (byte)enc.InputBlockSize;
244+
finnalInLength = 1;
245+
}
246+
var finalData = dec.TransformFinalBlock(decryptBuffer1, 0, finnalInLength);
247+
if (finalData.Length > 0)
248+
{
249+
finalData.CopyTo(decryptPipe.Writer.GetMemory(finalData.Length));
250+
decryptPipe.Writer.Advance(finalData.Length);
251+
packageTotalLength += finalData.Length;
252+
}
237253
}
238-
_ = Task.Run(async () =>
239-
{
240-
await decryptPipe.Writer.FlushAsync().ConfigureAwait(false);
241-
});
254+
_ = decryptPipe.Writer.FlushAsync();
242255
ret = await decryptPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
243256
//解密完成,释放缓存
244257
currentReader.AdvanceTo(packageBuffer.End);

Quick.Protocol/QpChannel_Send.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,14 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
5757
using (var inStream = packageBodyBuffer.AsStream())
5858
using (var outStream = writeCompressPipe.Writer.AsStream(true))
5959
using (var gzStream = new GZipStream(outStream, CompressionMode.Compress, true))
60+
{
6061
await inStream.CopyToAsync(gzStream).ConfigureAwait(false);
61-
62+
}
6263
currentReader.AdvanceTo(packageBodyBuffer.End);
63-
64+
6465
readRet = await writeCompressPipe.Reader.ReadAsync().ConfigureAwait(false);
6566
packageBodyBuffer = readRet.Buffer;
67+
packageTotalLength += Convert.ToInt32(packageBodyBuffer.Length);
6668

6769
//包总长度
6870
packageTotalLength += PACKAGE_TOTAL_LENGTH_LENGTH;
@@ -109,18 +111,15 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
109111
encryptBuffer1[0] = (byte)enc.InputBlockSize;
110112
finnalInLength = 1;
111113
}
112-
var finalData = dec.TransformFinalBlock(encryptBuffer1, 0, finnalInLength);
114+
var finalData = enc.TransformFinalBlock(encryptBuffer1, 0, finnalInLength);
113115
if (finalData.Length > 0)
114116
{
115117
finalData.CopyTo(encryptPipe.Writer.GetMemory(finalData.Length));
116118
encryptPipe.Writer.Advance(finalData.Length);
117119
packageTotalLength += finalData.Length;
118120
}
119121
}
120-
_ = Task.Run(async () =>
121-
{
122-
await encryptPipe.Writer.FlushAsync().ConfigureAwait(false);
123-
});
122+
_ = encryptPipe.Writer.FlushAsync();
124123

125124
readRet = await encryptPipe.Reader.ReadAtLeastAsync(packageTotalLength).ConfigureAwait(false);
126125
currentReader.AdvanceTo(packageBodyBuffer.End);
@@ -175,9 +174,9 @@ await writeTask
175174
"{0}: [Send-Package]Length:{1},Type:{2},Content:{3}",
176175
DateTime.Now,
177176
packageTotalLength,
178-
(QpPackageType)packageType,
177+
packageType,
179178
LogUtils.LogContent ?
180-
BitConverter.ToString(sendHeadBuffer.Concat(packageBodyBuffer.ToArray()).ToArray())
179+
BitConverter.ToString(packageHeadMemory.ToArray().Concat(packageBodyBuffer.ToArray()).ToArray())
181180
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
182181
currentReader.AdvanceTo(packageBodyBuffer.End);
183182
await stream.FlushAsync().ConfigureAwait(false);
@@ -224,10 +223,11 @@ await UseSendPipe(async pipe =>
224223
//写入包类型
225224
writer.GetSpan(1)[0] = (byte)QpPackageType.Heartbeat;
226225
writer.Advance(1);
226+
var bodyLength = 1;
227227
_ = writer.FlushAsync();
228228
await writePackageBuffer(pipe.Reader,
229229
QpPackageType.Heartbeat,
230-
1).ConfigureAwait(false);
230+
bodyLength).ConfigureAwait(false);
231231
});
232232
}
233233

@@ -273,8 +273,8 @@ await UseSendPipe(async pipe =>
273273
if (LogUtils.LogNotice)
274274
LogUtils.Log("{0}: [Send-NoticePackage]Type:{1},Content:{2}", DateTime.Now, typeName, LogUtils.LogContent ? content : LogUtils.NOT_SHOW_CONTENT_MESSAGE);
275275
await writePackageBuffer(pipe.Reader,
276-
QpPackageType.Heartbeat,
277-
1).ConfigureAwait(false);
276+
QpPackageType.Notice,
277+
bodyLength).ConfigureAwait(false);
278278
});
279279
}
280280

@@ -327,8 +327,8 @@ await UseSendPipe(async pipe =>
327327
LogUtils.Log("{0}: [Send-CommandRequestPackage]CommandId:{1},Type:{2},Content:{3}", DateTime.Now, commandId, typeName, LogUtils.LogContent ? content : LogUtils.NOT_SHOW_CONTENT_MESSAGE);
328328

329329
await writePackageBuffer(pipe.Reader,
330-
QpPackageType.Heartbeat,
331-
1,
330+
QpPackageType.CommandRequest,
331+
bodyLength,
332332
ignoreCompressAndEncrypt).ConfigureAwait(false);
333333
});
334334
}
@@ -400,8 +400,8 @@ await UseSendPipe(async pipe =>
400400
}
401401
_ = writer.FlushAsync();
402402
await writePackageBuffer(pipe.Reader,
403-
QpPackageType.Heartbeat,
404-
1).ConfigureAwait(false);
403+
QpPackageType.CommandResponse,
404+
bodyLength).ConfigureAwait(false);
405405
});
406406
}
407407
}

Quick.Protocol/QpClient.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,21 @@ public async Task ConnectAsync()
4747
InstructionIds = Options.InstructionSet.Select(t => t.Id).ToArray()
4848
}).ConfigureAwait(false);
4949
AuthenticateQuestion = repConnect.Question;
50-
51-
Options.OnAuthPassed();
52-
IsConnected = true;
53-
50+
5451
var repAuth = await SendCommand(new Commands.Authenticate.Request()
5552
{
5653
Answer = CryptographyUtils.ComputeMD5Hash(AuthenticateQuestion + Options.Password)
57-
}, 30000, true).ConfigureAwait(false);
54+
}).ConfigureAwait(false);
55+
56+
Options.OnAuthPassed();
57+
IsConnected = true;
5858

5959
var repHandShake = await SendCommand(new Commands.HandShake.Request()
6060
{
6161
EnableCompress = Options.EnableCompress,
6262
EnableEncrypt = Options.EnableEncrypt,
6363
TransportTimeout = Options.TransportTimeout
64-
}, 5000).ConfigureAwait(false);
64+
}, 5000, true).ConfigureAwait(false);
6565

6666
//开始心跳
6767
if (Options.HeartBeatInterval > 0)
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO.Pipelines;
4+
using System.IO;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
namespace Quick.Protocol.Streams
11+
{
12+
internal sealed class PipeWriterStream : Stream
13+
{
14+
private readonly PipeWriter _pipeWriter;
15+
16+
internal bool LeaveOpen { get; set; }
17+
18+
public override bool CanRead => false;
19+
20+
public override bool CanSeek => false;
21+
22+
public override bool CanWrite => true;
23+
private long _Length = 0;
24+
public override long Length => _Length;
25+
26+
public override long Position
27+
{
28+
get
29+
{
30+
throw new NotSupportedException();
31+
}
32+
set
33+
{
34+
throw new NotSupportedException();
35+
}
36+
}
37+
38+
public PipeWriterStream(PipeWriter pipeWriter, bool leaveOpen)
39+
{
40+
_pipeWriter = pipeWriter;
41+
LeaveOpen = leaveOpen;
42+
}
43+
44+
protected override void Dispose(bool disposing)
45+
{
46+
if (!LeaveOpen)
47+
{
48+
_pipeWriter.Complete();
49+
}
50+
}
51+
52+
public override ValueTask DisposeAsync()
53+
{
54+
if (!LeaveOpen)
55+
{
56+
return _pipeWriter.CompleteAsync();
57+
}
58+
59+
return default(ValueTask);
60+
}
61+
62+
public override void Flush()
63+
{
64+
FlushAsync().GetAwaiter().GetResult();
65+
}
66+
67+
public override int Read(byte[] buffer, int offset, int count)
68+
{
69+
throw new NotSupportedException();
70+
}
71+
72+
public override long Seek(long offset, SeekOrigin origin)
73+
{
74+
throw new NotSupportedException();
75+
}
76+
77+
public override void SetLength(long value)
78+
{
79+
throw new NotSupportedException();
80+
}
81+
82+
public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
83+
{
84+
throw new NotSupportedException();
85+
}
86+
87+
public sealed override void EndWrite(IAsyncResult asyncResult)
88+
{
89+
throw new NotSupportedException();
90+
}
91+
92+
public override void Write(byte[] buffer, int offset, int count)
93+
{
94+
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
95+
_Length += count;
96+
}
97+
98+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
99+
{
100+
if (buffer == null)
101+
{
102+
throw new ArgumentNullException(nameof(buffer));
103+
}
104+
105+
ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
106+
_Length += count;
107+
return GetFlushResultAsTask(valueTask);
108+
}
109+
110+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
111+
{
112+
ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(buffer, cancellationToken);
113+
_Length += buffer.Length;
114+
return new ValueTask(GetFlushResultAsTask(valueTask));
115+
}
116+
117+
public override Task FlushAsync(CancellationToken cancellationToken)
118+
{
119+
ValueTask<FlushResult> valueTask = _pipeWriter.FlushAsync(cancellationToken);
120+
return GetFlushResultAsTask(valueTask);
121+
}
122+
123+
private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
124+
{
125+
if (valueTask.IsCompletedSuccessfully)
126+
{
127+
if (valueTask.Result.IsCanceled)
128+
{
129+
throw new OperationCanceledException();
130+
}
131+
132+
return Task.CompletedTask;
133+
}
134+
135+
return AwaitTask(valueTask);
136+
static async Task AwaitTask(ValueTask<FlushResult> valueTask)
137+
{
138+
if ((await valueTask.ConfigureAwait(continueOnCapturedContext: false)).IsCanceled)
139+
{
140+
throw new OperationCanceledException();
141+
}
142+
}
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)