Skip to content

Commit a95400b

Browse files
tg123k8s-ci-robot
authored andcommitted
watcher to support empty response (#313)
* Add a WatchAsync method. * proposal for watcher async * fix and add testcase for async watch * try fix build
1 parent 8ab95b6 commit a95400b

File tree

5 files changed

+82
-30
lines changed

5 files changed

+82
-30
lines changed

examples/watch/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ private static void Main(string[] args)
1313

1414
IKubernetes client = new Kubernetes(config);
1515

16-
var podlistResp = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
17-
using (podlistResp.Watch<V1Pod>((type, item) =>
16+
var podlistResp = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
17+
using (podlistResp.Watch<V1Pod, V1PodList>((type, item) =>
1818
{
1919
Console.WriteLine("==on watch event==");
2020
Console.WriteLine(type);

src/KubernetesClient/Kubernetes.Watch.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,12 @@ public partial class Kubernetes
149149
throw ex;
150150
}
151151

152-
var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
153-
StreamReader reader = new StreamReader(stream);
152+
return new Watcher<T>(async () => {
153+
var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
154+
StreamReader reader = new StreamReader(stream);
154155

155-
return new Watcher<T>(reader, onEvent, onError, onClosed);
156+
return reader;
157+
}, onEvent, onError, onClosed);
156158
}
157159
}
158160
}

src/KubernetesClient/Watcher.cs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ public class Watcher<T> : IDisposable
2929
public bool Watching { get; private set; }
3030

3131
private readonly CancellationTokenSource _cts;
32-
private readonly StreamReader _streamReader;
32+
private readonly Func<Task<StreamReader>> _streamReaderCreator;
33+
34+
private StreamReader _streamReader;
3335
private readonly Task _watcherLoop;
3436

3537
/// <summary>
@@ -47,9 +49,9 @@ public class Watcher<T> : IDisposable
4749
/// <param name="onClosed">
4850
/// The action to invoke when the server closes the connection.
4951
/// </param>
50-
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
52+
public Watcher(Func<Task<StreamReader>> streamReaderCreator, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
5153
{
52-
_streamReader = streamReader;
54+
_streamReaderCreator = streamReaderCreator;
5355
OnEvent += onEvent;
5456
OnError += onError;
5557
OnClosed += onClosed;
@@ -62,7 +64,7 @@ public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Act
6264
public void Dispose()
6365
{
6466
_cts.Cancel();
65-
_streamReader.Dispose();
67+
_streamReader?.Dispose();
6668
}
6769

6870
/// <summary>
@@ -96,9 +98,10 @@ private async Task WatcherLoop(CancellationToken cancellationToken)
9698
{
9799
Watching = true;
98100
string line;
101+
_streamReader = await _streamReaderCreator();
99102

100103
// ReadLineAsync will return null when we've reached the end of the stream.
101-
while ((line = await this._streamReader.ReadLineAsync().ConfigureAwait(false)) != null)
104+
while ((line = await _streamReader.ReadLineAsync().ConfigureAwait(false)) != null)
102105
{
103106
if (cancellationToken.IsCancellationRequested)
104107
{
@@ -147,43 +150,49 @@ public static class WatcherExt
147150
/// create a watch object from a call to api server with watch=true
148151
/// </summary>
149152
/// <typeparam name="T">type of the event object</typeparam>
153+
/// <typeparam name="L">type of the HttpOperationResponse object</typeparam>
150154
/// <param name="response">the api response</param>
151155
/// <param name="onEvent">a callback when any event raised from api server</param>
152156
/// <param name="onError">a callbak when any exception was caught during watching</param>
153157
/// <param name="onClosed">
154158
/// The action to invoke when the server closes the connection.
155159
/// </param>
156160
/// <returns>a watch object</returns>
157-
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
161+
public static Watcher<T> Watch<T, L>(this Task<HttpOperationResponse<L>> responseTask,
158162
Action<WatchEventType, T> onEvent,
159163
Action<Exception> onError = null,
160164
Action onClosed = null)
161165
{
162-
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
163-
{
164-
throw new KubernetesClientException("not a watchable request or failed response");
165-
}
166+
return new Watcher<T>(async () => {
167+
var response = await responseTask;
168+
169+
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
170+
{
171+
throw new KubernetesClientException("not a watchable request or failed response");
172+
}
166173

167-
return new Watcher<T>(content.StreamReader, onEvent, onError, onClosed);
174+
return content.StreamReader;
175+
} , onEvent, onError, onClosed);
168176
}
169177

170178
/// <summary>
171179
/// create a watch object from a call to api server with watch=true
172180
/// </summary>
173181
/// <typeparam name="T">type of the event object</typeparam>
182+
/// <typeparam name="L">type of the HttpOperationResponse object</typeparam>
174183
/// <param name="response">the api response</param>
175184
/// <param name="onEvent">a callback when any event raised from api server</param>
176185
/// <param name="onError">a callbak when any exception was caught during watching</param>
177186
/// <param name="onClosed">
178187
/// The action to invoke when the server closes the connection.
179188
/// </param>
180189
/// <returns>a watch object</returns>
181-
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
190+
public static Watcher<T> Watch<T, L>(this HttpOperationResponse<L> response,
182191
Action<WatchEventType, T> onEvent,
183192
Action<Exception> onError = null,
184193
Action onClosed = null)
185194
{
186-
return Watch((HttpOperationResponse)response, onEvent, onError, onClosed);
195+
return Watch(Task.FromResult(response), onEvent, onError, onClosed);
187196
}
188197
}
189198
}

tests/KubernetesClient.Tests/WatchTests.cs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,16 @@ public async Task CannotWatch()
6666
});
6767

6868
// did not pass watch param
69-
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default");
70-
Assert.ThrowsAny<KubernetesClientException>(() =>
71-
{
72-
listTask.Watch<V1Pod>((type, item) => { });
73-
});
69+
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default");
70+
var onErrorCalled = false;
71+
72+
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e => {
73+
onErrorCalled = true;
74+
})) { }
75+
76+
await Task.Delay(TimeSpan.FromSeconds(1)); // delay for onerror to be called
77+
Assert.True(onErrorCalled);
78+
7479

7580
// server did not response line by line
7681
await Assert.ThrowsAnyAsync<Exception>(() =>
@@ -83,6 +88,41 @@ await Assert.ThrowsAnyAsync<Exception>(() =>
8388
}
8489
}
8590

91+
[Fact]
92+
public async Task AsyncWatcher()
93+
{
94+
var created = new AsyncManualResetEvent(false);
95+
var eventsReceived = new AsyncManualResetEvent(false);
96+
97+
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
98+
{
99+
// block until reponse watcher obj created
100+
await created.WaitAsync();
101+
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
102+
return false;
103+
}))
104+
{
105+
var client = new Kubernetes(new KubernetesClientConfiguration
106+
{
107+
Host = server.Uri.ToString()
108+
});
109+
110+
111+
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
112+
using (listTask.Watch<V1Pod, V1PodList>((type, item) => {
113+
eventsReceived.Set();
114+
}))
115+
{
116+
// here watcher is ready to use, but http server has not responsed yet.
117+
created.Set();
118+
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
119+
}
120+
121+
Assert.True(eventsReceived.IsSet);
122+
Assert.True(created.IsSet);
123+
}
124+
}
125+
86126
[Fact]
87127
public async Task SuriveBadLine()
88128
{
@@ -119,7 +159,7 @@ public async Task SuriveBadLine()
119159
var events = new HashSet<WatchEventType>();
120160
var errors = 0;
121161

122-
var watcher = listTask.Watch<V1Pod>(
162+
var watcher = listTask.Watch<V1Pod, V1PodList>(
123163
(type, item) =>
124164
{
125165
testOutput.WriteLine($"Watcher received '{type}' event.");
@@ -188,7 +228,7 @@ public async Task DisposeWatch()
188228

189229
var events = new HashSet<WatchEventType>();
190230

191-
var watcher = listTask.Watch<V1Pod>(
231+
var watcher = listTask.Watch<V1Pod, V1PodList>(
192232
(type, item) =>
193233
{
194234
events.Add(type);
@@ -256,7 +296,7 @@ public async Task WatchAllEvents()
256296
var events = new HashSet<WatchEventType>();
257297
var errors = 0;
258298

259-
var watcher = listTask.Watch<V1Pod>(
299+
var watcher = listTask.Watch<V1Pod, V1PodList>(
260300
(type, item) =>
261301
{
262302
testOutput.WriteLine($"Watcher received '{type}' event.");
@@ -330,7 +370,7 @@ public async Task WatchEventsWithTimeout()
330370
var events = new HashSet<WatchEventType>();
331371
var errors = 0;
332372

333-
var watcher = listTask.Watch<V1Pod>(
373+
var watcher = listTask.Watch<V1Pod, V1PodList>(
334374
(type, item) =>
335375
{
336376
testOutput.WriteLine($"Watcher received '{type}' event.");
@@ -396,7 +436,7 @@ public async Task WatchServerDisconnect()
396436

397437
waitForException.Set();
398438
Watcher<V1Pod> watcher;
399-
watcher = listTask.Watch<V1Pod>(
439+
watcher = listTask.Watch<V1Pod, V1PodList>(
400440
onEvent: (type, item) => { },
401441
onError: e =>
402442
{
@@ -463,7 +503,7 @@ public async Task TestWatchWithHandlers()
463503

464504
var events = new HashSet<WatchEventType>();
465505

466-
var watcher = listTask.Watch<V1Pod>(
506+
var watcher = listTask.Watch<V1Pod, V1PodList>(
467507
(type, item) =>
468508
{
469509
events.Add(type);

tests/KubernetesClient.Tests/WatcherTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO;
44
using System.Text;
55
using System.Threading;
6+
using System.Threading.Tasks;
67
using Xunit;
78

89
namespace k8s.Tests
@@ -21,7 +22,7 @@ public void ReadError()
2122
ManualResetEvent mre = new ManualResetEvent(false);
2223

2324
Watcher<V1Pod> watcher = new Watcher<V1Pod>(
24-
reader,
25+
() => Task.FromResult(reader),
2526
null,
2627
(exception) =>
2728
{

0 commit comments

Comments
 (0)