Skip to content

Commit d38302d

Browse files
authored
add generic watch (#1155)
* add generic watch * Update WatcherExt.cs
1 parent 2a13d46 commit d38302d

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

src/KubernetesClient/GenericClient.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,34 @@ public async Task<T> ReplaceNamespacedAsync<T>(T obj, string ns, string name, Ca
115115
return KubernetesJson.Deserialize<T>(resp.Body.ToString());
116116
}
117117

118+
public IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T>(Action<Exception> onError = null, CancellationToken cancel = default)
119+
where T : IKubernetesObject
120+
{
121+
var respTask = kubernetes.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(group, version, plural, watch: true, cancellationToken: cancel);
122+
return respTask.WatchAsync<T, object>();
123+
}
124+
125+
public IAsyncEnumerable<(WatchEventType, T)> WatchNamespacedAsync<T>(string ns, Action<Exception> onError = null, CancellationToken cancel = default)
126+
where T : IKubernetesObject
127+
{
128+
var respTask = kubernetes.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(group, version, ns, plural, watch: true, cancellationToken: cancel);
129+
return respTask.WatchAsync<T, object>();
130+
}
131+
132+
public Watcher<T> Watch<T>(Action<WatchEventType, T> onEvent, Action<Exception> onError = null, Action onClosed = null)
133+
where T : IKubernetesObject
134+
{
135+
var respTask = kubernetes.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(group, version, plural, watch: true);
136+
return respTask.Watch(onEvent, onError, onClosed);
137+
}
138+
139+
public Watcher<T> WatchNamespaced<T>(string ns, Action<WatchEventType, T> onEvent, Action<Exception> onError = null, Action onClosed = null)
140+
where T : IKubernetesObject
141+
{
142+
var respTask = kubernetes.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(group, version, ns, plural, watch: true);
143+
return respTask.Watch(onEvent, onError, onClosed);
144+
}
145+
118146
public void Dispose()
119147
{
120148
Dispose(true);

src/KubernetesClient/WatcherExt.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using k8s.Autorest;
22
using k8s.Exceptions;
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace k8s
@@ -72,12 +73,14 @@ public static Watcher<T> Watch<T, L>(
7273
/// <typeparam name="L">type of the HttpOperationResponse object</typeparam>
7374
/// <param name="responseTask">the api response</param>
7475
/// <param name="onError">a callbak when any exception was caught during watching</param>
76+
/// <param name="cancellationToken">cancellation token</param>
7577
/// <returns>IAsyncEnumerable of watch events</returns>
7678
public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T, L>(
7779
this Task<HttpOperationResponse<L>> responseTask,
78-
Action<Exception> onError = null)
80+
Action<Exception> onError = null,
81+
CancellationToken cancellationToken = default)
7982
{
80-
return Watcher<T>.CreateWatchEventEnumerator(MakeStreamReaderCreator<T, L>(responseTask), onError);
83+
return Watcher<T>.CreateWatchEventEnumerator(MakeStreamReaderCreator<T, L>(responseTask), onError, cancellationToken);
8184
}
8285
}
8386
}

0 commit comments

Comments
 (0)