Skip to content

Commit d9271e1

Browse files
committed
Restore deleted pods, services, and ingress resources
1 parent d27fefc commit d9271e1

14 files changed

+427
-63
lines changed

src/main/java/oracle/kubernetes/operator/DomainWatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Watch<Domain> initiateWatch(Object context, String resourceVersion) throw
7676
return Watch.createWatch(client.getApiClient(),
7777
client.callBuilder().with($ -> {
7878
$.resourceVersion = resourceVersion;
79-
$.timeoutSeconds = 2;
79+
$.timeoutSeconds = 30;
8080
$.watch = true;
8181
}).listDomainCall(ns),
8282
new TypeToken<Watch.Response<Domain>>() {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator;
5+
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
import com.google.gson.reflect.TypeToken;
9+
import io.kubernetes.client.ApiException;
10+
import io.kubernetes.client.models.V1beta1Ingress;
11+
import io.kubernetes.client.util.Watch;
12+
import oracle.kubernetes.operator.helpers.ClientHelper;
13+
import oracle.kubernetes.operator.helpers.ClientHolder;
14+
import oracle.kubernetes.operator.watcher.Watcher;
15+
import oracle.kubernetes.operator.watcher.Watching;
16+
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
17+
18+
/**
19+
* This class handles Ingress watching. It receives Ingress change events and sends
20+
* them into the operator for processing.
21+
*/
22+
public class IngressWatcher implements Runnable {
23+
private final String ns;
24+
private final String initialResourceVersion;
25+
private final WatchingEventDestination<V1beta1Ingress> destination;
26+
private final AtomicBoolean isStopping;
27+
28+
public static IngressWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<V1beta1Ingress> destination, AtomicBoolean isStopping) {
29+
IngressWatcher dlw = new IngressWatcher(ns, initialResourceVersion, destination, isStopping);
30+
Thread thread = new Thread(dlw);
31+
thread.setName("Thread-IngressWatcher-" + ns);
32+
thread.setDaemon(true);
33+
thread.start();
34+
return dlw;
35+
}
36+
37+
private IngressWatcher(String ns, String initialResourceVersion, WatchingEventDestination<V1beta1Ingress> destination, AtomicBoolean isStopping) {
38+
this.ns = ns;
39+
this.initialResourceVersion = initialResourceVersion;
40+
this.destination = destination;
41+
this.isStopping = isStopping;
42+
}
43+
44+
/**
45+
* Polling loop. Get the next Ingress object event and process it.
46+
*/
47+
@Override
48+
public void run() {
49+
ClientHelper helper = ClientHelper.getInstance();
50+
ClientHolder client = helper.take();
51+
try {
52+
Watching<V1beta1Ingress> w = createWatching(client);
53+
Watcher<V1beta1Ingress> watcher = new Watcher<V1beta1Ingress>(w, null, initialResourceVersion);
54+
55+
// invoke watch on current Thread. Won't return until watch stops
56+
watcher.doWatch();
57+
58+
} finally {
59+
helper.recycle(client);
60+
}
61+
}
62+
63+
protected Watching<V1beta1Ingress> createWatching(ClientHolder client) {
64+
return new Watching<V1beta1Ingress>() {
65+
66+
/**
67+
* Watcher callback to issue the list Ingress changes. It is driven by the
68+
* Watcher wrapper to issue repeated watch requests.
69+
* @param context user defined contact object or null
70+
* @param resourceVersion resource version to omit older events
71+
* @return Watch object or null if the operation should end
72+
* @throws ApiException if there is an API error.
73+
*/
74+
@Override
75+
public Watch<V1beta1Ingress> initiateWatch(Object context, String resourceVersion) throws ApiException {
76+
return Watch.createWatch(client.getApiClient(),
77+
client.callBuilder().with($ -> {
78+
$.resourceVersion = resourceVersion;
79+
$.labelSelector = LabelConstants.DOMAINUID_LABEL; // Any Ingress with a domainUID label
80+
$.timeoutSeconds = 30;
81+
$.watch = true;
82+
}).listIngressCall(ns),
83+
new TypeToken<Watch.Response<V1beta1Ingress>>() {
84+
}.getType());
85+
}
86+
87+
@Override
88+
public void eventCallback(Watch.Response<V1beta1Ingress> item) {
89+
processEventCallback(item);
90+
}
91+
92+
@Override
93+
public boolean isStopping() {
94+
return isStopping.get();
95+
}
96+
};
97+
}
98+
99+
public void processEventCallback(Watch.Response<V1beta1Ingress> item) {
100+
destination.eventCallback(item);
101+
}
102+
}

src/main/java/oracle/kubernetes/operator/LabelConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public interface LabelConstants {
88
public static final String DOMAINUID_LABEL = "weblogic.domainUID";
99
public static final String DOMAINNAME_LABEL = "weblogic.domainName";
1010
public static final String SERVERNAME_LABEL = "weblogic.serverName";
11+
public static final String CHANNELNAME_LABEL = "weblogic.channelName";
1112
public static final String CLUSTERNAME_LABEL = "weblogic.clusterName";
1213

1314
}

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 111 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,13 @@ public class Main {
8282
private static final ConcurrentMap<String, Fiber> domainUpdaters = new ConcurrentHashMap<String, Fiber>();
8383
private static final ConcurrentMap<String, DomainPresenceInfo> domains = new ConcurrentHashMap<String, DomainPresenceInfo>();
8484
private static final AtomicBoolean stopping = new AtomicBoolean(false);
85+
private static String principal;
8586
private static RestServer restServer = null;
8687
private static Thread livenessThread = null;
8788
private static Map<String, DomainWatcher> domainWatchers = new HashMap<>();
8889
private static Map<String, PodWatcher> podWatchers = new HashMap<>();
90+
private static Map<String, ServiceWatcher> serviceWatchers = new HashMap<>();
91+
private static Map<String, IngressWatcher> ingressWatchers = new HashMap<>();
8992

9093
private static final Engine engine = new Engine("operator");
9194

@@ -116,7 +119,7 @@ public static void main(String[] args) {
116119
if (serviceAccountName == null) {
117120
serviceAccountName = "default";
118121
}
119-
String principal = "system:serviceaccount:" + namespace + ":" + serviceAccountName;
122+
principal = "system:serviceaccount:" + namespace + ":" + serviceAccountName;
120123

121124
LOGGER.info(MessageKeys.OP_CONFIG_NAMESPACE, namespace);
122125
StringBuilder tns = new StringBuilder();
@@ -160,8 +163,9 @@ public static void main(String[] args) {
160163
// this would happen when the Domain was running BEFORE the Operator starts up
161164
LOGGER.info(MessageKeys.LISTING_DOMAINS);
162165
for (String ns : targetNamespaces) {
163-
PodWatcher pw = createPodWatcher(ns);
164-
podWatchers.put(ns, pw);
166+
podWatchers.put(ns, createPodWatcher(ns));
167+
serviceWatchers.put(ns, createServiceWatcher(ns));
168+
ingressWatchers.put(ns, createIngressWatcher(ns));
165169

166170
Step domainList = CallBuilder.create().listDomainAsync(ns, new ResponseStep<DomainList>(null) {
167171
@Override
@@ -178,12 +182,12 @@ public NextAction onSuccess(Packet packet, DomainList result, int statusCode,
178182
Map<String, List<String>> responseHeaders) {
179183
if (result != null) {
180184
for (Domain dom : result.getItems()) {
181-
doCheckAndCreateDomainPresence(principal, dom);
185+
doCheckAndCreateDomainPresence(dom);
182186
}
183187
}
184188

185189
// main logic now happens in the watch handlers
186-
domainWatchers.put(ns, createDomainWatcher(principal, ns, result != null ? result.getMetadata().getResourceVersion() : ""));
190+
domainWatchers.put(ns, createDomainWatcher(ns, result != null ? result.getMetadata().getResourceVersion() : ""));
187191
return doNext(packet);
188192
}
189193
});
@@ -357,7 +361,7 @@ public static void doRestartAdmin(String principal, String domainUID) {
357361
if (info != null) {
358362
Domain dom = info.getDomain();
359363
if (dom != null) {
360-
doCheckAndCreateDomainPresence(principal, dom, true, null, null);
364+
doCheckAndCreateDomainPresence(dom, true, null, null);
361365
}
362366
}
363367
}
@@ -374,7 +378,7 @@ public static void doRollingRestartServers(String principal, String domainUID, L
374378
if (info != null) {
375379
Domain dom = info.getDomain();
376380
if (dom != null) {
377-
doCheckAndCreateDomainPresence(principal, dom, false, servers, null);
381+
doCheckAndCreateDomainPresence(dom, false, servers, null);
378382
}
379383
}
380384
}
@@ -391,17 +395,17 @@ public static void doRollingRestartClusters(String principal, String domainUID,
391395
if (info != null) {
392396
Domain dom = info.getDomain();
393397
if (dom != null) {
394-
doCheckAndCreateDomainPresence(principal, dom, false, null, clusters);
398+
doCheckAndCreateDomainPresence(dom, false, null, clusters);
395399
}
396400
}
397401
}
398402

399-
private static void doCheckAndCreateDomainPresence(String principal, Domain dom) {
400-
doCheckAndCreateDomainPresence(principal, dom, false, null, null);
403+
private static void doCheckAndCreateDomainPresence(Domain dom) {
404+
doCheckAndCreateDomainPresence(dom, false, null, null);
401405
}
402406

403407
private static void doCheckAndCreateDomainPresence(
404-
String principal, Domain dom, boolean explicitRestartAdmin,
408+
Domain dom, boolean explicitRestartAdmin,
405409
List<String> explicitRestartServers, List<String> explicitRestartClusters) {
406410
LOGGER.entering();
407411

@@ -927,29 +931,14 @@ public ManagedServerDownStep(String serverName, ServerKubernetesObjects sko, Ste
927931

928932
@Override
929933
public NextAction apply(Packet packet) {
930-
V1ObjectMeta meta = sko.getPod().getMetadata();
931-
V1DeleteOptions deleteOptions = new V1DeleteOptions();
932934
List<V1Service> services = new ArrayList<V1Service>();
933935
services.add(sko.getService());
934936
services.addAll(sko.getChannels().values());
935937

936938
return doNext(IngressHelper.createRemoveServerStep(serverName, sko.getService(),
937-
new DeleteServiceListStep(services, CallBuilder.create().deletePodAsync(meta.getName(), meta.getNamespace(), deleteOptions, new ResponseStep<V1Status>(next) {
938-
@Override
939-
public NextAction onFailure(Packet packet, ApiException e, int statusCode,
940-
Map<String, List<String>> responseHeaders) {
941-
if (statusCode == CallBuilder.NOT_FOUND) {
942-
return onSuccess(packet, null, statusCode, responseHeaders);
943-
}
944-
return super.onFailure(packet, e, statusCode, responseHeaders);
945-
}
946-
947-
@Override
948-
public NextAction onSuccess(Packet packet, V1Status result, int statusCode,
949-
Map<String, List<String>> responseHeaders) {
950-
return doNext(new ManagedServerDownFinalizeStep(serverName, next), packet);
951-
}
952-
}))), packet);
939+
new DeleteServiceListStep(services,
940+
PodHelper.deletePodStep(sko,
941+
new ManagedServerDownFinalizeStep(serverName, next)))), packet);
953942
}
954943
}
955944

@@ -1256,22 +1245,109 @@ public static boolean getStopping() {
12561245
return stopping.get();
12571246
}
12581247

1259-
private static DomainWatcher createDomainWatcher(String principal, String namespace, String initialResourceVersion) {
1260-
return DomainWatcher.create(namespace, initialResourceVersion, (item) -> { dispatchDomainWatch(item, principal); }, stopping);
1248+
private static DomainWatcher createDomainWatcher(String namespace, String initialResourceVersion) {
1249+
return DomainWatcher.create(namespace, initialResourceVersion, Main::dispatchDomainWatch, stopping);
12611250
}
12621251

12631252
private static PodWatcher createPodWatcher(String namespace) {
1264-
return PodWatcher.create(namespace, "", stopping);
1253+
return PodWatcher.create(namespace, "", Main::dispatchPodWatch, stopping);
12651254
}
12661255

1256+
private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
1257+
switch (item.type) {
1258+
case "DELETED":
1259+
V1Pod p = item.object;
1260+
V1ObjectMeta metadata = p.getMetadata();
1261+
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1262+
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1263+
if (domainUID != null) {
1264+
DomainPresenceInfo info = domains.get(domainUID);
1265+
if (info != null && serverName != null) {
1266+
ServerKubernetesObjects sko = info.getServers().get(serverName);
1267+
if (sko != null) {
1268+
if (sko.getPod() != null) {
1269+
// Pod was deleted, but sko still contains a non-null entry
1270+
LOGGER.info(MessageKeys.POD_DELETED, domainUID, metadata.getNamespace(), serverName);
1271+
doCheckAndCreateDomainPresence(info.getDomain());
1272+
}
1273+
}
1274+
}
1275+
}
1276+
break;
1277+
1278+
case "ERROR":
1279+
default:
1280+
}
1281+
}
1282+
1283+
1284+
private static ServiceWatcher createServiceWatcher(String namespace) {
1285+
return ServiceWatcher.create(namespace, "", Main::dispatchServiceWatch, stopping);
1286+
}
1287+
1288+
private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
1289+
switch (item.type) {
1290+
case "DELETED":
1291+
V1Service s = item.object;
1292+
V1ObjectMeta metadata = s.getMetadata();
1293+
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1294+
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1295+
String channelName = metadata.getLabels().get(LabelConstants.CHANNELNAME_LABEL);
1296+
if (domainUID != null) {
1297+
DomainPresenceInfo info = domains.get(domainUID);
1298+
if (info != null && serverName != null) {
1299+
ServerKubernetesObjects sko = info.getServers().get(serverName);
1300+
if (sko != null) {
1301+
if ((channelName != null ? sko.getChannels().get(channelName) : sko.getService()) != null) {
1302+
// Service was deleted, but sko still contains a non-null entry
1303+
LOGGER.info(MessageKeys.SERVICE_DELETED, domainUID, metadata.getNamespace(), serverName);
1304+
doCheckAndCreateDomainPresence(info.getDomain());
1305+
}
1306+
}
1307+
}
1308+
}
1309+
break;
1310+
1311+
case "ERROR":
1312+
default:
1313+
}
1314+
}
1315+
1316+
private static IngressWatcher createIngressWatcher(String namespace) {
1317+
return IngressWatcher.create(namespace, "", Main::dispatchIngressWatch, stopping);
1318+
}
1319+
1320+
private static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
1321+
switch (item.type) {
1322+
case "DELETED":
1323+
V1beta1Ingress i = item.object;
1324+
V1ObjectMeta metadata = i.getMetadata();
1325+
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1326+
String clusterName = metadata.getLabels().get(LabelConstants.CLUSTERNAME_LABEL);
1327+
if (domainUID != null) {
1328+
DomainPresenceInfo info = domains.get(domainUID);
1329+
if (info != null && clusterName != null) {
1330+
if (clusterName != null && info.getIngresses().get(clusterName) != null) {
1331+
// Ingress was deleted, but sko still contains a non-null entry
1332+
LOGGER.info(MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
1333+
doCheckAndCreateDomainPresence(info.getDomain());
1334+
}
1335+
}
1336+
}
1337+
break;
1338+
1339+
case "ERROR":
1340+
default:
1341+
}
1342+
}
1343+
12671344
/**
12681345
* Dispatch the Domain event to the appropriate handler.
12691346
*
12701347
* @param item An item received from a Watch response.
12711348
* @param principal The name of the principal that will be used in this watch.
12721349
*/
1273-
public static void dispatchDomainWatch(Watch.Response<Domain> item, String principal) {
1274-
1350+
private static void dispatchDomainWatch(Watch.Response<Domain> item) {
12751351
try {
12761352
Domain d;
12771353
String domainUID;
@@ -1281,7 +1357,7 @@ public static void dispatchDomainWatch(Watch.Response<Domain> item, String princ
12811357
d = item.object;
12821358
domainUID = d.getSpec().getDomainUID();
12831359
LOGGER.info(MessageKeys.WATCH_DOMAIN, domainUID);
1284-
doCheckAndCreateDomainPresence(principal, d);
1360+
doCheckAndCreateDomainPresence(d);
12851361
break;
12861362

12871363
case "DELETED":

0 commit comments

Comments
 (0)