Skip to content

Commit 80b46b7

Browse files
committed
feat: enhance Kubernetes client with watch functionality
1 parent a99fbe4 commit 80b46b7

File tree

7 files changed

+220
-40
lines changed

7 files changed

+220
-40
lines changed

examples/clientset/Program.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// See https://aka.ms/new-console-template for more information
1+
// See https://aka.ms/new-console-template for more information
22
using k8s;
33
using k8s.ClientSets;
44
using System.Threading.Tasks;
@@ -12,15 +12,21 @@ private static async Task Main(string[] args)
1212
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
1313
var client = new Kubernetes(config);
1414

15-
ClientSet clientSet = new ClientSet(client);
15+
var clientSet = new ClientSet(client);
1616
var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false);
1717
foreach (var item in list)
1818
{
1919
System.Console.WriteLine(item.Metadata.Name);
2020
}
2121

22-
var pod = await clientSet.CoreV1.Pod.GetAsync("test","default").ConfigureAwait(false);
22+
var pod = await clientSet.CoreV1.Pod.GetAsync("test", "default").ConfigureAwait(false);
2323
System.Console.WriteLine(pod?.Metadata?.Name);
24+
25+
var watch = clientSet.CoreV1.Pod.WatchListAsync("default");
26+
await foreach (var (_, item)in watch.ConfigureAwait(false))
27+
{
28+
System.Console.WriteLine(item.Metadata.Name);
29+
}
2430
}
2531
}
26-
}
32+
}

examples/watch/Program.cs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using k8s;
2-
using k8s.Models;
32
using System;
43
using System.Threading;
54
using System.Threading.Tasks;
@@ -8,9 +7,10 @@
87

98
IKubernetes client = new Kubernetes(config);
109

11-
var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
10+
var podlistResp = client.CoreV1.WatchListNamespacedPodAsync("default");
11+
1212
// C# 8 required https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8
13-
await foreach (var (type, item) in podlistResp.WatchAsync<V1Pod, V1PodList>().ConfigureAwait(false))
13+
await foreach (var (type, item) in podlistResp.ConfigureAwait(false))
1414
{
1515
Console.WriteLine("==on watch event==");
1616
Console.WriteLine(type);
@@ -22,19 +22,29 @@
2222
void WatchUsingCallback(IKubernetes client)
2323
#pragma warning restore CS8321 // Remove unused private members
2424
{
25-
var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
26-
using (podlistResp.Watch<V1Pod, V1PodList>((type, item) =>
25+
using var podlistResp = client.CoreV1.WatchListNamespacedPod("default");
26+
podlistResp.OnEvent += (type, item) =>
2727
{
2828
Console.WriteLine("==on watch event==");
2929
Console.WriteLine(type);
3030
Console.WriteLine(item.Metadata.Name);
3131
Console.WriteLine("==on watch event==");
32-
}))
32+
};
33+
podlistResp.OnError += (error) =>
34+
{
35+
Console.WriteLine("==on watch error==");
36+
Console.WriteLine(error.Message);
37+
Console.WriteLine("==on watch error==");
38+
};
39+
podlistResp.OnClosed += () =>
40+
{
41+
Console.WriteLine("==on watch closed==");
42+
};
3343
{
3444
Console.WriteLine("press ctrl + c to stop watching");
3545

3646
var ctrlc = new ManualResetEventSlim(false);
3747
Console.CancelKeyPress += (sender, eventArgs) => ctrlc.Set();
3848
ctrlc.Wait();
3949
}
40-
}
50+
}

src/KubernetesClient.Aot/KubernetesClient.Aot.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@
8989
<Compile Include="..\KubernetesClient\IKubernetes.Exec.cs" />
9090
<Compile Include="..\KubernetesClient\Kubernetes.Exec.cs" />
9191

92-
<!-- <Compile Include="..\KubernetesClient\Watcher.cs" /> -->
93-
<!-- <Compile Include="..\KubernetesClient\WatcherExt.cs" /> -->
92+
<Compile Include="..\KubernetesClient\Watcher.cs" />
93+
<Compile Include="..\KubernetesClient\WatcherExt.cs" />
9494
<Compile Include="..\KubernetesClient\LineSeparatedHttpContent.cs" />
9595

