Skip to content

Commit bc1cb62

Browse files
qmfrederikbrendandburns
authored andcommitted
Watcher: notify the caller when the server closes the connection (#184)
* Watcher: Add onClosed event and callback * Update the code generators to accept the onClose callback * Re-generate API * Update unit tests * Fix unit test * Watcher fixes/improvements: - Run Task.Yield at the beginning to make sure we're running async - Set Watching = false before invoking OnClosed, to make sure the callers see a consistent state.
1 parent c1543b5 commit bc1cb62

File tree

8 files changed

+601
-140
lines changed

8 files changed

+601
-140
lines changed

gen/KubernetesWatchGenerator/IKubernetes.Watch.cs.template

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ namespace k8s
3535
/// <param name="onError">
3636
/// The action to invoke when an error occurs.
3737
/// </param>
38+
/// <param name="onClosed">
39+
/// The action to invoke when the server closes the connection.
40+
/// </param>
3841
/// <param name="cancellationToken">
3942
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
4043
/// </param>
@@ -55,6 +58,7 @@ namespace k8s
5558
Dictionary<string, List<string>> customHeaders = null,
5659
Action<WatchEventType, {{GetClassName operation}}> onEvent = null,
5760
Action<Exception> onError = null,
61+
Action onClosed = null,
5862
CancellationToken cancellationToken = default(CancellationToken));
5963

6064
{{/.}}

gen/KubernetesWatchGenerator/Kubernetes.Watch.cs.template

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ namespace k8s
2424
Dictionary<string, List<string>> customHeaders = null,
2525
Action<WatchEventType, {{GetClassName operation}}> onEvent = null,
2626
Action<Exception> onError = null,
27+
Action onClosed = null,
2728
CancellationToken cancellationToken = default(CancellationToken))
2829
{
2930
string path = $"{{GetPathExpression .}}";
30-
return WatchObjectAsync<{{GetClassName operation}}>(path: path, @continue: @continue, fieldSelector: fieldSelector, includeUninitialized: includeUninitialized, labelSelector: labelSelector, limit: limit, pretty: pretty, timeoutSeconds: timeoutSeconds, resourceVersion: resourceVersion, customHeaders: customHeaders, onEvent: onEvent, onError: onError, cancellationToken: cancellationToken);
31+
return WatchObjectAsync<{{GetClassName operation}}>(path: path, @continue: @continue, fieldSelector: fieldSelector, includeUninitialized: includeUninitialized, labelSelector: labelSelector, limit: limit, pretty: pretty, timeoutSeconds: timeoutSeconds, resourceVersion: resourceVersion, customHeaders: customHeaders, onEvent: onEvent, onError: onError, onClosed: onClosed, cancellationToken: cancellationToken);
3132
}
3233

3334
{{/.}}

src/KubernetesClient/IKubernetes.Watch.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ public partial interface IKubernetes
4545
/// <param name="onError">
4646
/// The action to invoke when an error occurs.
4747
/// </param>
48+
/// <param name="onClosed">
49+
/// The action to invoke when the server closes the connection.
50+
/// </param>
4851
/// <param name="cancellationToken">
4952
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
5053
/// </param>
5154
/// <returns>
5255
/// A <see cref="Task"/> which represents the asynchronous operation, and returns a new watcher.
5356
/// </returns>
54-
Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, CancellationToken cancellationToken = default(CancellationToken));
57+
Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, Action onClosed = null, CancellationToken cancellationToken = default(CancellationToken));
5558
}
5659
}

src/KubernetesClient/Kubernetes.Watch.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace k8s
1313
public partial class Kubernetes
1414
{
1515
/// <inheritdoc/>
16-
public async Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, CancellationToken cancellationToken = default(CancellationToken))
16+
public async Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, Action onClosed = null, CancellationToken cancellationToken = default(CancellationToken))
1717
{
1818
// Tracing
1919
bool _shouldTrace = ServiceClientTracing.IsEnabled;
@@ -152,7 +152,7 @@ public partial class Kubernetes
152152
var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
153153
StreamReader reader = new StreamReader(stream);
154154

155-
return new Watcher<T>(reader, onEvent, onError);
155+
return new Watcher<T>(reader, onEvent, onError, onClosed);
156156
}
157157
}
158158
}

src/KubernetesClient/Watcher.cs

Lines changed: 98 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,68 +29,36 @@ public class Watcher<T> : IDisposable
2929
public bool Watching { get; private set; }
3030

