Skip to content

Commit 3e94068

Browse files
authored
Improve Watcher thread utilisation. (#386)
* Add Watcher constructor overload which accepts the more general TextReader. * Make CancelableStream delegate the async methods to the inner stream. * Make CancelableStream dispose of the inner stream. * Make PeekableStreamReader a decorator enabling direct delegation to the inner's asynchronous methods. * Make CancelableStream.Flush respect the cancellation token. * Rename CancellationTokenSourceSlim -> LinkedCancellationTokenSource. * Specify 7.3 language version.
1 parent ddb4b21 commit 3e94068

File tree

3 files changed

+107
-16
lines changed

3 files changed

+107
-16
lines changed

src/KubernetesClient/KubernetesClient.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<PackageIconUrl>https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png</PackageIconUrl>
1010
<PackageTags>kubernetes;docker;containers;</PackageTags>
1111

12+
<LangVersion>7.3</LangVersion>
1213
<TargetFrameworks>netstandard2.0;net452;netcoreapp2.1</TargetFrameworks>
1314
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netstandard2.0;netcoreapp2.1</TargetFrameworks>
1415
<RootNamespace>k8s</RootNamespace>

src/KubernetesClient/Watcher.cs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public class Watcher<T> : IDisposable
2929
public bool Watching { get; private set; }
3030

3131
private readonly CancellationTokenSource _cts;
32-
private readonly Func<Task<StreamReader>> _streamReaderCreator;
32+
private readonly Func<Task<TextReader>> _streamReaderCreator;
3333

34-
private StreamReader _streamReader;
34+
private TextReader _streamReader;
3535
private readonly Task _watcherLoop;
3636

3737
/// <summary>
@@ -50,6 +50,28 @@ public class Watcher<T> : IDisposable
5050
/// The action to invoke when the server closes the connection.
5151
/// </param>
5252
public Watcher(Func<Task<StreamReader>> streamReaderCreator, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
53+
: this(
54+
async () => (TextReader)await streamReaderCreator().ConfigureAwait(false),
55+
onEvent, onError, onClosed)
56+
{
57+
}
58+
59+
/// <summary>
60+
/// Initializes a new instance of the <see cref="Watcher{T}"/> class.
61+
/// </summary>
62+
/// <param name="streamReader">
63+
/// A <see cref="StreamReader"/> from which to read the events.
64+
/// </param>
65+
/// <param name="onEvent">
66+
/// The action to invoke when the server sends a new event.
67+
/// </param>
68+
/// <param name="onError">
69+
/// The action to invoke when an error occurs.
70+
/// </param>
71+
/// <param name="onClosed">
72+
/// The action to invoke when the server closes the connection.
73+
/// </param>
74+
public Watcher(Func<Task<TextReader>> streamReaderCreator, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
5375
{
5476
_streamReaderCreator = streamReaderCreator;
5577
OnEvent += onEvent;

src/KubernetesClient/WatcherDelegatingHandler.cs

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,43 @@ public CancelableStream(Stream innerStream, CancellationToken cancellationToken)
4545
_cancellationToken = cancellationToken;
4646
}
4747

48-
public override void Flush() => _innerStream.Flush();
48+
public override void Flush() =>
49+
_innerStream.FlushAsync(_cancellationToken).GetAwaiter().GetResult();
50+
51+
public override async Task FlushAsync(CancellationToken cancellationToken)
52+
{
53+
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
54+
{
55+
await _innerStream.FlushAsync(cancellationTokenSource.Token).ConfigureAwait(false);
56+
}
57+
}
4958

5059
public override int Read(byte[] buffer, int offset, int count) =>
5160
_innerStream.ReadAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
5261

62+
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
63+
{
64+
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
65+
{
66+
return await _innerStream.ReadAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false);
67+
}
68+
}
69+
5370
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
5471

5572
public override void SetLength(long value) => _innerStream.SetLength(value);
5673

5774
public override void Write(byte[] buffer, int offset, int count) =>
5875
_innerStream.WriteAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
5976

77+
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
78+
{
79+
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
80+
{
81+
await _innerStream.WriteAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false);
82+
}
83+
}
84+
6085
public override bool CanRead => _innerStream.CanRead;
6186

6287
public override bool CanSeek => _innerStream.CanSeek;
@@ -70,6 +95,46 @@ public override long Position
7095
get => _innerStream.Position;
7196
set => _innerStream.Position = value;
7297
}
98+
99+
protected override void Dispose(bool disposing)
100+
{
101+
if (disposing)
102+
{
103+
_innerStream.Dispose();
104+
}
105+
base.Dispose(disposing);
106+
}
107+
108+
private LinkedCancellationTokenSource CreateCancellationTokenSource(CancellationToken userCancellationToken)
109+
{
110+
return new LinkedCancellationTokenSource(_cancellationToken, userCancellationToken);
111+
}
112+
113+
private readonly struct LinkedCancellationTokenSource : IDisposable
114+
{
115+
private readonly CancellationTokenSource _cancellationTokenSource;
116+
117+
public LinkedCancellationTokenSource(CancellationToken token1, CancellationToken token2)
118+
{
119+
if (token1.CanBeCanceled && token2.CanBeCanceled)
120+
{
121+
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token1, token2);
122+
Token = _cancellationTokenSource.Token;
123+
}
124+
else
125+
{
126+
_cancellationTokenSource = null;
127+
Token = token1.CanBeCanceled ? token1 : token2;
128+
}
129+
}
130+
131+
public CancellationToken Token { get; }
132+
133+
public void Dispose()
134+
{
135+
_cancellationTokenSource?.Dispose();
136+
}
137+
}
73138
}
74139

75140
internal class LineSeparatedHttpContent : HttpContent
@@ -107,24 +172,18 @@ protected override bool TryComputeLength(out long length)
107172
}
108173
}
109174

110-
internal class PeekableStreamReader : StreamReader
175+
internal class PeekableStreamReader : TextReader
111176
{
112-
private Queue<string> _buffer;
177+
private readonly Queue<string> _buffer;
178+
private readonly StreamReader _inner;
113179

114-
public PeekableStreamReader(Stream stream) : base(stream)
180+
public PeekableStreamReader(Stream stream)
115181
{
116182
_buffer = new Queue<string>();
183+
_inner = new StreamReader(stream);
117184
}
118185

119-
public override string ReadLine()
120-
{
121-
if (_buffer.Count > 0)
122-
{
123-
return _buffer.Dequeue();
124-
}
125-
126-
return base.ReadLine();
127-
}
186+
public override string ReadLine() => throw new NotImplementedException();
128187

129188
public override Task<string> ReadLineAsync()
130189
{
@@ -133,7 +192,7 @@ public override Task<string> ReadLineAsync()
133192
return Task.FromResult(_buffer.Dequeue());
134193
}
135194

136-
return base.ReadLineAsync();
195+
return _inner.ReadLineAsync();
137196
}
138197

139198
public async Task<string> PeekLineAsync()
@@ -156,6 +215,15 @@ public async Task<string> PeekLineAsync()
156215
public override string ReadToEnd() => throw new NotImplementedException();
157216

158217
public override Task<string> ReadToEndAsync() => throw new NotImplementedException();
218+
219+
protected override void Dispose(bool disposing)
220+
{
221+
if (disposing)
222+
{
223+
_inner.Dispose();
224+
}
225+
base.Dispose(disposing);
226+
}
159227
}
160228
}
161229
}

0 commit comments

Comments
 (0)