Skip to content

Commit d55a333

Browse files
authored
fix watch=true doesn't respect cancellation token (#376)
* fix watch=true doesn't respect cancellation token * use exp body where can * address comments
1 parent 3e6815a commit d55a333

File tree

2 files changed

+99
-39
lines changed

2 files changed

+99
-39
lines changed

src/KubernetesClient/WatcherDelegatingHandler.cs

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,61 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
2727

2828
if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
2929
{
30-
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
30+
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content, cancellationToken);
3131
}
3232
}
33+
3334
return originResponse;
3435
}
3536

37+
internal class CancelableStream : Stream
38+
{
39+
private readonly Stream _innerStream;
40+
private readonly CancellationToken _cancellationToken;
41+
42+
public CancelableStream(Stream innerStream, CancellationToken cancellationToken)
43+
{
44+
_innerStream = innerStream;
45+
_cancellationToken = cancellationToken;
46+
}
47+
48+
public override void Flush() => _innerStream.Flush();
49+
50+
public override int Read(byte[] buffer, int offset, int count) =>
51+
_innerStream.ReadAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
52+
53+
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
54+
55+
public override void SetLength(long value) => _innerStream.SetLength(value);
56+
57+
public override void Write(byte[] buffer, int offset, int count) =>
58+
_innerStream.WriteAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
59+
60+
public override bool CanRead => _innerStream.CanRead;
61+
62+
public override bool CanSeek => _innerStream.CanSeek;
63+
64+
public override bool CanWrite => _innerStream.CanWrite;
65+
66+
public override long Length => _innerStream.Length;
67+
68+
public override long Position
69+
{
70+
get => _innerStream.Position;
71+
set => _innerStream.Position = value;
72+
}
73+
}
74+
3675
internal class LineSeparatedHttpContent : HttpContent
3776
{
3877
private readonly HttpContent _originContent;
78+
private readonly CancellationToken _cancellationToken;
3979
private Stream _originStream;
4080

41-
public LineSeparatedHttpContent(HttpContent originContent)
81+
public LineSeparatedHttpContent(HttpContent originContent, CancellationToken cancellationToken)
4282
{
4383
_originContent = originContent;
84+
_cancellationToken = cancellationToken;
4485
}
4586

4687
internal PeekableStreamReader StreamReader { get; private set; }
@@ -49,17 +90,14 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
4990
{
5091
_originStream = await _originContent.ReadAsStreamAsync().ConfigureAwait(false);
5192

52-
StreamReader = new PeekableStreamReader(_originStream);
93+
StreamReader = new PeekableStreamReader(new CancelableStream(_originStream, _cancellationToken));
5394

5495
var firstLine = await StreamReader.PeekLineAsync().ConfigureAwait(false);
5596

5697
var writer = new StreamWriter(stream);
5798

58-
// using (writer) // leave open
59-
{
60-
await writer.WriteAsync(firstLine).ConfigureAwait(false);
61-
await writer.FlushAsync().ConfigureAwait(false);
62-
}
99+
await writer.WriteAsync(firstLine).ConfigureAwait(false);
100+
await writer.FlushAsync().ConfigureAwait(false);
63101
}
64102

65103
protected override bool TryComputeLength(out long length)
@@ -68,9 +106,11 @@ protected override bool TryComputeLength(out long length)
68106
return false;
69107
}
70108
}
109+
71110
internal class PeekableStreamReader : StreamReader
72111
{
73112
private Queue<string> _buffer;
113+
74114
public PeekableStreamReader(Stream stream) : base(stream)
75115
{
76116
_buffer = new Queue<string>();
@@ -82,52 +122,40 @@ public override string ReadLine()
82122
{
83123
return _buffer.Dequeue();
84124
}
125+
85126
return base.ReadLine();
86127
}
128+
87129
public override Task<string> ReadLineAsync()
88130
{
89131
if (_buffer.Count > 0)
90132
{
91133
return Task.FromResult(_buffer.Dequeue());
92134
}
135+
93136
return base.ReadLineAsync();
94137
}
138+
95139
public async Task<string> PeekLineAsync()
96140
{
97141
var line = await ReadLineAsync().ConfigureAwait(false);
98142
_buffer.Enqueue(line);
99143
return line;
100144
}
101145

102-
public override int Read()
103-
{
104-
throw new NotImplementedException();
105-
}
146+
public override int Read() => throw new NotImplementedException();
106147

107-
public override int Read(char[] buffer, int index, int count)
108-
{
109-
throw new NotImplementedException();
110-
}
111-
public override Task<int> ReadAsync(char[] buffer, int index, int count)
112-
{
113-
throw new NotImplementedException();
114-
}
115-
public override int ReadBlock(char[] buffer, int index, int count)
116-
{
117-
throw new NotImplementedException();
118-
}
119-
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
120-
{
121-
throw new NotImplementedException();
122-
}
123-
public override string ReadToEnd()
124-
{
125-
throw new NotImplementedException();
126-
}
127-
public override Task<string> ReadToEndAsync()
128-
{
129-
throw new NotImplementedException();
130-
}
148+
public override int Read(char[] buffer, int index, int count) => throw new NotImplementedException();
149+
150+
public override Task<int> ReadAsync(char[] buffer, int index, int count) => throw new NotImplementedException();
151+
152+
public override int ReadBlock(char[] buffer, int index, int count) => throw new NotImplementedException();
153+
154+
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count) => throw new NotImplementedException();
155+
156+
public override string ReadToEnd() => throw new NotImplementedException();
157+
158+
public override Task<string> ReadToEndAsync() => throw new NotImplementedException();
131159
}
132160
}
133161
}

tests/KubernetesClient.Tests/WatchTests.cs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,14 @@ public async Task CannotWatch()
6969
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default");
7070
var onErrorCalled = false;
7171

72-
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e => {
72+
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e =>
73+
{
7374
onErrorCalled = true;
7475
})) { }
7576

7677
await Task.Delay(TimeSpan.FromSeconds(1)); // delay for onerror to be called
7778
Assert.True(onErrorCalled);
78-
79+
7980

8081
// server did not response line by line
8182
await Assert.ThrowsAnyAsync<Exception>(() =>
@@ -109,7 +110,8 @@ public async Task AsyncWatcher()
109110

110111

111112
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
112-
using (listTask.Watch<V1Pod, V1PodList>((type, item) => {
113+
using (listTask.Watch<V1Pod, V1PodList>((type, item) =>
114+
{
113115
eventsReceived.Set();
114116
}))
115117
{
@@ -748,5 +750,35 @@ public async Task DirectWatchEventsWithTimeout()
748750
}
749751
}
750752

753+
[Fact]
754+
public async Task WatchShouldCancelAfterRequested()
755+
{
756+
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
757+
758+
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
759+
{
760+
httpContext.Response.StatusCode = 200;
761+
await httpContext.Response.Body.FlushAsync();
762+
await Task.Delay(TimeSpan.FromSeconds(5)); // The default timeout is 100 seconds
763+
764+
return true;
765+
}, resp: ""))
766+
{
767+
var client = new Kubernetes(new KubernetesClientConfiguration
768+
{
769+
Host = server.Uri.ToString()
770+
});
771+
772+
var cts = new CancellationTokenSource();
773+
cts.CancelAfter(TimeSpan.FromSeconds(2));
774+
775+
await Assert.ThrowsAnyAsync<TaskCanceledException>(async () =>
776+
{
777+
await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true, cancellationToken: cts.Token);
778+
});
779+
780+
}
781+
782+
}
751783
}
752784
}

0 commit comments

Comments
 (0)