@@ -83,10 +83,13 @@ public class Main {
83
83
private static final ConcurrentMap <String , Fiber > domainUpdaters = new ConcurrentHashMap <String , Fiber >();
84
84
private static final ConcurrentMap <String , DomainPresenceInfo > domains = new ConcurrentHashMap <String , DomainPresenceInfo >();
85
85
private static final AtomicBoolean stopping = new AtomicBoolean (false );
86
+ private static String principal ;
86
87
private static RestServer restServer = null ;
87
88
private static Thread livenessThread = null ;
88
89
private static Map <String , DomainWatcher > domainWatchers = new HashMap <>();
89
90
private static Map <String , PodWatcher > podWatchers = new HashMap <>();
91
+ private static Map <String , ServiceWatcher > serviceWatchers = new HashMap <>();
92
+ private static Map <String , IngressWatcher > ingressWatchers = new HashMap <>();
90
93
private static KubernetesVersion version = null ;
91
94
92
95
private static final Engine engine = new Engine ("operator" );
@@ -118,7 +121,7 @@ public static void main(String[] args) {
118
121
if (serviceAccountName == null ) {
119
122
serviceAccountName = "default" ;
120
123
}
121
- String principal = "system:serviceaccount:" + namespace + ":" + serviceAccountName ;
124
+ principal = "system:serviceaccount:" + namespace + ":" + serviceAccountName ;
122
125
123
126
LOGGER .info (MessageKeys .OP_CONFIG_NAMESPACE , namespace );
124
127
StringBuilder tns = new StringBuilder ();
@@ -163,8 +166,9 @@ public static void main(String[] args) {
163
166
// this would happen when the Domain was running BEFORE the Operator starts up
164
167
LOGGER .info (MessageKeys .LISTING_DOMAINS );
165
168
for (String ns : targetNamespaces ) {
166
- PodWatcher pw = createPodWatcher (ns );
167
- podWatchers .put (ns , pw );
169
+ podWatchers .put (ns , createPodWatcher (ns ));
170
+ serviceWatchers .put (ns , createServiceWatcher (ns ));
171
+ ingressWatchers .put (ns , createIngressWatcher (ns ));
168
172
169
173
Step domainList = CallBuilder .create ().listDomainAsync (ns , new ResponseStep <DomainList >(null ) {
170
174
@ Override
@@ -181,12 +185,12 @@ public NextAction onSuccess(Packet packet, DomainList result, int statusCode,
181
185
Map <String , List <String >> responseHeaders ) {
182
186
if (result != null ) {
183
187
for (Domain dom : result .getItems ()) {
184
- doCheckAndCreateDomainPresence (principal , dom );
188
+ doCheckAndCreateDomainPresence (dom );
185
189
}
186
190
}
187
191
188
192
// main logic now happens in the watch handlers
189
- domainWatchers .put (ns , createDomainWatcher (principal , ns , result != null ? result .getMetadata ().getResourceVersion () : "" ));
193
+ domainWatchers .put (ns , createDomainWatcher (ns , result != null ? result .getMetadata ().getResourceVersion () : "" ));
190
194
return doNext (packet );
191
195
}
192
196
});
@@ -360,7 +364,7 @@ public static void doRestartAdmin(String principal, String domainUID) {
360
364
if (info != null ) {
361
365
Domain dom = info .getDomain ();
362
366
if (dom != null ) {
363
- doCheckAndCreateDomainPresence (principal , dom , true , null , null );
367
+ doCheckAndCreateDomainPresence (dom , true , null , null );
364
368
}
365
369
}
366
370
}
@@ -377,7 +381,7 @@ public static void doRollingRestartServers(String principal, String domainUID, L
377
381
if (info != null ) {
378
382
Domain dom = info .getDomain ();
379
383
if (dom != null ) {
380
- doCheckAndCreateDomainPresence (principal , dom , false , servers , null );
384
+ doCheckAndCreateDomainPresence (dom , false , servers , null );
381
385
}
382
386
}
383
387
}
@@ -394,17 +398,17 @@ public static void doRollingRestartClusters(String principal, String domainUID,
394
398
if (info != null ) {
395
399
Domain dom = info .getDomain ();
396
400
if (dom != null ) {
397
- doCheckAndCreateDomainPresence (principal , dom , false , null , clusters );
401
+ doCheckAndCreateDomainPresence (dom , false , null , clusters );
398
402
}
399
403
}
400
404
}
401
405
402
- private static void doCheckAndCreateDomainPresence (String principal , Domain dom ) {
403
- doCheckAndCreateDomainPresence (principal , dom , false , null , null );
406
+ private static void doCheckAndCreateDomainPresence (Domain dom ) {
407
+ doCheckAndCreateDomainPresence (dom , false , null , null );
404
408
}
405
409
406
410
private static void doCheckAndCreateDomainPresence (
407
- String principal , Domain dom , boolean explicitRestartAdmin ,
411
+ Domain dom , boolean explicitRestartAdmin ,
408
412
List <String > explicitRestartServers , List <String > explicitRestartClusters ) {
409
413
LOGGER .entering ();
410
414
@@ -933,29 +937,14 @@ public ManagedServerDownStep(String serverName, ServerKubernetesObjects sko, Ste
933
937
934
938
@ Override
935
939
public NextAction apply (Packet packet ) {
936
- V1ObjectMeta meta = sko .getPod ().getMetadata ();
937
- V1DeleteOptions deleteOptions = new V1DeleteOptions ();
938
940
List <V1Service > services = new ArrayList <V1Service >();
939
941
services .add (sko .getService ());
940
942
services .addAll (sko .getChannels ().values ());
941
943
942
944
return doNext (IngressHelper .createRemoveServerStep (serverName , sko .getService (),
943
- new DeleteServiceListStep (services , CallBuilder .create ().deletePodAsync (meta .getName (), meta .getNamespace (), deleteOptions , new ResponseStep <V1Status >(next ) {
944
- @ Override
945
- public NextAction onFailure (Packet packet , ApiException e , int statusCode ,
946
- Map <String , List <String >> responseHeaders ) {
947
- if (statusCode == CallBuilder .NOT_FOUND ) {
948
- return onSuccess (packet , null , statusCode , responseHeaders );
949
- }
950
- return super .onFailure (packet , e , statusCode , responseHeaders );
951
- }
952
-
953
- @ Override
954
- public NextAction onSuccess (Packet packet , V1Status result , int statusCode ,
955
- Map <String , List <String >> responseHeaders ) {
956
- return doNext (new ManagedServerDownFinalizeStep (serverName , next ), packet );
957
- }
958
- }))), packet );
945
+ new DeleteServiceListStep (services ,
946
+ PodHelper .deletePodStep (sko ,
947
+ new ManagedServerDownFinalizeStep (serverName , next )))), packet );
959
948
}
960
949
}
961
950
@@ -1262,22 +1251,109 @@ public static boolean getStopping() {
1262
1251
return stopping .get ();
1263
1252
}
1264
1253
1265
- private static DomainWatcher createDomainWatcher (String principal , String namespace , String initialResourceVersion ) {
1266
- return DomainWatcher .create (namespace , initialResourceVersion , ( item ) -> { dispatchDomainWatch ( item , principal ); } , stopping );
1254
+ private static DomainWatcher createDomainWatcher (String namespace , String initialResourceVersion ) {
1255
+ return DomainWatcher .create (namespace , initialResourceVersion , Main :: dispatchDomainWatch , stopping );
1267
1256
}
1268
1257
1269
1258
private static PodWatcher createPodWatcher (String namespace ) {
1270
- return PodWatcher .create (namespace , "" , stopping );
1259
+ return PodWatcher .create (namespace , "" , Main :: dispatchPodWatch , stopping );
1271
1260
}
1272
1261
1262
+ private static void dispatchPodWatch (Watch .Response <V1Pod > item ) {
1263
+ switch (item .type ) {
1264
+ case "DELETED" :
1265
+ V1Pod p = item .object ;
1266
+ V1ObjectMeta metadata = p .getMetadata ();
1267
+ String domainUID = metadata .getLabels ().get (LabelConstants .DOMAINUID_LABEL );
1268
+ String serverName = metadata .getLabels ().get (LabelConstants .SERVERNAME_LABEL );
1269
+ if (domainUID != null ) {
1270
+ DomainPresenceInfo info = domains .get (domainUID );
1271
+ if (info != null && serverName != null ) {
1272
+ ServerKubernetesObjects sko = info .getServers ().get (serverName );
1273
+ if (sko != null ) {
1274
+ if (sko .getPod () != null ) {
1275
+ // Pod was deleted, but sko still contains a non-null entry
1276
+ LOGGER .info (MessageKeys .POD_DELETED , domainUID , metadata .getNamespace (), serverName );
1277
+ doCheckAndCreateDomainPresence (info .getDomain ());
1278
+ }
1279
+ }
1280
+ }
1281
+ }
1282
+ break ;
1283
+
1284
+ case "ERROR" :
1285
+ default :
1286
+ }
1287
+ }
1288
+
1289
+
1290
+ private static ServiceWatcher createServiceWatcher (String namespace ) {
1291
+ return ServiceWatcher .create (namespace , "" , Main ::dispatchServiceWatch , stopping );
1292
+ }
1293
+
1294
+ private static void dispatchServiceWatch (Watch .Response <V1Service > item ) {
1295
+ switch (item .type ) {
1296
+ case "DELETED" :
1297
+ V1Service s = item .object ;
1298
+ V1ObjectMeta metadata = s .getMetadata ();
1299
+ String domainUID = metadata .getLabels ().get (LabelConstants .DOMAINUID_LABEL );
1300
+ String serverName = metadata .getLabels ().get (LabelConstants .SERVERNAME_LABEL );
1301
+ String channelName = metadata .getLabels ().get (LabelConstants .CHANNELNAME_LABEL );
1302
+ if (domainUID != null ) {
1303
+ DomainPresenceInfo info = domains .get (domainUID );
1304
+ if (info != null && serverName != null ) {
1305
+ ServerKubernetesObjects sko = info .getServers ().get (serverName );
1306
+ if (sko != null ) {
1307
+ if ((channelName != null ? sko .getChannels ().get (channelName ) : sko .getService ()) != null ) {
1308
+ // Service was deleted, but sko still contains a non-null entry
1309
+ LOGGER .info (MessageKeys .SERVICE_DELETED , domainUID , metadata .getNamespace (), serverName );
1310
+ doCheckAndCreateDomainPresence (info .getDomain ());
1311
+ }
1312
+ }
1313
+ }
1314
+ }
1315
+ break ;
1316
+
1317
+ case "ERROR" :
1318
+ default :
1319
+ }
1320
+ }
1321
+
1322
+ private static IngressWatcher createIngressWatcher (String namespace ) {
1323
+ return IngressWatcher .create (namespace , "" , Main ::dispatchIngressWatch , stopping );
1324
+ }
1325
+
1326
+ private static void dispatchIngressWatch (Watch .Response <V1beta1Ingress > item ) {
1327
+ switch (item .type ) {
1328
+ case "DELETED" :
1329
+ V1beta1Ingress i = item .object ;
1330
+ V1ObjectMeta metadata = i .getMetadata ();
1331
+ String domainUID = metadata .getLabels ().get (LabelConstants .DOMAINUID_LABEL );
1332
+ String clusterName = metadata .getLabels ().get (LabelConstants .CLUSTERNAME_LABEL );
1333
+ if (domainUID != null ) {
1334
+ DomainPresenceInfo info = domains .get (domainUID );
1335
+ if (info != null && clusterName != null ) {
1336
+ if (clusterName != null && info .getIngresses ().get (clusterName ) != null ) {
1337
+ // Ingress was deleted, but sko still contains a non-null entry
1338
+ LOGGER .info (MessageKeys .INGRESS_DELETED , domainUID , metadata .getNamespace (), clusterName );
1339
+ doCheckAndCreateDomainPresence (info .getDomain ());
1340
+ }
1341
+ }
1342
+ }
1343
+ break ;
1344
+
1345
+ case "ERROR" :
1346
+ default :
1347
+ }
1348
+ }
1349
+
1273
1350
/**
1274
1351
* Dispatch the Domain event to the appropriate handler.
1275
1352
*
1276
1353
* @param item An item received from a Watch response.
1277
1354
* @param principal The name of the principal that will be used in this watch.
1278
1355
*/
1279
- public static void dispatchDomainWatch (Watch .Response <Domain > item , String principal ) {
1280
-
1356
+ private static void dispatchDomainWatch (Watch .Response <Domain > item ) {
1281
1357
try {
1282
1358
Domain d ;
1283
1359
String domainUID ;
@@ -1287,7 +1363,7 @@ public static void dispatchDomainWatch(Watch.Response<Domain> item, String princ
1287
1363
d = item .object ;
1288
1364
domainUID = d .getSpec ().getDomainUID ();
1289
1365
LOGGER .info (MessageKeys .WATCH_DOMAIN , domainUID );
1290
- doCheckAndCreateDomainPresence (principal , d );
1366
+ doCheckAndCreateDomainPresence (d );
1291
1367
break ;
1292
1368
1293
1369
case "DELETED" :
0 commit comments