diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index d49e7f47..a1b74e0f 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -1,7 +1,6 @@ -// See https://aka.ms/new-console-template for more information -using k8s; -using k8s.ClientSets; +using k8s; using k8s.Models; +using k8s.ClientSets; using System.Threading.Tasks; namespace clientset @@ -13,7 +12,7 @@ private static async Task Main(string[] args) var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(); var client = new Kubernetes(config); - ClientSet clientSet = new ClientSet(client); + var clientSet = new ClientSet(client); var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false); foreach (var item in list) { @@ -22,6 +21,12 @@ private static async Task Main(string[] args) var pod = await clientSet.CoreV1.Pod.GetAsync("test", "default").ConfigureAwait(false); System.Console.WriteLine(pod?.Metadata?.Name); + + var watch = clientSet.CoreV1.Pod.WatchListAsync("default"); + await foreach (var (_, item) in watch.ConfigureAwait(false)) + { + System.Console.WriteLine(item.Metadata.Name); + } } } -} +} \ No newline at end of file diff --git a/examples/watch/Program.cs b/examples/watch/Program.cs index f21f8f88..1aff6588 100644 --- a/examples/watch/Program.cs +++ b/examples/watch/Program.cs @@ -1,5 +1,4 @@ using k8s; -using k8s.Models; using System; using System.Threading; using System.Threading.Tasks; @@ -8,9 +7,10 @@ IKubernetes client = new Kubernetes(config); -var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); +var podlistResp = client.CoreV1.WatchListNamespacedPodAsync("default"); + // C# 8 required https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8 -await foreach (var (type, item) in podlistResp.WatchAsync().ConfigureAwait(false)) +await foreach (var (type, item) in podlistResp.ConfigureAwait(false)) { Console.WriteLine("==on watch event=="); Console.WriteLine(type); @@ -22,8 +22,7 @@ void WatchUsingCallback(IKubernetes client) #pragma warning restore CS8321 // Remove unused private members { - var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); - using (podlistResp.Watch((type, item) => + using (var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { Console.WriteLine("==on watch event=="); Console.WriteLine(type); @@ -37,4 +36,4 @@ void WatchUsingCallback(IKubernetes client) Console.CancelKeyPress += (sender, eventArgs) => ctrlc.Set(); ctrlc.Wait(); } -} +} \ No newline at end of file diff --git a/src/KubernetesClient/WatcherExt.cs b/src/KubernetesClient/WatcherExt.cs index f3047f8c..f82301cf 100644 --- a/src/KubernetesClient/WatcherExt.cs +++ b/src/KubernetesClient/WatcherExt.cs @@ -2,7 +2,7 @@ namespace k8s { - public static class WatcherExt + internal static class WatcherExt { /// /// create a watch object from a call to api server with watch=true diff --git a/src/LibKubernetesGenerator/ParamHelper.cs b/src/LibKubernetesGenerator/ParamHelper.cs index 216c2f35..fcaf030d 100644 --- a/src/LibKubernetesGenerator/ParamHelper.cs +++ b/src/LibKubernetesGenerator/ParamHelper.cs @@ -3,6 +3,7 @@ using Scriban.Runtime; using System; using System.Linq; +using System.Collections.Generic; namespace LibKubernetesGenerator { @@ -21,6 +22,8 @@ public void RegisterHelper(ScriptObject scriptObject) { scriptObject.Import(nameof(GetModelCtorParam), new Func(GetModelCtorParam)); scriptObject.Import(nameof(IfParamContains), IfParamContains); + scriptObject.Import(nameof(FilterParameters), FilterParameters); + scriptObject.Import(nameof(GetParameterValueForWatch), new Func(GetParameterValueForWatch)); } public static bool IfParamContains(OpenApiOperation operation, string name) @@ -39,6 +42,23 @@ public static bool IfParamContains(OpenApiOperation operation, string name) return found; } + public static IEnumerable FilterParameters(OpenApiOperation operation, string excludeParam) + { + return operation.Parameters.Where(p => p.Name != excludeParam); + } + + public string GetParameterValueForWatch(OpenApiParameter parameter, bool watch, string init = "false") + { + if (parameter.Name == "watch") + { + return watch ? "true" : "false"; + } + else + { + return generalNameHelper.GetDotNetNameOpenApiParameter(parameter, init); + } + } + public string GetModelCtorParam(JsonSchema schema) { return string.Join(", ", schema.Properties.Values @@ -57,4 +77,4 @@ public string GetModelCtorParam(JsonSchema schema) })); } } -} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/TypeHelper.cs b/src/LibKubernetesGenerator/TypeHelper.cs index 057a363a..db27dab9 100644 --- a/src/LibKubernetesGenerator/TypeHelper.cs +++ b/src/LibKubernetesGenerator/TypeHelper.cs @@ -122,7 +122,6 @@ private string GetDotNetType(JsonSchema schema, JsonSchemaProperty parent) return $"IDictionary"; } - if (schema?.Reference != null) { return classNameHelper.GetClassNameForSchemaDefinition(schema.Reference); @@ -245,6 +244,16 @@ string toType() } break; + case "T": + var itemType = TryGetItemTypeFromSchema(response); + if (itemType != null) + { + return itemType; + } + + break; + case "TList": + return t; } return t; @@ -283,5 +292,26 @@ public static bool IfType(JsonSchemaProperty property, string type) return false; } + + private string TryGetItemTypeFromSchema(OpenApiResponse response) + { + var listSchema = response?.Schema?.Reference; + if (listSchema?.Properties?.TryGetValue("items", out var itemsProperty) != true) + { + return null; + } + + if (itemsProperty.Reference != null) + { + return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Reference); + } + + if (itemsProperty.Item?.Reference != null) + { + return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Item.Reference); + } + + return null; + } } -} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/templates/Client.cs.template b/src/LibKubernetesGenerator/templates/Client.cs.template index 02c55d7b..f9ce845e 100644 --- a/src/LibKubernetesGenerator/templates/Client.cs.template +++ b/src/LibKubernetesGenerator/templates/Client.cs.template @@ -17,10 +17,11 @@ public partial class {{name}}Client : ResourceClient } {{for api in apis }} + {{~ $filteredParams = FilterParameters api.operation "watch" ~}} /// /// {{ToXmlDoc api.operation.description}} /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -29,7 +30,7 @@ public partial class {{name}}Client : ResourceClient /// A which can be used to cancel the asynchronous operation. /// public async Task{{GetReturnType api.operation "<>"}} {{GetActionName api.operation name "Async"}}( - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) @@ -37,7 +38,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "stream"}} var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken); @@ -47,7 +48,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "obj"}} using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -58,7 +59,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "void"}} using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -71,7 +72,7 @@ public partial class {{name}}Client : ResourceClient /// /// {{ToXmlDoc api.operation.description}} /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -80,14 +81,14 @@ public partial class {{name}}Client : ResourceClient /// A which can be used to cancel the asynchronous operation. /// public async Task {{GetActionName api.operation name "Async"}}( - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) { using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -96,5 +97,69 @@ public partial class {{name}}Client : ResourceClient } } {{end}} + +#if !K8S_AOT + {{if IfParamContains api.operation "watch"}} + /// + /// Watch {{ToXmlDoc api.operation.description}} + /// + {{ for parameter in $filteredParams}} + /// + /// {{ToXmlDoc parameter.description}} + /// + {{ end }} + /// Callback when any event raised from api server + /// Callback when any exception was caught during watching + /// Callback when the server closes the connection + public Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetActionName api.operation name ""}}( + {{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, + {{ end }} + Action onEvent = null, + Action onError = null, + Action onClosed = null) + { + if (onEvent == null) throw new ArgumentNullException(nameof(onEvent)); + + var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( + {{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, + {{ end }} + null, + CancellationToken.None); + + return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onEvent, onError, onClosed); + } + + /// + /// Watch {{ToXmlDoc api.operation.description}} as async enumerable + /// + {{ for parameter in $filteredParams}} + /// + /// {{ToXmlDoc parameter.description}} + /// + {{ end }} + /// Callback when any exception was caught during watching + /// Cancellation token + public IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetActionName api.operation name "Async"}}( + {{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, + {{ end }} + Action onError = null, + CancellationToken cancellationToken = default) + { + var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( + {{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, + {{ end }} + null, + cancellationToken); + + return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onError, cancellationToken); + } {{end}} -} +#endif + {{end}} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template index b05f0e24..7544d235 100644 --- a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template +++ b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template @@ -12,26 +12,27 @@ namespace k8s; public static partial class {{name}}OperationsExtensions { {{for api in apis }} + {{~ $filteredParams = FilterParameters api.operation "watch" ~}} /// /// {{ToXmlDoc api.operation.description}} /// /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// - /// {{ToXmlDoc api.description}} + /// {{ToXmlDoc parameter.description}} /// {{ end }} public static {{GetReturnType api.operation "void"}} {{GetOperationId api.operation ""}}( this I{{name}}Operations operations -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} ,{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}} {{end}} ) { {{GetReturnType api.operation "return"}} operations.{{GetOperationId api.operation "Async"}}( -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{end}} CancellationToken.None @@ -45,20 +46,20 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// {{end}} public static T {{GetOperationId api.operation ""}}( this I{{name}}Operations operations -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} ,{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}} {{end}} ) { return operations.{{GetOperationId api.operation "Async"}}( -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{end}} CancellationToken.None @@ -72,7 +73,7 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -82,7 +83,7 @@ public static partial class {{name}}OperationsExtensions /// public static async Task{{GetReturnType api.operation "<>"}} {{GetOperationId api.operation "Async"}}( this I{{name}}Operations operations, -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) @@ -90,7 +91,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "stream"}} var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken); @@ -100,7 +101,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "obj"}} using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -111,7 +112,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "void"}} using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -127,7 +128,7 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -137,14 +138,14 @@ public static partial class {{name}}OperationsExtensions /// public static async Task {{GetOperationId api.operation "Async"}}( this I{{name}}Operations operations, -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) { using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -154,5 +155,77 @@ public static partial class {{name}}OperationsExtensions } {{end}} - {{end}} +#if !K8S_AOT +{{if IfParamContains api.operation "watch"}} +{{~ $filteredParams = FilterParameters api.operation "watch" ~}} +/// +/// Watch {{ToXmlDoc api.operation.description}} +/// +/// +/// The operations group for this extension method. +/// +{{ for parameter in $filteredParams}} +/// +/// {{ToXmlDoc parameter.description}} +/// +{{ end }} +/// Callback when any event raised from api server +/// Callback when any exception was caught during watching +/// Callback when the server closes the connection +public static Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetOperationId api.operation ""}}( + this I{{name}}Operations operations, +{{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, +{{end}} + Action onEvent = null, + Action onError = null, + Action onClosed = null) +{ + if (onEvent == null) throw new ArgumentNullException(nameof(onEvent)); + + var responseTask = operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( +{{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, +{{end}} + null, + CancellationToken.None); + + return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onEvent, onError, onClosed); +} + +/// +/// Watch {{ToXmlDoc api.operation.description}} as async enumerable +/// +/// +/// The operations group for this extension method. +/// +{{ for parameter in $filteredParams}} +/// +/// {{ToXmlDoc parameter.description}} +/// +{{ end }} +/// Callback when any exception was caught during watching +/// Cancellation token +public static IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetOperationId api.operation "Async"}}( + this I{{name}}Operations operations, +{{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, +{{end}} + Action onError = null, + CancellationToken cancellationToken = default) +{ + var responseTask = operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( +{{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, +{{end}} + null, + cancellationToken); + + return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onError, cancellationToken); } +{{end}} +#endif + {{end}} +} \ No newline at end of file diff --git a/tests/E2E.Tests/MinikubeTests.cs b/tests/E2E.Tests/MinikubeTests.cs index a04141c7..3ca37888 100644 --- a/tests/E2E.Tests/MinikubeTests.cs +++ b/tests/E2E.Tests/MinikubeTests.cs @@ -224,13 +224,12 @@ public async Task WatcherIntegrationTest() var started = new AsyncManualResetEvent(); var connectionClosed = new AsyncManualResetEvent(); - var watcher = kubernetes.BatchV1.ListNamespacedJobWithHttpMessagesAsync( + var watcher = kubernetes.BatchV1.WatchListNamespacedJob( job.Metadata.NamespaceProperty, fieldSelector: $"metadata.name={job.Metadata.Name}", resourceVersion: job.Metadata.ResourceVersion, timeoutSeconds: 30, - watch: true).Watch( - (type, source) => + onEvent: (type, source) => { Debug.WriteLine($"Watcher 1: {type}, {source}"); events.Add(new Tuple(type, source)); @@ -250,6 +249,86 @@ public async Task WatcherIntegrationTest() new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false); } + [MinikubeFact] + public async Task WatcherIntegrationTestAsyncEnumerable() + { + using var kubernetes = CreateClient(); + + var job = await kubernetes.BatchV1.CreateNamespacedJobAsync( + new V1Job() + { + ApiVersion = "batch/v1", + Kind = V1Job.KubeKind, + Metadata = new V1ObjectMeta() { Name = nameof(WatcherIntegrationTestAsyncEnumerable).ToLowerInvariant() }, + Spec = new V1JobSpec() + { + Template = new V1PodTemplateSpec() + { + Spec = new V1PodSpec() + { + Containers = new List() + { + new V1Container() + { + Image = "ubuntu", + Name = "runner", + Command = new List() { "/bin/bash", "-c", "--" }, + Args = new List() + { + "trap : TERM INT; sleep infinity & wait", + }, + }, + }, + RestartPolicy = "Never", + }, + }, + }, + }, + "default").ConfigureAwait(false); + + var events = new Collection>(); + + var started = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + // Start async enumerable watch in background task to mimic callback behavior + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, source) in kubernetes.BatchV1.WatchListNamespacedJobAsync( + job.Metadata.NamespaceProperty, + fieldSelector: $"metadata.name={job.Metadata.Name}", + resourceVersion: job.Metadata.ResourceVersion, + timeoutSeconds: 30).ConfigureAwait(false)) + { + Debug.WriteLine($"AsyncEnumerable Watcher: {type}, {source}"); + events.Add(new Tuple(type, source)); + job = source; + started.Set(); + } + } + catch (Exception ex) + { + Debug.WriteLine($"Watch exception: {ex.GetType().FullName}: {ex.Message}"); + } + finally + { + watchCompleted.Set(); + } + }); + + await started.WaitAsync().ConfigureAwait(false); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TimeSpan.FromMinutes(3))).ConfigureAwait(false); + Assert.True(watchCompleted.IsSet); + + var st = await kubernetes.BatchV1.DeleteNamespacedJobAsync( + job.Metadata.Name, + job.Metadata.NamespaceProperty, + new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false); + } + [MinikubeFact] public void LeaderIntegrationTest() { diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index 3b3a695d..b3f15110 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -54,24 +54,29 @@ public async Task CannotWatch() var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); // did not pass watch param - var listTask = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); var onErrorCalled = false; - using (listTask.Watch((type, item) => { }, e => { onErrorCalled = true; })) + using (var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }, + onError: e => { onErrorCalled = true; })) { await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true); // delay for onerror to be called } Assert.True(onErrorCalled); - // server did not response line by line await Assert.ThrowsAnyAsync(() => { - return client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); + using (var testWatcher = client.CoreV1.WatchListNamespacedPod( + "default")) + { + return Task.CompletedTask; + } // this line did not throw - // listTask.Watch((type, item) => { }); + // using (var testWatcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { })) }).ConfigureAwait(true); } } @@ -93,8 +98,7 @@ public async Task AsyncWatcher() var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); - using (listTask.Watch((type, item) => { eventsReceived.Set(); })) + using (var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { eventsReceived.Set(); })) { // here watcher is ready to use, but http server has not responsed yet. created.Set(); @@ -134,27 +138,26 @@ public async Task SurviveBadLine() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - connectionClosed.Set); + onClosed: connectionClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -195,17 +198,16 @@ public async Task DisposeWatch() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); }, @@ -255,27 +257,26 @@ public async Task WatchAllEvents() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - waitForClosed.Set); + onClosed: waitForClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -324,27 +325,26 @@ public async Task WatchEventsWithTimeout() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - connectionClosed.Set); + onClosed: connectionClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -386,18 +386,17 @@ public async Task WatchServerDisconnect() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - waitForException.Set(); Watcher watcher; - watcher = listTask.Watch( - (type, item) => { }, - e => + watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }, + onError: e => { exceptionCatched = e; exceptionReceived.Set(); }, - waitForClosed.Set); + onClosed: waitForClosed.Set); // wait server down await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -456,12 +455,11 @@ public async Task TestWatchWithHandlers() Assert.False(handler1.Called); Assert.False(handler2.Called); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { events.Add(type); eventsReceived.Signal(); @@ -507,7 +505,9 @@ public async Task DirectWatchAllEvents() var events = new HashSet(); var errors = 0; - var watcher = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", fieldSelector: $"metadata.name=${"myPod"}", watch: true).Watch( + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + fieldSelector: $"metadata.name=${"myPod"}", onEvent: (type, item) => { @@ -568,7 +568,7 @@ await Assert.ThrowsAsync(async () => Host = server.Uri.ToString(), HttpClientTimeout = TimeSpan.FromSeconds(5), }); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default").ConfigureAwait(true); + await client.CoreV1.ListNamespacedPodAsync("default").ConfigureAwait(true); }).ConfigureAwait(true); // cts @@ -580,7 +580,7 @@ await Assert.ThrowsAsync(async () => { Host = server.Uri.ToString(), }); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", cancellationToken: cts.Token).ConfigureAwait(true); + await client.CoreV1.ListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(true); }).ConfigureAwait(true); } @@ -608,7 +608,9 @@ public async Task DirectWatchEventsWithTimeout() var events = new HashSet(); var errors = 0; - var watcher = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", fieldSelector: $"metadata.name=${"myPod"}", watch: true).Watch( + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + fieldSelector: $"metadata.name=${"myPod"}", onEvent: (type, item) => { @@ -656,7 +658,6 @@ public async Task WatchShouldCancelAfterRequested() httpContext.Response.StatusCode = 200; await httpContext.Response.Body.FlushAsync().ConfigureAwait(true); await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(true); // The default timeout is 100 seconds - return true; }, resp: "")) { @@ -667,8 +668,10 @@ public async Task WatchShouldCancelAfterRequested() await Assert.ThrowsAnyAsync(async () => { - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true, - cancellationToken: cts.Token).ConfigureAwait(true); + using var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }); + await Task.Delay(TimeSpan.FromSeconds(5), cts.Token).ConfigureAwait(true); }).ConfigureAwait(true); } } @@ -742,8 +745,288 @@ public async Task MustHttp2VersionSet() new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler); Assert.Null(handler.Version); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); + using var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { }); Assert.Equal(HttpVersion.Version20, handler.Version); + await Task.CompletedTask.ConfigureAwait(true); + } + + [Fact] + public async Task AsyncEnumerableWatchAllEvents() + { + var eventsReceived = new AsyncCountdownEvent(4); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true); + + // make server alive, cannot set to int.max as of it would block response + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new HashSet(); + var errors = 0; + + // Start async enumerable watch in background task + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default").ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add(type); + eventsReceived.Signal(); + + // Break when we have all expected events + if (events.Count >= 4) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}"); + errors++; + eventsReceived.Signal(); + } + finally + { + watchCompleted.Set(); + } + }); + + // wait server yields all events + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for all events / errors to be received."); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Deleted, events); + Assert.Contains(WatchEventType.Modified, events); + Assert.Contains(WatchEventType.Error, events); + + Assert.Equal(0, errors); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } + } + + [Fact] + public async Task AsyncEnumerableWatchWithCancellation() + { + var eventsReceived = new AsyncCountdownEvent(2); + var serverShutdown = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + + // Keep server alive + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new HashSet(); + var cts = new CancellationTokenSource(); + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add(type); + eventsReceived.Signal(); + } + } + catch (OperationCanceledException) + { + testOutput.WriteLine("AsyncEnumerable Watcher was cancelled as expected."); + } + }); + + // Wait for some events to be received + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for events to be received."); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Modified, events); + + // Cancel the watch + cts.Cancel(); + + // Wait for watch task to complete + await Task.WhenAny(watchTask, Task.Delay(TimeSpan.FromSeconds(5))).ConfigureAwait(true); + Assert.True(watchTask.IsCompletedSuccessfully || watchTask.IsCanceled); + + serverShutdown.Set(); + } + } + + [Fact] + public async Task AsyncEnumerableWatchWithFieldSelector() + { + var eventsReceived = new AsyncCountdownEvent(3); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new List<(WatchEventType, V1Pod)>(); + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync( + "default", + fieldSelector: $"metadata.name={"testPod"}").ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event for pod '{item?.Metadata?.Name}'."); + events.Add((type, item)); + eventsReceived.Signal(); + + if (events.Count >= 3) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}"); + } + finally + { + watchCompleted.Set(); + } + }); + + // Wait for events + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for all events to be received."); + + Assert.Equal(3, events.Count); + Assert.Contains(events, e => e.Item1 == WatchEventType.Added); + Assert.Contains(events, e => e.Item1 == WatchEventType.Deleted); + Assert.Contains(events, e => e.Item1 == WatchEventType.Modified); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } + } + + [Fact] + public async Task AsyncEnumerableWatchErrorHandling() + { + var eventsReceived = new AsyncCountdownEvent(3); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + var errorReceived = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new List<(WatchEventType, V1Pod)>(); + var errorCaught = false; + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync( + "default", + onError: ex => + { + testOutput.WriteLine($"AsyncEnumerable Watcher onError called: {ex.GetType().FullName}"); + errorCaught = true; + errorReceived.Set(); + eventsReceived.Signal(); + }).ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add((type, item)); + eventsReceived.Signal(); + + // Expect some valid events plus error handling + if (events.Count >= 2) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher caught exception: {ex.GetType().FullName}"); + } + finally + { + watchCompleted.Set(); + } + }); + + // Wait for events and errors + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for events and errors to be received."); + + // Should have received at least one valid event and one error + Assert.True(events.Count >= 1, "Should have received at least one valid event"); + Assert.True(errorCaught, "Should have caught parsing error"); + Assert.True(errorReceived.IsSet, "Error callback should have been called"); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } } } }