Skip to content

Commit 1935c26

Browse files
committed
Operator auto-recreates domain config map
1 parent e5578dd commit 1935c26

File tree

6 files changed

+186
-2
lines changed

6 files changed

+186
-2
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator;
5+
6+
import io.kubernetes.client.ApiException;
7+
import io.kubernetes.client.models.V1ConfigMap;
8+
import io.kubernetes.client.util.Watch;
9+
import oracle.kubernetes.operator.builders.WatchBuilder;
10+
import oracle.kubernetes.operator.builders.WatchI;
11+
import oracle.kubernetes.operator.helpers.ClientHelper;
12+
import oracle.kubernetes.operator.helpers.ClientHolder;
13+
import oracle.kubernetes.operator.watcher.Watcher;
14+
import oracle.kubernetes.operator.watcher.Watching;
15+
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
16+
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
19+
/**
20+
* This class handles ConfigMap watching. It receives config map change events and sends
21+
* them into the operator for processing.
22+
*/
23+
public class ConfigMapWatcher implements Runnable {
24+
private final String ns;
25+
private final String initialResourceVersion;
26+
private final WatchingEventDestination<V1ConfigMap> destination;
27+
private final AtomicBoolean isStopping;
28+
29+
public static ConfigMapWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<V1ConfigMap> destination, AtomicBoolean isStopping) {
30+
ConfigMapWatcher dlw = new ConfigMapWatcher(ns, initialResourceVersion, destination, isStopping);
31+
Thread thread = new Thread(dlw);
32+
thread.setName("Thread-ConfigMapWatcher-" + ns);
33+
thread.setDaemon(true);
34+
thread.start();
35+
return dlw;
36+
}
37+
38+
private ConfigMapWatcher(String ns, String initialResourceVersion, WatchingEventDestination<V1ConfigMap> destination, AtomicBoolean isStopping) {
39+
this.ns = ns;
40+
this.initialResourceVersion = initialResourceVersion;
41+
this.destination = destination;
42+
this.isStopping = isStopping;
43+
}
44+
45+
/**
46+
* Polling loop. Get the next ConfigMap object event and process it.
47+
*/
48+
@Override
49+
public void run() {
50+
ClientHelper helper = ClientHelper.getInstance();
51+
ClientHolder client = helper.take();
52+
try {
53+
Watching<V1ConfigMap> w = createWatching(client);
54+
Watcher<V1ConfigMap> watcher = new Watcher<>(w, initialResourceVersion);
55+
56+
// invoke watch on current Thread. Won't return until watch stops
57+
watcher.doWatch();
58+
59+
} finally {
60+
helper.recycle(client);
61+
}
62+
}
63+
64+
protected Watching<V1ConfigMap> createWatching(ClientHolder client) {
65+
return new Watching<V1ConfigMap>() {
66+
67+
/**
68+
* Watcher callback to issue the list ConfigMap changes. It is driven by the
69+
* Watcher wrapper to issue repeated watch requests.
70+
* @param resourceVersion resource version to omit older events
71+
* @return Watch object or null if the operation should end
72+
* @throws ApiException if there is an API error.
73+
*/
74+
@Override
75+
public WatchI<V1ConfigMap> initiateWatch(String resourceVersion) throws ApiException {
76+
return new WatchBuilder(client)
77+
.withResourceVersion(resourceVersion)
78+
.withLabelSelector(LabelConstants.CREATEDBYOPERATOR_LABEL)
79+
.createConfigMapWatch(ns);
80+
}
81+
82+
@Override
83+
public void eventCallback(Watch.Response<V1ConfigMap> item) {
84+
processEventCallback(item);
85+
}
86+
87+
@Override
88+
public boolean isStopping() {
89+
return isStopping.get();
90+
}
91+
};
92+
}
93+
94+
public void processEventCallback(Watch.Response<V1ConfigMap> item) {
95+
destination.eventCallback(item);
96+
}
97+
}

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.atomic.AtomicBoolean;
1717