9696
<Compile Include="..\KubernetesClient\Exceptions\KubeConfigException.cs" />

src/LibKubernetesGenerator/ParamHelper.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Scriban.Runtime;
44
using System;
55
using System.Linq;
6+
using System.Collections.Generic;
67

78
namespace LibKubernetesGenerator
89
{
@@ -21,6 +22,8 @@ public void RegisterHelper(ScriptObject scriptObject)
2122
{
2223
scriptObject.Import(nameof(GetModelCtorParam), new Func<JsonSchema, string>(GetModelCtorParam));
2324
scriptObject.Import(nameof(IfParamContains), IfParamContains);
25+
scriptObject.Import(nameof(FilterParameters), FilterParameters);
26+
scriptObject.Import(nameof(GetParameterValueForWatch), new Func<OpenApiParameter, bool, string, string>(GetParameterValueForWatch));
2427
}
2528

2629
public static bool IfParamContains(OpenApiOperation operation, string name)
@@ -39,6 +42,23 @@ public static bool IfParamContains(OpenApiOperation operation, string name)
3942
return found;
4043
}
4144

45+
public static IEnumerable<OpenApiParameter> FilterParameters(OpenApiOperation operation, string excludeParam)
46+
{
47+
return operation.Parameters.Where(p => p.Name != excludeParam);
48+
}
49+
50+
public string GetParameterValueForWatch(OpenApiParameter parameter, bool watch, string init = "false")
51+
{
52+
if (parameter.Name == "watch")
53+
{
54+
return watch ? "true" : "false";
55+
}
56+
else
57+
{
58+
return generalNameHelper.GetDotNetNameOpenApiParameter(parameter, init);
59+
}
60+
}
61+
4262
public string GetModelCtorParam(JsonSchema schema)
4363
{
4464
return string.Join(", ", schema.Properties.Values
@@ -57,4 +77,4 @@ public string GetModelCtorParam(JsonSchema schema)
5777
}));
5878
}
5979
}
60-
}
80+
}

src/LibKubernetesGenerator/TypeHelper.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,14 @@ string toType()
245245
}
246246

247247
break;
248+
case "T":
249+
// Return single item type from list type (e.g., V1Pod from V1PodList)
250+
return !string.IsNullOrEmpty(t) && t.EndsWith("List", StringComparison.Ordinal)
251+
? t.Substring(0, t.Length - 4)
252+
: t;
253+
case "TList":
254+
// Return list type as-is
255+
return t;
248256
}
249257

250258
return t;
@@ -284,4 +292,4 @@ public static bool IfType(JsonSchemaProperty property, string type)
284292
return false;
285293
}
286294
}
287-
}
295+
}

src/LibKubernetesGenerator/templates/Client.cs.template

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ public partial class {{name}}Client : ResourceClient
1717
}
1818

