Skip to content

Commit b310c71

Browse files
authored
Fix Kubernetes provider membership metadata (#2421)
Harden pod label/annotation parsing and avoid stale cluster metadata.
1 parent e5b0348 commit b310c71

File tree

10 files changed

+355
-89
lines changed

10 files changed

+355
-89
lines changed

ProtoActor.sln

Lines changed: 71 additions & 54 deletions
Large diffs are not rendered by default.

logs/exceptions.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,11 @@ Proto.TestKit.TestKitException : Waited 1 seconds but failed to receive a messag
6161

6262
### Proto.Tests.SharedFutureTests.Should_wrap_request_ids_without_hitting_zero
6363
`Expected future.Pid.RequestId to be 8u, but found 1u.` Observed while verifying request ID wrap-around on .NET 8 after the upgrade; a focused rerun and a full-suite retry both passed.
64+
65+
### Issue2151 KubernetesProvider.DeregisterMemberInner
66+
```
67+
System.ArgumentNullException: Value cannot be null. (Parameter 'source')
68+
at System.Linq.ThrowHelper.ThrowArgumentNullException(ExceptionArgument argument)
69+
at System.Linq.Enumerable.Where[TSource](IEnumerable`1 source, Func`2 predicate)
70+
at Proto.Cluster.Kubernetes.KubernetesProvider.DeregisterMemberInner(Cluster cluster)
71+
```
InternalsVisibleTo.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="InternalsVisibleTo.cs" company="Asynkron AB">
3+
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
using System.Runtime.CompilerServices;
8+
9+
[assembly: InternalsVisibleTo("Proto.Cluster.Kubernetes.Tests")]
10+

src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,13 +244,13 @@ private void Watch(WatchEventType eventType, V1Pod eventPod)
244244

245245
private bool IsValidClusterPod(V1Pod pod)
246246
{
247-
var podLabels = pod.Metadata.Labels;
247+
var podLabels = pod.Metadata?.Labels;
248248

249-
if (!podLabels.TryGetValue(LabelCluster, out var podClusterName))
249+
if (podLabels is null || !podLabels.TryGetValue(LabelCluster, out var podClusterName))
250250
{
251251
Logger.LogInformation(
252252
"[Cluster][KubernetesProvider] The pod {PodName} is not a Proto.Cluster node",
253-
pod.Metadata.Name
253+
pod.Metadata?.Name
254254
);
255255

256256
return false;
@@ -299,4 +299,4 @@ private void UpdateTopology()
299299
throw;
300300
}
301301
}
302-
}
302+
}

