Skip to content

Commit 136b103

Browse files
committed
refactor: update Kubernetes watch functionality to use new event handling methods and add async enumerable support
1 parent 6979dd5 commit 136b103

File tree

4 files changed

+421
-56
lines changed

4 files changed

+421
-56
lines changed

examples/watch/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
void WatchUsingCallback(IKubernetes client)
2323
#pragma warning restore CS8321 // Remove unused private members
2424
{
25-
using var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) =>
25+
using (var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) =>
2626
{
2727
Console.WriteLine("==on watch event==");
2828
Console.WriteLine(type);
2929
Console.WriteLine(item.Metadata.Name);
3030
Console.WriteLine("==on watch event==");
31-
});
31+
}))
3232
{
3333
Console.WriteLine("press ctrl + c to stop watching");
3434

src/LibKubernetesGenerator/templates/IOperations.cs.template

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public partial interface I{{name}}Operations
2525
/// <param name="cancellationToken">
2626
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
2727
/// </param>
28+
{{if IfParamContains api.operation "watch"}}
29+
[Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.", false)]
30+
{{end}}
2831
Task<HttpOperationResponse{{GetReturnType api.operation "<>"}}> {{GetOperationId api.operation "WithHttpMessagesAsync"}}(
2932
{{ for parameter in api.operation.parameters}}
3033
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},

tests/E2E.Tests/MinikubeTests.cs

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,13 +224,12 @@ public async Task WatcherIntegrationTest()
224224
var started = new AsyncManualResetEvent();
225225
var connectionClosed = new AsyncManualResetEvent();
226226

227-
var watcher = kubernetes.BatchV1.ListNamespacedJobWithHttpMessagesAsync(
227+
var watcher = kubernetes.BatchV1.WatchListNamespacedJob(
228228
job.Metadata.NamespaceProperty,
229229
fieldSelector: $"metadata.name={job.Metadata.Name}",
230230
resourceVersion: job.Metadata.ResourceVersion,
231231
timeoutSeconds: 30,
232-
watch: true).Watch<V1Job, V1JobList>(
233-
(type, source) =>
232+
onEvent: (type, source) =>
234233
{
235234
Debug.WriteLine($"Watcher 1: {type}, {source}");
236235
events.Add(new Tuple<WatchEventType, V1Job>(type, source));
@@ -250,6 +249,86 @@ public async Task WatcherIntegrationTest()
250249
new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false);
251250
}
252251

252+
[MinikubeFact]
253+
public async Task WatcherIntegrationTestAsyncEnumerable()
254+
{
255+
using var kubernetes = CreateClient();
256+
257+
var job = await kubernetes.BatchV1.CreateNamespacedJobAsync(
258+
new V1Job()
259+
{
260+
ApiVersion = "batch/v1",
261+
Kind = V1Job.KubeKind,
262+
Metadata = new V1ObjectMeta() { Name = nameof(WatcherIntegrationTestAsyncEnumerable).ToLowerInvariant() },
263+
Spec = new V1JobSpec()
264+
{
265+
Template = new V1PodTemplateSpec()
266+
{
267+
Spec = new V1PodSpec()
268+
{
269+
Containers = new List<V1Container>()
270+
{
271+
new V1Container()
272+
{
273+
Image = "ubuntu",
274+
Name = "runner",
275+
Command = new List<string>() { "/bin/bash", "-c", "--" },
276+
Args = new List<string>()
277+
{
278+
"trap : TERM INT; sleep infinity & wait",
279+
},
280+
},
281+
},
282+
RestartPolicy = "Never",
283+
},
284+
},
285+
},
286+
},
287+
"default").ConfigureAwait(false);
288+
289+
var events = new Collection<Tuple<WatchEventType, V1Job>>();
290+
291+
var started = new AsyncManualResetEvent();
292+
var watchCompleted = new AsyncManualResetEvent();
293+
294+
// Start async enumerable watch in background task to mimic callback behavior
295+
var watchTask = Task.Run(async () =>
296+
{
297+
try
298+
{
299+
await foreach (var (type, source) in kubernetes.BatchV1.WatchListNamespacedJobAsync(
300+
job.Metadata.NamespaceProperty,
301+
fieldSelector: $"metadata.name={job.Metadata.Name}",
302+
resourceVersion: job.Metadata.ResourceVersion,
303+
timeoutSeconds: 30).ConfigureAwait(false))
304+
{
305+
Debug.WriteLine($"AsyncEnumerable Watcher: {type}, {source}");
306+
events.Add(new Tuple<WatchEventType, V1Job>(type, source));
307+
job = source;
308+
started.Set();
309+
}
310+
}
311+
catch (Exception ex)
312+
{
313+
Debug.WriteLine($"Watch exception: {ex.GetType().FullName}: {ex.Message}");
314+
}
315+
finally
316+
{
317+
watchCompleted.Set();
318+
}
319+
});
320+
321+
await started.WaitAsync().ConfigureAwait(false);
322+
323+
await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TimeSpan.FromMinutes(3))).ConfigureAwait(false);
324+
Assert.True(watchCompleted.IsSet);
325+
326+
var st = await kubernetes.BatchV1.DeleteNamespacedJobAsync(
327+
job.Metadata.Name,
328+
job.Metadata.NamespaceProperty,
329+
new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false);
330+
}
331+
253332
[MinikubeFact]
254333
public void LeaderIntegrationTest()
255334
{

0 commit comments

Comments
 (0)