Skip to content

Commit 9c4ff1e

Browse files
committed
supported set length
1 parent 91ff97d commit 9c4ff1e

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

src/CatLib.Core.Tests/Support/Stream/PipelineStreamTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public void TestPipeline()
2828
stream = new PipelineStream(256);
2929
ThreadPool.QueueUserWorkItem(WriteThread);
3030

31-
var wrote = false;
32-
stream.OnWrote += (_) =>
31+
var readed = false;
32+
stream.OnRead += (_) =>
3333
{
34-
wrote = true;
34+
readed = true;
3535
};
3636
var data = new byte[100];
3737
int read;
@@ -52,7 +52,7 @@ public void TestPipeline()
5252
}
5353

5454
Assert.AreEqual(expected.ToString(), actual.ToString());
55-
Assert.AreEqual(true, wrote);
55+
Assert.AreEqual(true, readed);
5656
}
5757

5858
public void WriteThread(object obj)

src/CatLib.Core/Support/Stream/PipelineStream.cs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public class PipelineStream : Stream
4242
private readonly RingBuffer ringBuffer;
4343

4444
/// <summary>
45-
/// 当写入完成后触发
45+
/// 当完成读取后触发
4646
/// </summary>
47-
public event Action<Stream> OnWrote;
47+
public event Action<Stream> OnRead;
4848

4949
/// <summary>
5050
/// 是否已经被释放了
@@ -56,11 +56,6 @@ public class PipelineStream : Stream
5656
/// </summary>
5757
private volatile bool closed;
5858

59-
/// <summary>
60-
/// 总流量
61-
/// </summary>
62-
public long TotalFlow { get; private set; }
63-
6459
/// <summary>
6560
/// 是否可以被读取
6661
/// </summary>
@@ -77,21 +72,31 @@ public override bool CanWrite
7772
get { return count < capacity && !closed; }
7873
}
7974

75+
/// <summary>
76+
/// 当前流的位置
77+
/// </summary>
78+
private long position;
79+
8080
/// <summary>
8181
/// 偏移位置(不支持)
8282
/// </summary>
8383
public override long Position
8484
{
85-
get { throw new NotSupportedException(); }
85+
get { return position; }
8686
set { throw new NotSupportedException(); }
8787
}
8888

89+
/// <summary>
90+
/// 流的长度
91+
/// </summary>
92+
private long length;
93+
8994
/// <summary>
9095
/// 流的长度
9196
/// </summary>
9297
public override long Length
9398
{
94-
get { throw new NotSupportedException(); }
99+
get { return length; }
95100
}
96101

97102
/// <summary>
@@ -119,7 +124,6 @@ public PipelineStream(int capacity = 4096, int sleep = 1)
119124
{
120125
this.capacity = capacity.ToPrime();
121126
this.sleep = Math.Max(0, sleep);
122-
TotalFlow = 0;
123127
ringBuffer = new RingBuffer(this.capacity, false);
124128
}
125129

@@ -148,7 +152,7 @@ public override long Seek(long offset, SeekOrigin origin)
148152
/// <param name="value">长度</param>
149153
public override void SetLength(long value)
150154
{
151-
throw new NotSupportedException();
155+
length = Math.Max(0, value);
152156
}
153157

154158
/// <summary>
@@ -198,7 +202,13 @@ public override int Read(byte[] buffer, int offset, int count)
198202
{
199203
var read = ringBuffer.Read(buffer, offset, count);
200204
this.count -= read;
201-
TotalFlow += read;
205+
position += read;
206+
207+
if (OnRead != null)
208+
{
209+
OnRead(this);
210+
}
211+
202212
return read;
203213
}
204214
finally
@@ -242,12 +252,6 @@ public override void Write(byte[] buffer, int offset, int count)
242252

243253
Guard.Requires<AssertException>(ringBuffer.Write(buffer, offset, count) == count);
244254
this.count += count;
245-
246-
if (OnWrote != null)
247-
{
248-
OnWrote(this);
249-
}
250-
251255
return;
252256
}
253257
}

0 commit comments

Comments
 (0)