3131
private readonly CancellationTokenSource _cts;
32-
private readonly StreamReader _streamReader;
33-
34-
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
32+
private readonly StreamReader _streamReader;
33+
private readonly Task _watcherLoop;
34+
35+
/// <summary>
36+
/// Initializes a new instance of the <see cref="Watcher{T}"/> class.
37+
/// </summary>
38+
/// <param name="streamReader">
39+
/// A <see cref="StreamReader"/> from which to read the events.
40+
/// </param>
41+
/// <param name="onEvent">
42+
/// The action to invoke when the server sends a new event.
43+
/// </param>
44+
/// <param name="onError">
45+
/// The action to invoke when an error occurs.
46+
/// </param>
47+
/// <param name="onClosed">
48+
/// The action to invoke when the server closes the connection.
49+
/// </param>
50+
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
3551
{
3652
_streamReader = streamReader;
3753
OnEvent += onEvent;
38-
OnError += onError;
39-
40-
_cts = new CancellationTokenSource();
41-
42-
var token = _cts.Token;
43-
44-
Task.Run(async () =>
45-
{
46-
try
47-
{
48-
Watching = true;
49-
50-
while (!streamReader.EndOfStream)
51-
{
52-
if (token.IsCancellationRequested)
53-
{
54-
return;
55-
}
56-
57-
var line = await streamReader.ReadLineAsync();
54+
OnError += onError;
55+
OnClosed += onClosed;
5856

59-
try
60-
{
61-
var genericEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<KubernetesObject>.WatchEvent>(line);
62-
63-
if (genericEvent.Object.Kind == "Status")
64-
{
65-
var statusEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<V1Status>.WatchEvent>(line);
66-
var exception = new KubernetesException(statusEvent.Object);
67-
this.OnError?.Invoke(exception);
68-
}
69-
else
70-
{
71-
var @event = SafeJsonConvert.DeserializeObject<k8s.Watcher<T>.WatchEvent>(line);
72-
this.OnEvent?.Invoke(@event.Type, @event.Object);
73-
}
74-
}
75-
catch (Exception e)
76-
{
77-
// error if deserialized failed or onevent throws
78-
OnError?.Invoke(e);
79-
}
80-
}
81-
}
82-
catch (Exception e)
83-
{
84-
// error when transport error, IOException ect
85-
OnError?.Invoke(e);
86-
}
87-
finally
88-
{
89-
Watching = false;
90-
}
91-
}, token);
57+
_cts = new CancellationTokenSource();
58+
_watcherLoop = this.WatcherLoop(_cts.Token);
9259
}
93-
60+
61+
/// <inheritdoc/>
9462
public void Dispose()
9563
{
9664
_cts.Cancel();
@@ -105,13 +73,71 @@ public void Dispose()
10573
/// <summary>
10674
/// add/remove callbacks when any exception was caught during watching
10775
/// </summary>
108-
public event Action<Exception> OnError;
76+
public event Action<Exception> OnError;
77+
78+
/// <summary>
79+
/// The event which is raised when the server closes th econnection.
80+
/// </summary>
81+
public event Action OnClosed;
10982

11083
public class WatchEvent
11184
{
11285
public WatchEventType Type { get; set; }
11386

11487
public T Object { get; set; }
88+
}
89+
90+
private async Task WatcherLoop(CancellationToken cancellationToken)
91+
{
92+
// Make sure we run async
93+
await Task.Yield();
94+
95+
try
96+
{
97+
Watching = true;
98+
string line;
99+
100+
// ReadLineAsync will return null when we've reached the end of the stream.
101+
while ((line = await this._streamReader.ReadLineAsync().ConfigureAwait(false)) != null)
102+
{
103+
if (cancellationToken.IsCancellationRequested)
104+
{
105+
return;
106+
}
107+
108+
try
109+
{
110+
var genericEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<KubernetesObject>.WatchEvent>(line);
111+
112+
if (genericEvent.Object.Kind == "Status")
113+
{
114+
var statusEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<V1Status>.WatchEvent>(line);
115+
var exception = new KubernetesException(statusEvent.Object);
116+
this.OnError?.Invoke(exception);
117+
}
118+
else
119+
{
120+
var @event = SafeJsonConvert.DeserializeObject<k8s.Watcher<T>.WatchEvent>(line);
121+
this.OnEvent?.Invoke(@event.Type, @event.Object);
122+
}
123+
}
124+
catch (Exception e)
125+
{
126+
// error if deserialized failed or onevent throws
127+
OnError?.Invoke(e);
128+
}
129+
}
130+
}
131+
catch (Exception e)
132+
{
133+
// error when transport error, IOException ect
134+
OnError?.Invoke(e);
135+
}
136+
finally
137+
{
138+
Watching = false;
139+
OnClosed?.Invoke();
140+
}
115141
}
116142
}
117143

@@ -123,18 +149,22 @@ public static class WatcherExt
123149
/// <typeparam name="T">type of the event object</typeparam>
124150
/// <param name="response">the api response</param>
125151
/// <param name="onEvent">a callback when any event raised from api server</param>
126-
/// <param name="onError">a callbak when any exception was caught during watching</param>
152+
/// <param name="onError">a callbak when any exception was caught during watching</param>
153+
/// <param name="onClosed">
154+
/// The action to invoke when the server closes the connection.
155+
/// </param>
127156
/// <returns>a watch object</returns>
128157
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
129158
Action<WatchEventType, T> onEvent,
130-
Action<Exception> onError = null)
159+
Action<Exception> onError = null,
160+
Action onClosed = null)
131161
{
132162
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
133163
{
134164
throw new KubernetesClientException("not a watchable request or failed response");
135165
}
136166

137-
return new Watcher<T>(content.StreamReader, onEvent, onError);
167+
return new Watcher<T>(content.StreamReader, onEvent, onError, onClosed);
138168
}
139169

140170
/// <summary>
@@ -143,13 +173,17 @@ public static Watcher<T> Watch<T>(this HttpOperationResponse response,
143173
/// <typeparam name="T">type of the event object</typeparam>
144174
/// <param name="response">the api response</param>
145175
/// <param name="onEvent">a callback when any event raised from api server</param>
146-
/// <param name="onError">a callbak when any exception was caught during watching</param>
176+
/// <param name="onError">a callbak when any exception was caught during watching</param>
177+
/// <param name="onClosed">
178+
/// The action to invoke when the server closes the connection.
179+
/// </param>
147180
/// <returns>a watch object</returns>
148181
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
149182
Action<WatchEventType, T> onEvent,
150-
Action<Exception> onError = null)
183+
Action<Exception> onError = null,
184+
Action onClosed = null)
151185
{
152-
return Watch((HttpOperationResponse) response, onEvent, onError);
186+
return Watch((HttpOperationResponse)response, onEvent, onError, onClosed);
153187
}
154188
}
155189
}

0 commit comments

Comments
 (0)