Skip to content

Commit 3f7c1a3

Browse files
authored
refactor: Kubernetes coordinator to be more accurate about nodes readiness (#13493)
1 parent 79860ca commit 3f7c1a3

File tree

6 files changed

+288
-573
lines changed

6 files changed

+288
-573
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
* Fix metrics comparison in promql with bool modifier.
9797
* Add rate limiter for Zipkin trace receiver to limit maximum spans per second.
9898
* Open `health-checker` module by default due to latest UI changes. Change the default check period to 30s.
99+
* Refactor Kubernetes coordinator to be more accurate about node readiness.
99100

100101
#### UI
101102

oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java

Lines changed: 69 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,10 @@
1818

1919
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
2020

21-
import io.fabric8.kubernetes.api.model.ObjectMeta;
22-
import io.fabric8.kubernetes.api.model.Pod;
23-
import io.fabric8.kubernetes.api.model.PodStatus;
24-
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
21+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2522
import lombok.extern.slf4j.Slf4j;
2623
import org.apache.skywalking.oap.server.core.CoreModule;
2724
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
28-
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
29-
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
3025
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
3126
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
3227
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
@@ -39,66 +34,79 @@
3934
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
4035
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
4136

42-
import java.util.ArrayList;
43-
import java.util.Collections;
37+
import com.google.common.base.Strings;
38+
import com.linecorp.armeria.client.endpoint.EndpointGroup;
39+
40+
import java.util.HashMap;
4441
import java.util.List;
4542
import java.util.Map;
46-
import java.util.concurrent.ConcurrentHashMap;
4743
import java.util.stream.Collectors;
4844

49-
import static org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.ADDED;
50-
import static org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.DELETED;
51-
import static org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.MODIFIED;
52-
5345

5446
/**
55-
* Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
47+
* Read collector pod info from Kubernetes using KubernetesLabelSelectorEndpointGroup, then construct the list of
5648
* {@link RemoteInstance}.
5749
*/
5850
@Slf4j
5951
public class KubernetesCoordinator extends ClusterCoordinator {
6052
private final ModuleDefineHolder manager;
61-
private final String uid;
6253
private volatile int port = -1;
6354
private HealthCheckMetrics healthChecker;
6455
private ClusterModuleKubernetesConfig config;
65-
private final Map<String, RemoteInstance> remoteInstanceMap;
66-
private volatile List<String> latestInstances;
56+
57+
private EndpointGroup endpointGroup;
58+
private volatile List<RemoteInstance> remoteInstances;
6759

6860
public KubernetesCoordinator(final ModuleDefineHolder manager,
6961
final ClusterModuleKubernetesConfig config) {
70-
this.uid = new UidEnvSupplier(config.getUidEnvName()).get();
7162
this.manager = manager;
7263
this.config = config;
73-
this.remoteInstanceMap = new ConcurrentHashMap<>(20);
74-
this.latestInstances = new ArrayList<>(20);
64+
65+
if (Strings.isNullOrEmpty(config.getLabelSelector())) {
66+
throw new IllegalArgumentException("kubernetes labelSelector must be provided");
67+
}
68+
}
69+
70+
private EndpointGroup createEndpointGroup() {
71+
if (port == -1) {
72+
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
73+
}
74+
final var kubernetesClient = new KubernetesClientBuilder().build();
75+
final var builder = KubernetesLabelSelectorEndpointGroup.builder(kubernetesClient);
76+
77+
if (StringUtil.isNotBlank(config.getNamespace())) {
78+
builder.namespace(config.getNamespace());
79+
}
80+
81+
final var labelMap = parseLabelSelector(config.getLabelSelector());
82+
builder.labelSelector(labelMap);
83+
84+
builder.port(port);
85+
builder.selfUid(new UidEnvSupplier(config.getUidEnvName()).get());
86+
87+
return builder.build();
88+
}
89+
90+
private Map<String, String> parseLabelSelector(String labelSelector) {
91+
final var labels = new HashMap<String, String>();
92+
if (StringUtil.isBlank(labelSelector)) {
93+
return labels;
94+
}
95+
96+
final var pairs = labelSelector.split(",");
97+
for (final var pair : pairs) {
98+
final var keyValue = pair.trim().split("=", 2);
99+
if (keyValue.length == 2) {
100+
labels.put(keyValue[0].trim(), keyValue[1].trim());
101+
}
102+
}
103+
return labels;
75104
}
76105

77106
@Override
78107
public List<RemoteInstance> queryRemoteNodes() {
79108
try {
80-
List<Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
81-
if (log.isDebugEnabled()) {
82-
List<String> uidList = pods
83-
.stream()
84-
.map(item -> item.getMetadata().getUid())
85-
.collect(Collectors.toList());
86-
log.debug("[kubernetes cluster pods uid list]:{}", uidList);
87-
}
88-
if (port == -1) {
89-
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
90-
}
91-
List<RemoteInstance> remoteInstances =
92-
pods.stream()
93-
.filter(pod -> StringUtil.isNotBlank(pod.getStatus().getPodIP()))
94-
.map(pod -> new RemoteInstance(
95-
new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
96-
.collect(Collectors.toList());
97109
healthChecker.health();
98-
this.latestInstances = remoteInstances.stream().map(it -> it.getAddress().toString()).collect(Collectors.toList());
99-
if (log.isDebugEnabled()) {
100-
remoteInstances.forEach(instance -> log.debug("kubernetes cluster instance: {}", instance));
101-
}
102110
return remoteInstances;
103111
} catch (Throwable e) {
104112
healthChecker.unHealth(e);
@@ -127,93 +135,33 @@ private void initHealthChecker() {
127135
}
128136
}
129137

130-
private List<Pod> selfPod() {
131-
Pod v1Pod = new Pod();
132-
v1Pod.setMetadata(new ObjectMeta());
133-
v1Pod.setStatus(new PodStatus());
134-
v1Pod.getMetadata().setUid(uid);
135-
v1Pod.getStatus().setPodIP("127.0.0.1");
136-
return Collections.singletonList(v1Pod);
137-
}
138-
139138
@Override
140139
public void start() {
141-
initHealthChecker();
142-
NamespacedPodListInformer.INFORMER.init(config, new K8sResourceEventHandler());
143-
}
140+
endpointGroup = createEndpointGroup();
141+
endpointGroup.addListener(endpoints -> {
142+
if (port == -1) {
143+
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
144+
}
144145

145-
class K8sResourceEventHandler implements ResourceEventHandler<Pod> {
146+
if (log.isDebugEnabled()) {
147+
log.debug("[kubernetes cluster endpoints]: {}", endpoints);
148+
}
146149

147-
@Override
148-
public void onAdd(final Pod obj) {
149-
updateRemoteInstances(obj, ADDED);
150-
}
150+
final var instances = endpoints.stream()
151+
.map(endpoint -> new RemoteInstance(new Address(endpoint.host(), endpoint.port(), false)))
152+
.collect(Collectors.toList());
151153

152-
@Override
153-
public void onUpdate(final Pod oldObj, final Pod newObj) {
154-
updateRemoteInstances(newObj, MODIFIED);
155-
}
154+
// The endpoint group will never include itself, add it.
155+
final var selfInstance = new RemoteInstance(new Address("127.0.0.1", port, true));
156+
instances.add(selfInstance);
156157

157-
@Override
158-
public void onDelete(final Pod obj, final boolean deletedFinalStateUnknown) {
159-
updateRemoteInstances(obj, DELETED);
160-
}
161-
}
162-
163-
/**
164-
* When a remote instance up/off line, will receive multi event according to the pod status.
165-
* To avoid notify the watchers too frequency, here use a `remoteInstanceMap` to cache them.
166-
* Only notify watchers once when the instances changed.
167-
*/
168-
private void updateRemoteInstances(Pod pod, EventType event) {
169-
try {
170-
initHealthChecker();
171-
if (StringUtil.isNotBlank(pod.getStatus().getPodIP())) {
172-
if (port == -1) {
173-
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
174-
}
175-
176-
RemoteInstance remoteInstance = new RemoteInstance(
177-
new Address(pod.getStatus().getPodIP(), this.port, pod.getMetadata().getUid().equals(uid)));
178-
switch (event) {
179-
case ADDED:
180-
case MODIFIED:
181-
if ("Running".equalsIgnoreCase(pod.getStatus().getPhase())) {
182-
remoteInstanceMap.put(remoteInstance.getAddress().toString(), remoteInstance);
183-
} else if ("Terminating".equalsIgnoreCase(pod.getStatus().getPhase())) {
184-
remoteInstanceMap.remove(remoteInstance.getAddress().toString());
185-
}
186-
break;
187-
case DELETED:
188-
this.remoteInstanceMap.remove(remoteInstance.getAddress().toString());
189-
break;
190-
default:
191-
return;
192-
}
193-
updateRemoteInstances();
158+
if (log.isDebugEnabled()) {
159+
instances.forEach(instance -> log.debug("kubernetes cluster instance: {}", instance));
194160
}
195-
} catch (Throwable e) {
196-
healthChecker.unHealth(e);
197-
log.error("Failed to notify RemoteInstances update.", e);
198-
}
199-
}
200-
201-
private void updateRemoteInstances() {
202-
List<String> updatedInstances = new ArrayList<>(this.remoteInstanceMap.keySet());
203-
if (this.latestInstances.size() != updatedInstances.size() || !this.latestInstances.containsAll(updatedInstances)) {
204-
List<RemoteInstance> remoteInstances = new ArrayList<>(this.remoteInstanceMap.values());
205-
this.latestInstances = updatedInstances;
206-
checkHealth(remoteInstances);
207-
notifyWatchers(remoteInstances);
208-
}
209-
}
210161

211-
private void checkHealth(List<RemoteInstance> remoteInstances) {
212-
ClusterHealthStatus healthStatus = OAPNodeChecker.isHealth(remoteInstances);
213-
if (healthStatus.isHealth()) {
214-
this.healthChecker.health();
215-
} else {
216-
this.healthChecker.unHealth(healthStatus.getReason());
217-
}
162+
this.remoteInstances = instances;
163+
notifyWatchers(instances);
164+
}, true);
165+
initHealthChecker();
218166
}
219167
}

0 commit comments

Comments
 (0)