src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Collections.Generic;
99
using System.IO;
1010
using System.Linq;
11+
using System.Globalization;
1112
using System.Text;
1213
using System.Text.Json;
1314
using System.Threading.Tasks;
@@ -63,45 +64,68 @@ IDictionary<string, string> annotations
6364
[CanBeNull]
6465
internal static MemberStatus GetMemberStatus(this V1Pod pod, KubernetesProviderConfig config)
6566
{
67+
if (pod.Metadata?.DeletionTimestamp is not null)
68+
{
69+
return null;
70+
}
71+
6672
var isRunning = pod.Status is { Phase: "Running", PodIP: not null };
6773

68-
if (pod.Status?.ContainerStatuses is null)
74+
var containerStatuses = pod.Status?.ContainerStatuses;
75+
if (containerStatuses is null)
76+
{
77+
return null;
78+
}
79+
80+
var labels = pod.Metadata?.Labels;
81+
if (labels is null)
82+
{
83+
return null;
84+
}
85+
86+
if (!labels.TryGetValue(LabelPort, out var portLabel) ||
87+
!int.TryParse(portLabel, NumberStyles.Integer, CultureInfo.InvariantCulture, out var port))
88+
{
6989
return null;
90+
}
7091

71-
if (pod.Metadata?.Labels is null)
92+
if (!labels.TryGetValue(LabelMemberId, out var memberId))
93+
{
7294
return null;
95+
}
7396

74-
var kinds = pod
75-
.Metadata
76-
.Annotations
77-
.Where(l => l.Key.StartsWith(AnnotationKinds))
78-
.SelectMany(l => l.Value.Split(';'))
79-
.ToArray();
97+
var annotations = pod.Metadata.Annotations;
98+
if (annotations is null || !annotations.TryGetValue(AnnotationKinds, out var kindsAnnotation))
99+
{
100+
return null;
101+
}
102+
103+
var kinds = kindsAnnotation.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
80104

81-
var host = pod.Status.PodIP ?? "";
82-
if (pod.Metadata.Labels.TryGetValue(LabelHost, out var hostOverride))
105+
var host = pod.Status?.PodIP ?? "";
106+
if (labels.TryGetValue(LabelHost, out var hostOverride) && !string.IsNullOrWhiteSpace(hostOverride))
107+
{
83108
host = hostOverride;
84-
else if (pod.Metadata.Labels.TryGetValue(LabelHostPrefix, out var hostPrefix))
109+
}
110+
else if (labels.TryGetValue(LabelHostPrefix, out var hostPrefix) && !string.IsNullOrWhiteSpace(hostPrefix))
85111
{
86112
var dnsPostfix = $".{pod.Namespace()}.svc.{config.ClusterDomain}";
87113

88114
// If we have a subdomain, then we can add that to the dnsPostfix, as it will be known to the cluster
89-
if (!string.IsNullOrEmpty(pod.Spec.Subdomain))
115+
if (!string.IsNullOrEmpty(pod.Spec?.Subdomain))
90116
{
91117
dnsPostfix = $".{pod.Spec.Subdomain}{dnsPostfix}";
92118
}
93119

94120
host = hostPrefix + dnsPostfix;
95121
}
96122

97-
var port = Convert.ToInt32(pod.Metadata.Labels[LabelPort]);
98-
var mid = pod.Metadata.Labels[LabelMemberId];
99-
var alive = pod.Status.ContainerStatuses.All(x => x.Ready);
123+
var alive = containerStatuses.All(x => x.Ready);
100124

101125
return new MemberStatus(isRunning, alive,
102126
new Member
103127
{
104-
Id = mid,
128+
Id = memberId,
105129
Host = host,
106130
Port = port,
107131
Kinds = { kinds }
@@ -185,4 +209,4 @@ internal static bool TryGetKubeNamespace(out string kubeNamespace)
185209
internal static string GetPodName() => Environment.MachineName;
186210
}
187211

188-
public record MemberStatus(bool IsRunning, bool IsReady, Member Member);
212+
public record MemberStatus(bool IsRunning, bool IsReady, Member Member);
KubernetesPodMetadata.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="KubernetesPodMetadata.cs" company="Asynkron AB">
3+
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
using System;
8+
using System.Collections.Generic;
9+
using static Proto.Cluster.Kubernetes.ProtoLabels;
10+
11+
namespace Proto.Cluster.Kubernetes;
12+
13+
internal static class KubernetesPodMetadata
14+
{
15+
internal static Dictionary<string, string> ToNonClusterDictionary(IDictionary<string, string> source)
16+
{
17+
if (source is null || source.Count == 0)
18+
{
19+
return new Dictionary<string, string>();
20+
}
21+
22+
var result = new Dictionary<string, string>();
23+
24+
foreach (var (key, value) in source)
25+
{
26+
if (key.StartsWith(ProtoClusterPrefix, StringComparison.Ordinal))
27+
{
28+
continue;
29+
}
30+
31+
result[key] = value;
32+
}
33+
34+
return result;
35+
}
36+
37+
internal static void TryAddNonClusterEntries(IDictionary<string, string> source, Dictionary<string, string> destination)
38+
{
39+
if (source is null || source.Count == 0)
40+
{
41+
return;
42+
}
43+
44+
foreach (var (key, value) in source)
45+
{
46+
if (key.StartsWith(ProtoClusterPrefix, StringComparison.Ordinal))
47+
{
48+
continue;
49+
}
50+
51+
destination.TryAdd(key, value);
52+
}
53+
}
54+
}

src/Proto.Cluster.Kubernetes/KubernetesProvider.cs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,7 @@ protected override async Task RegisterMemberInner()
124124

125125
AppendHostToPodLabels(pod, labels);
126126

127-
foreach (var existing in pod.Metadata.Labels)
128-
{
129-
labels.TryAdd(existing.Key, existing.Value);
130-
}
127+
KubernetesPodMetadata.TryAddNonClusterEntries(pod.Metadata.Labels, labels);
131128

132129
var annotations = new Dictionary<string, string>
133130
{
@@ -136,10 +133,7 @@ protected override async Task RegisterMemberInner()
136133

137134
if (pod.Metadata.Annotations is not null)
138135
{
139-
foreach (var existing in pod.Metadata.Annotations)
140-
{
141-
annotations.TryAdd(existing.Key, existing.Value);
142-
}
136+
KubernetesPodMetadata.TryAddNonClusterEntries(pod.Metadata.Annotations, annotations);
143137
}
144138

145139
try
@@ -240,13 +234,15 @@ private async Task DeregisterMemberInner(Cluster cluster)
240234

241235
var pod = await kubernetes.CoreV1.ReadNamespacedPodAsync(_podName, kubeNamespace).ConfigureAwait(false);
242236

243-
var labels = pod.Metadata.Labels
244-
.Where(label => !label.Key.StartsWith(ProtoClusterPrefix, StringComparison.Ordinal))
245-
.ToDictionary(label => label.Key, label => label.Value);
237+
if (pod?.Metadata is null)
238+
{
239+
cluster.System.Root.Send(_clusterMonitor, new DeregisterMember());
240+
return;
241+
}
242+
243+
var labels = KubernetesPodMetadata.ToNonClusterDictionary(pod.Metadata.Labels);
246244

247-
var annotations = pod.Metadata.Annotations
248-
.Where(label => !label.Key.StartsWith(ProtoClusterPrefix, StringComparison.Ordinal))
249-
.ToDictionary(label => label.Key, label => label.Value);
245+
var annotations = KubernetesPodMetadata.ToNonClusterDictionary(pod.Metadata.Annotations);
250246

251247
await kubernetes.ReplacePodLabelsAndAnnotations(_podName, kubeNamespace, pod, labels, annotations).ConfigureAwait(false);
252248

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System.Collections.Generic;
2+
using FluentAssertions;
3+
using k8s.Models;
4+
using Xunit;
5+
6+
namespace Proto.Cluster.Kubernetes.Tests;
7+
8+
public class KubernetesExtensionsTests
9+
{
10+
[Fact]
11+
public void GetMemberStatus_WhenAnnotationsAreMissing_ReturnsNull()
12+
{
13+
var pod = CreateRunningPod(
14+
labels: new Dictionary<string, string>
15+
{
16+
[ProtoLabels.LabelPort] = "12000",
17+
[ProtoLabels.LabelMemberId] = "member-1"
18+
},
19+
annotations: null
20+
);
21+
22+
var config = new KubernetesProviderConfig();
23+
24+
pod.GetMemberStatus(config).Should().BeNull();
25+
}
26+
27+
[Fact]
28+
public void GetMemberStatus_WhenPortLabelIsMissing_ReturnsNull()
29+
{
30+
var pod = CreateRunningPod(
31+
labels: new Dictionary<string, string>
32+
{
33+
[ProtoLabels.LabelMemberId] = "member-1"
34+
},
35+
annotations: new Dictionary<string, string>
36+
{
37+
[ProtoLabels.AnnotationKinds] = "kind-1"
38+
}
39+
);
40+
41+
var config = new KubernetesProviderConfig();
42+
43+
pod.GetMemberStatus(config).Should().BeNull();
44+
}
45+
46+
[Fact]
47+
public void GetMemberStatus_WhenPodHasRequiredLabelsAndAnnotations_ReturnsMemberStatus()
48+
{
49+
var pod = CreateRunningPod(
50+
labels: new Dictionary<string, string>
51+
{
52+
[ProtoLabels.LabelPort] = "12000",
53+
[ProtoLabels.LabelMemberId] = "member-1"
54+
},
55+
annotations: new Dictionary<string, string>
56+
{
57+
[ProtoLabels.AnnotationKinds] = "kind-1;kind-2"
58+
}
59+
);
60+
61+
var config = new KubernetesProviderConfig();
62+
63+
var status = pod.GetMemberStatus(config);
64+
status.Should().NotBeNull();
65+
status!.IsRunning.Should().BeTrue();
66+
status.IsReady.Should().BeTrue();
67+
status.Member.Id.Should().Be("member-1");
68+
status.Member.Port.Should().Be(12000);
69+
status.Member.Kinds.Should().Contain(new[] { "kind-1", "kind-2" });
70+
}
71+
72+
private static V1Pod CreateRunningPod(
73+
Dictionary<string, string> labels,
74+
Dictionary<string, string> annotations
75+
) =>
76+
new()
77+
{
78+
Metadata = new V1ObjectMeta
79+
{
80+
Name = "pod-1",
81+
NamespaceProperty = "default",
82+
Labels = labels,
83+
Annotations = annotations
84+
},
85+
Status = new V1PodStatus
86+
{
87+
Phase = "Running",
88+
PodIP = "10.0.0.1",
89+
ContainerStatuses = new List<V1ContainerStatus>
90+
{
91+
new()
92+
{
93+
Ready = true
94+
}
95+
}
96+
}
97+
};
98+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System.Collections.Generic;
2+
using FluentAssertions;
3+
using Xunit;
4+
5+
namespace Proto.Cluster.Kubernetes.Tests;
6+
7+
public class KubernetesPodMetadataTests
8+
{
9+
[Fact]
10+
public void ToNonClusterDictionary_WhenSourceIsNull_ReturnsEmptyDictionary()
11+
{
12+
KubernetesPodMetadata.ToNonClusterDictionary(null).Should().BeEmpty();
13+
}
14+
15+
[Fact]
16+
public void ToNonClusterDictionary_StripsProtoClusterEntries()
17+
{
18+
var source = new Dictionary<string, string>
19+
{
20+
[ProtoLabels.LabelHost] = "stale-host",
21+
["app"] = "proto"
22+
};
23+
24+
var result = KubernetesPodMetadata.ToNonClusterDictionary(source);
25+
26+
result.Should().NotContainKey(ProtoLabels.LabelHost);
27+
result.Should().ContainKey("app").WhoseValue.Should().Be("proto");
28+
}
29+
30+
[Fact]
31+
public void TryAddNonClusterEntries_DoesNotAddProtoClusterEntries()
32+
{
33+
var source = new Dictionary<string, string>
34+
{
35+
[ProtoLabels.LabelHost] = "stale-host",
36+
["app"] = "proto"
37+
};
38+
39+
var destination = new Dictionary<string, string>();
40+
41+
KubernetesPodMetadata.TryAddNonClusterEntries(source, destination);
42+
43+
destination.Should().NotContainKey(ProtoLabels.LabelHost);
44+
destination.Should().ContainKey("app").WhoseValue.Should().Be("proto");
45+
}
46+
}
47+

0 commit comments

Comments
 (0)