Skip to content

Commit 29a9c22

Browse files
qmfrederikbrendandburns
authored andcommitted
Handle error messages in the watcher (#150)
* Handle error messages in the watcher * Fix documentation * Fix typo
1 parent 194211b commit 29a9c22

File tree

4 files changed

+186
-4
lines changed

4 files changed

+186
-4
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
using k8s.Models;
2+
using System;
3+
#if !NETSTANDARD1_4
4+
using System.Runtime.Serialization;
5+
#endif
6+
7+
namespace k8s
8+
{
9+
/// <summary>
10+
/// Represents an error message returned by the Kubernetes API server.
11+
/// </summary>
12+
public class KubernetesException : Exception
13+
{
14+
/// <summary>
15+
/// Initializes a new instance of the <see cref="KubernetesException"/> class.
16+
/// </summary>
17+
public KubernetesException()
18+
{
19+
}
20+
21+
/// <summary>
22+
/// Initializes a ne winstance of the <see cref="KubernetesException"/> class using
23+
/// the data from a <see cref="V1Status"/> object.
24+
/// </summary>
25+
/// <param name="status">
26+
/// A status message which triggered this exception to be thrown.
27+
/// </param>
28+
public KubernetesException(V1Status status)
29+
: this(status?.Message)
30+
{
31+
this.Status = status;
32+
}
33+
34+
/// <summary>
35+
/// Initializes a new instance of the <see cref="KubernetesException"/> class with an error message.
36+
/// </summary>
37+
/// <param name="message">
38+
/// The error message that explains the reason for the exception.
39+
/// </param>
40+
public KubernetesException(string message)
41+
: base(message)
42+
{
43+
}
44+
45+
/// <summary>
46+
/// Initializes a new instance of the <see cref="KubernetesException"/> class with a specified error
47+
/// message and a reference to the inner exception that is the cause of this exception.
48+
/// </summary>
49+
/// <param name="message">
50+
/// The error message that explains the reason for the exception.
51+
/// </param>
52+
/// <param name="innerException">
53+
/// The exception that is the cause of the current exception, or <see langword="null"/>
54+
/// if no inner exception is specified.
55+
/// </param>
56+
public KubernetesException(string message, Exception innerException)
57+
: base(message, innerException)
58+
{
59+
}
60+
61+
#if !NETSTANDARD1_4
62+
/// <summary>
63+
/// Initializes a new instance of the <see cref="KubernetesException"/> class with serialized data.
64+
/// </summary>
65+
/// <param name="info">
66+
/// The <see cref="SerializationInfo"/> that holds the serialized
67+
/// object data about the exception being thrown.
68+
/// </param>
69+
/// <param name="context">
70+
/// The <see cref="StreamingContext"/> that contains contextual information
71+
/// about the source or destination.
72+
/// </param>
73+
protected KubernetesException(SerializationInfo info, StreamingContext context)
74+
: base(info, context)
75+
{
76+
}
77+
#endif
78+
79+
/// <summary>
80+
/// Gets, when this exception was raised because of a Kubernetes status message, the underlying
81+
/// Kubernetes status message.
82+
/// </summary>
83+
public V1Status Status
84+
{
85+
get;
86+
private set;
87+
}
88+
}
89+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Newtonsoft.Json;
2+
3+
namespace k8s
4+
{
5+
/// <summary>
6+
/// Represents a generic Kubernetes object.
7+
/// </summary>
8+
/// <remarks>
9+
/// You can use the <see cref="KubernetesObject"/> if you receive JSON from a Kubernetes API server but
10+
/// are unsure which object the API server is about to return. You can parse the JSON as a <see cref="KubernetesObject"/>
11+
/// and use the <see cref="ApiVersion"/> and <see cref="Kind"/> properties to get basic metadata about any Kubernetes object.
12+
/// You can then
13+
/// </remarks>
14+
public class KubernetesObject
15+
{
16+
/// <summary>
17+
/// Gets or sets aPIVersion defines the versioned schema of this
18+
/// representation of an object. Servers should convert recognized
19+
/// schemas to the latest internal value, and may reject unrecognized
20+
/// values. More info:
21+
/// https://git.k8s.io/community/contributors/devel/api-conventions.md#resources
22+
/// </summary>
23+
[JsonProperty(PropertyName = "apiVersion")]
24+
public string ApiVersion { get; set; }
25+
26+
/// <summary>
27+
/// Gets or sets kind is a string value representing the REST resource
28+
/// this object represents. Servers may infer this from the endpoint
29+
/// the client submits requests to. Cannot be updated. In CamelCase.
30+
/// More info:
31+
/// https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds
32+
/// </summary>
33+
[JsonProperty(PropertyName = "kind")]
34+
public string Kind { get; set; }
35+
}
36+
}

src/KubernetesClient/Watcher.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading;
55
using System.Threading.Tasks;
66
using k8s.Exceptions;
7+
using k8s.Models;
78
using Microsoft.Rest;
89
using Microsoft.Rest.Serialization;
910

@@ -30,7 +31,7 @@ public class Watcher<T> : IDisposable
3031
private readonly CancellationTokenSource _cts;
3132
private readonly StreamReader _streamReader;
3233

33-
internal Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
34+
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
3435
{
3536
_streamReader = streamReader;
3637
OnEvent += onEvent;
@@ -56,9 +57,20 @@ internal Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, A
5657
var line = await streamReader.ReadLineAsync();
5758

5859
try
59-
{
60-
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
61-
OnEvent?.Invoke(@event.Type, @event.Object);
60+
{
61+
var genericEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<KubernetesObject>.WatchEvent>(line);
62+
63+
if (genericEvent.Object.Kind == "Status")
64+
{
65+
var statusEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<V1Status>.WatchEvent>(line);
66+
var exception = new KubernetesException(statusEvent.Object);
67+
this.OnError?.Invoke(exception);
68+
}
69+
else
70+
{
71+
var @event = SafeJsonConvert.DeserializeObject<k8s.Watcher<T>.WatchEvent>(line);
72+
this.OnEvent?.Invoke(@event.Type, @event.Object);
73+
}
6274
}
6375
catch (Exception e)
6476
{
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using k8s.Models;
2+
using System;
3+
using System.IO;
4+
using System.Text;
5+
using System.Threading;
6+
using Xunit;
7+
8+
namespace k8s.tests
9+
{
10+
public class WatcherTests
11+
{
12+
[Fact]
13+
public void ReadError()
14+
{
15+
byte[] data = Encoding.UTF8.GetBytes("{\"type\":\"ERROR\",\"object\":{\"kind\":\"Status\",\"apiVersion\":\"v1\",\"metadata\":{},\"status\":\"Failure\",\"message\":\"too old resource version: 44982(53593)\",\"reason\":\"Gone\",\"code\":410}}");
16+
17+
using (MemoryStream stream = new MemoryStream(data))
18+
using (StreamReader reader = new StreamReader(stream))
19+
{
20+
Exception recordedException = null;
21+
ManualResetEvent mre = new ManualResetEvent(false);
22+
23+
Watcher<V1Pod> watcher = new Watcher<V1Pod>(
24+
reader,
25+
null,
26+
(exception) =>
27+
{
28+
recordedException = exception;
29+
mre.Set();
30+
});
31+
32+
mre.WaitOne();
33+
34+
Assert.NotNull(recordedException);
35+
36+
var k8sException = recordedException as KubernetesException;
37+
38+
Assert.NotNull(k8sException);
39+
Assert.NotNull(k8sException.Status);
40+
Assert.Equal("too old resource version: 44982(53593)", k8sException.Message);
41+
Assert.Equal("too old resource version: 44982(53593)", k8sException.Status.Message);
42+
}
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)