1818
import io.kubernetes.client.ApiException;
19+
import io.kubernetes.client.models.V1ConfigMap;
1920
import io.kubernetes.client.models.V1DeleteOptions;
2021
import io.kubernetes.client.models.V1EnvVar;
2122
import io.kubernetes.client.models.V1ObjectMeta;
@@ -91,6 +92,7 @@ public class Main {
9192
private static String principal;
9293
private static RestServer restServer = null;
9394
private static Thread livenessThread = null;
95+
private static Map<String, ConfigMapWatcher> configMapWatchers = new HashMap<>();
9496
private static Map<String, DomainWatcher> domainWatchers = new HashMap<>();
9597
private static Map<String, PodWatcher> podWatchers = new HashMap<>();
9698
private static Map<String, ServiceWatcher> serviceWatchers = new HashMap<>();
@@ -207,6 +209,7 @@ public NextAction onSuccess(Packet packet, DomainList result, int statusCode,
207209
});
208210

209211
Step initialize = ConfigMapHelper.createScriptConfigMapStep(ns,
212+
new ConfigMapAfterStep(ns,
210213
CallBuilder.create().with($ -> {
211214
$.labelSelector = LabelConstants.DOMAINUID_LABEL
212215
+ "," + LabelConstants.CREATEDBYOPERATOR_LABEL;
@@ -319,7 +322,7 @@ public NextAction onSuccess(Packet packet, V1PodList result, int statusCode,
319322
podWatchers.put(ns, createPodWatcher(ns, result != null ? result.getMetadata().getResourceVersion() : ""));
320323
return doNext(packet);
321324
}
322-
}));
325+
})));
323326

324327
engine.createFiber().start(initialize, new Packet(), new CompletionCallback() {
325328
@Override
@@ -1668,6 +1671,53 @@ private static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
16681671
}
16691672
}
16701673

1674+
private static class ConfigMapAfterStep extends Step {
1675+
private final String ns;
1676+
1677+
public ConfigMapAfterStep(String ns, Step next) {
1678+
super(next);
1679+
this.ns = ns;
1680+
}
1681+
1682+
@Override
1683+
public NextAction apply(Packet packet) {
1684+
V1ConfigMap result = (V1ConfigMap) packet.get(ProcessingConstants.SCRIPT_CONFIG_MAP);
1685+
configMapWatchers.put(ns, createConfigMapWatcher(ns, result != null ? result.getMetadata().getResourceVersion() : ""));
1686+
return doNext(packet);
1687+
}
1688+
}
1689+
1690+
private static ConfigMapWatcher createConfigMapWatcher(String namespace, String initialResourceVersion) {
1691+
return ConfigMapWatcher.create(namespace, initialResourceVersion, Main::dispatchConfigMapWatch, stopping);
1692+
}
1693+
1694+
private static void dispatchConfigMapWatch(Watch.Response<V1ConfigMap> item) {
1695+
V1ConfigMap c = item.object;
1696+
if (c != null) {
1697+
switch (item.type) {
1698+
case "MODIFIED":
1699+
case "DELETED":
1700+
engine.createFiber().start(
1701+
ConfigMapHelper.createScriptConfigMapStep(c.getMetadata().getNamespace(), null),
1702+
new Packet(), new CompletionCallback() {
1703+
@Override
1704+
public void onCompletion(Packet packet) {
1705+
// no-op
1706+
}
1707+
1708+
@Override
1709+
public void onThrowable(Packet packet, Throwable throwable) {
1710+
LOGGER.severe(MessageKeys.EXCEPTION, throwable);
1711+
}
1712+
});
1713+
break;
1714+
1715+
case "ERROR":
1716+
default:
1717+
}
1718+
}
1719+
}
1720+
16711721
/**
16721722
* Dispatch the Domain event to the appropriate handler.
16731723
*

src/main/java/oracle/kubernetes/operator/ProcessingConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ public interface ProcessingConstants {
2828
public static final String EXPLICIT_RESTART_SERVERS = "explicitRestartServers";
2929
public static final String EXPLICIT_RESTART_CLUSTERS = "explicitRestartClusters";
3030

31+
public static final String SCRIPT_CONFIG_MAP = "scriptConfigMap";
32+
3133
}

src/main/java/oracle/kubernetes/operator/ServiceWatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
/**
22-
* This class handles Service watching. It service change events and sends
22+
* This class handles Service watching. It receives service change events and sends
2323
* them into the operator for processing.
2424
*/
2525
public class ServiceWatcher implements Runnable {

src/main/java/oracle/kubernetes/operator/builders/WatchBuilder.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.kubernetes.client.ApiException;
88
import io.kubernetes.client.ProgressRequestBody;
99
import io.kubernetes.client.ProgressResponseBody;
10+
import io.kubernetes.client.models.V1ConfigMap;
1011
import io.kubernetes.client.models.V1Pod;
1112
import io.kubernetes.client.models.V1Service;
1213
import io.kubernetes.client.models.V1beta1Ingress;
@@ -179,6 +180,36 @@ public Call apply(ClientHolder clientHolder, CallParams callParams) {
179180
}
180181
}
181182