1919
{{for api in apis }}
20+
{{~ $filteredParams = FilterParameters api.operation "watch" ~}}
2021
/// <summary>
2122
/// {{ToXmlDoc api.operation.description}}
2223
/// </summary>
23-
{{ for parameter in api.operation.parameters}}
24+
{{ for parameter in $filteredParams}}
2425
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
2526
/// {{ToXmlDoc parameter.description}}
2627
/// </param>
@@ -29,15 +30,15 @@ public partial class {{name}}Client : ResourceClient
2930
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
3031
/// </param>
3132
public async Task{{GetReturnType api.operation "<>"}} {{GetActionName api.operation name "Async"}}(
32-
{{ for parameter in api.operation.parameters}}
33+
{{ for parameter in $filteredParams}}
3334
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
3435
{{ end }}
3536
CancellationToken cancellationToken = default(CancellationToken))
3637
{
3738
{{if IfReturnType api.operation "stream"}}
3839
var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
3940
{{ for parameter in api.operation.parameters}}
40-
{{GetDotNetNameOpenApiParameter parameter "false"}},
41+
{{GetParameterValueForWatch parameter false}},
4142
{{end}}
4243
null,
4344
cancellationToken);
@@ -47,7 +48,7 @@ public partial class {{name}}Client : ResourceClient
4748
{{if IfReturnType api.operation "obj"}}
4849
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
4950
{{ for parameter in api.operation.parameters}}
50-
{{GetDotNetNameOpenApiParameter parameter "false"}},
51+
{{GetParameterValueForWatch parameter false}},
5152
{{end}}
5253
null,
5354
cancellationToken).ConfigureAwait(false))
@@ -58,7 +59,7 @@ public partial class {{name}}Client : ResourceClient
5859
{{if IfReturnType api.operation "void"}}
5960
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
6061
{{ for parameter in api.operation.parameters}}
61-
{{GetDotNetNameOpenApiParameter parameter "false"}},
62+
{{GetParameterValueForWatch parameter false}},
6263
{{end}}
6364
null,
6465
cancellationToken).ConfigureAwait(false))
@@ -71,7 +72,7 @@ public partial class {{name}}Client : ResourceClient
7172
/// <summary>
7273
/// {{ToXmlDoc api.operation.description}}
7374
/// </summary>
74-
{{ for parameter in api.operation.parameters}}
75+
{{ for parameter in $filteredParams}}
7576
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
7677
/// {{ToXmlDoc parameter.description}}
7778
/// </param>
@@ -80,14 +81,14 @@ public partial class {{name}}Client : ResourceClient
8081
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
8182
/// </param>
8283
public async Task<T> {{GetActionName api.operation name "Async"}}<T>(
83-
{{ for parameter in api.operation.parameters}}
84+
{{ for parameter in $filteredParams}}
8485
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "false"}},
8586
{{ end }}
8687
CancellationToken cancellationToken = default(CancellationToken))
8788
{
8889
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}<T>(
8990
{{ for parameter in api.operation.parameters}}
90-
{{GetDotNetNameOpenApiParameter parameter "false"}},
91+
{{GetParameterValueForWatch parameter false}},
9192
{{end}}
9293
null,
9394
cancellationToken).ConfigureAwait(false))
@@ -96,5 +97,68 @@ public partial class {{name}}Client : ResourceClient
9697
}
9798
}
9899
{{end}}
100+
101+
{{if IfParamContains api.operation "watch"}}
102+
/// <summary>
103+
/// Watch {{ToXmlDoc api.operation.description}}
104+
/// </summary>
105+
{{ for parameter in $filteredParams}}
106+
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
107+
/// {{ToXmlDoc parameter.description}}
108+
/// </param>
109+
{{ end }}
110+
/// <param name="onEvent">Callback when any event raised from api server</param>
111+
/// <param name="onError">Callback when any exception was caught during watching</param>
112+
/// <param name="onClosed">Callback when the server closes the connection</param>
113+
public Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetActionName api.operation name ""}}(
114+
{{ for parameter in $filteredParams}}
115+
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
116+
{{ end }}
117+
Action<WatchEventType, {{GetReturnType api.operation "T"}}> onEvent = null,
118+
Action<Exception> onError = null,
119+
Action onClosed = null)
120+
{
121+
if (onEvent == null) throw new ArgumentNullException(nameof(onEvent));
122+
123+
var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
124+
{{ for parameter in api.operation.parameters}}
125+
{{GetParameterValueForWatch parameter true}},
126+
{{ end }}
127+
null,
128+
CancellationToken.None);
129+
130+
return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>(
131+
onEvent, onError, onClosed);
132+
}
133+
134+
/// <summary>
135+
/// Watch {{ToXmlDoc api.operation.description}} as async enumerable
136+
/// </summary>
137+
{{ for parameter in $filteredParams}}
138+
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
139+
/// {{ToXmlDoc parameter.description}}
140+
/// </param>
141+
{{ end }}
142+
/// <param name="onError">Callback when any exception was caught during watching</param>
143+
/// <param name="cancellationToken">Cancellation token</param>
144+
public IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetActionName api.operation name "Async"}}(
145+
{{ for parameter in $filteredParams}}
146+
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
147+
{{ end }}
148+
Action<Exception> onError = null,
149+
CancellationToken cancellationToken = default)
150+
{
151+
var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
152+
{{ for parameter in api.operation.parameters}}
153+
{{GetParameterValueForWatch parameter true}},
154+
{{ end }}
155+
null,
156+
cancellationToken);
157+
158+
return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>(
159+
onError, cancellationToken);
160+
}
99161
{{end}}
100-
}
162+
163+
{{end}}
164+
}

0 commit comments

Comments
 (0)