183+
/**
184+
* Creates a web hook object to track config map calls
185+
* @param namespace the namespace
186+
* @return the active web hook
187+
* @throws ApiException if there is an error on the call that sets up the web hook.
188+
*/
189+
public WatchI<V1ConfigMap> createConfigMapWatch(String namespace) throws ApiException {
190+
return FACTORY.createWatch(clientHolder, callParams, V1ConfigMap.class, new ListNamespacedConfigMapCall(namespace));
191+
}
192+
193+
private class ListNamespacedConfigMapCall implements BiFunction<ClientHolder, CallParams, Call> {
194+
private String namespace;
195+
196+
ListNamespacedConfigMapCall(String namespace) {
197+
this.namespace = namespace;
198+
}
199+
200+
@Override
201+
public Call apply(ClientHolder clientHolder, CallParams callParams) {
202+
try {
203+
return clientHolder.getCoreApiClient().listNamespacedConfigMapCall(namespace,
204+
callParams.getPretty(), START_LIST,
205+
callParams.getFieldSelector(), callParams.getIncludeUninitialized(), callParams.getLabelSelector(),
206+
callParams.getLimit(), callParams.getResourceVersion(), callParams.getTimeoutSeconds(), WATCH, null, null);
207+
} catch (ApiException e) {
208+
throw new UncheckedApiException(e);
209+
}
210+
}
211+
}
212+
182213
/**
183214
* Sets a value for the fieldSelector parameter for the call that will set up this watch. Defaults to null.
184215
* @param fieldSelector the desired value

src/main/java/oracle/kubernetes/operator/helpers/ConfigMapHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kubernetes.client.models.V1ObjectMeta;
1313
import oracle.kubernetes.operator.KubernetesConstants;
1414
import oracle.kubernetes.operator.LabelConstants;
15+
import oracle.kubernetes.operator.ProcessingConstants;
1516
import oracle.kubernetes.operator.logging.LoggingFacade;
1617
import oracle.kubernetes.operator.logging.LoggingFactory;
1718
import oracle.kubernetes.operator.logging.MessageKeys;
@@ -122,13 +123,15 @@ public NextAction onSuccess(Packet packet, V1ConfigMap result, int statusCode,
122123
Map<String, List<String>> responseHeaders) {
123124

124125
LOGGER.info(MessageKeys.CM_CREATED, namespace);
126+
packet.put(ProcessingConstants.SCRIPT_CONFIG_MAP, result);
125127
return doNext(packet);
126128
}
127129
});
128130
return doNext(create, packet);
129131
} else if (AnnotationHelper.checkFormatAnnotation(result.getMetadata()) && result.getData().entrySet().containsAll(data.entrySet())) {
130132
// existing config map has correct data
131133
LOGGER.fine(MessageKeys.CM_EXISTS, namespace);
134+
packet.put(ProcessingConstants.SCRIPT_CONFIG_MAP, result);
132135
return doNext(packet);
133136
} else {
134137
// we need to update the config map
@@ -146,6 +149,7 @@ public NextAction onFailure(Packet packet, ApiException e, int statusCode,
146149
public NextAction onSuccess(Packet packet, V1ConfigMap result, int statusCode,
147150
Map<String, List<String>> responseHeaders) {
148151
LOGGER.info(MessageKeys.CM_REPLACED, namespace);
152+
packet.put(ProcessingConstants.SCRIPT_CONFIG_MAP, result);
149153
return doNext(packet);
150154
}
151155
});

0 commit comments

Comments
 (0)