Skip to content

Commit 87a95b2

Browse files
committed
Move job watcher cache to JobWatcher
1 parent 4f57092 commit 87a95b2

File tree

10 files changed

+171
-124
lines changed

10 files changed

+171
-124
lines changed

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414

1515
public interface DomainProcessor {
1616

17-
public static DomainProcessor getInstance() {
18-
return DomainProcessorImpl.INSTANCE;
19-
}
20-
2117
public void makeRightDomainPresence(
2218
DomainPresenceInfo info,
2319
boolean explicitRecheck,

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import oracle.kubernetes.weblogic.domain.model.DomainSpec;
5858

5959
public class DomainProcessorImpl implements DomainProcessor {
60-
static final DomainProcessor INSTANCE = new DomainProcessorImpl();
6160

6261
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
6362

@@ -808,12 +807,7 @@ private static Step[] domainIntrospectionSteps(DomainPresenceInfo info, Step nex
808807
resources.add(
809808
JobHelper.deleteDomainIntrospectorJobStep(
810809
dom.getDomainUID(), dom.getMetadata().getNamespace(), null));
811-
resources.add(
812-
JobHelper.createDomainIntrospectorJobStep(
813-
Main.tuningAndConfig.getWatchTuning(),
814-
next,
815-
jws,
816-
Main.isNamespaceStopping(dom.getMetadata().getNamespace())));
810+
resources.add(JobHelper.createDomainIntrospectorJobStep(next));
817811
return resources.toArray(new Step[0]);
818812
}
819813

operator/src/main/java/oracle/kubernetes/operator/JobWatcher.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
import io.kubernetes.client.models.V1JobStatus;
1111
import io.kubernetes.client.models.V1ObjectMeta;
1212
import io.kubernetes.client.util.Watch;
13+
import java.util.HashMap;
1314
import java.util.List;
1415
import java.util.Map;
1516
import java.util.concurrent.ConcurrentHashMap;
1617
import java.util.concurrent.ConcurrentMap;
1718
import java.util.concurrent.ThreadFactory;
1819
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.function.Function;
1921
import oracle.kubernetes.operator.TuningParameters.WatchTuning;
2022
import oracle.kubernetes.operator.builders.WatchBuilder;
2123
import oracle.kubernetes.operator.builders.WatchI;
@@ -28,19 +30,36 @@
2830
import oracle.kubernetes.operator.work.NextAction;
2931
import oracle.kubernetes.operator.work.Packet;
3032
import oracle.kubernetes.operator.work.Step;
33+
import oracle.kubernetes.weblogic.domain.model.Domain;
3134

3235
/** Watches for Jobs to become Ready or leave Ready state. */
3336
public class JobWatcher extends Watcher<V1Job> implements WatchListener<V1Job> {
3437
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
38+
private static final Map<String, JobWatcher> JOB_WATCHERS = new HashMap<>();
39+
private static JobWatcherFactory factory;
3540

36-
private final String ns;
41+
private final String namespace;
3742

3843
// Map of Pod name to Complete
3944
private final ConcurrentMap<String, Complete> completeCallbackRegistrations =
4045
new ConcurrentHashMap<>();
4146

4247
/**
43-
* Factory for JobWatcher.
48+
* If a JobWatcher has already been created, returns it.
49+
*
50+
* @param domain the domain for which the job watcher is to be returned
51+
* @return a cached jobwatcher, or null.
52+
*/
53+
public static JobWatcher getOrCreateFor(Domain domain) {
54+
return JOB_WATCHERS.computeIfAbsent(getNamespace(domain), n -> factory.createFor(domain));
55+
}
56+
57+
static String getNamespace(Domain domain) {
58+
return domain.getMetadata().getNamespace();
59+
}
60+
61+
/**
62+
* Creates a new JobWatcher and caches it by namespace.
4463
*
4564
* @param factory thread factory
4665
* @param ns Namespace
@@ -61,17 +80,27 @@ public static JobWatcher create(
6180
}
6281

6382
private JobWatcher(
64-
String ns, String initialResourceVersion, WatchTuning tuning, AtomicBoolean isStopping) {
83+
String namespace,
84+
String initialResourceVersion,
85+
WatchTuning tuning,
86+
AtomicBoolean isStopping) {
6587
super(initialResourceVersion, tuning, isStopping);
6688
setListener(this);
67-
this.ns = ns;
89+
this.namespace = namespace;
90+
}
91+
92+
static void defineFactory(
93+
ThreadFactory threadFactory,
94+
WatchTuning tuning,
95+
Function<String, AtomicBoolean> isNamespaceStopping) {
96+
factory = new JobWatcherFactory(threadFactory, tuning, isNamespaceStopping);
6897
}
6998

7099
@Override
71100
public WatchI<V1Job> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
72101
return watchBuilder
73102
.withLabelSelectors(LabelConstants.DOMAINUID_LABEL, LabelConstants.CREATEDBYOPERATOR_LABEL)
74-
.createJobWatch(ns);
103+
.createJobWatch(namespace);
75104
}
76105

77106
public void receivedResponse(Watch.Response<V1Job> item) {
@@ -222,6 +251,32 @@ public NextAction onSuccess(
222251
}
223252
}
224253

254+
static class JobWatcherFactory {
255+
private ThreadFactory threadFactory;
256+
private WatchTuning watchTuning;
257+
258+
private Function<String, AtomicBoolean> isNamespaceStopping;
259+
260+
JobWatcherFactory(
261+
ThreadFactory threadFactory,
262+
WatchTuning watchTuning,
263+
Function<String, AtomicBoolean> isNamespaceStopping) {
264+
this.threadFactory = threadFactory;
265+
this.watchTuning = watchTuning;
266+
this.isNamespaceStopping = isNamespaceStopping;
267+
}
268+
269+
JobWatcher createFor(Domain domain) {
270+
String namespace = getNamespace(domain);
271+
return create(
272+
threadFactory,
273+
namespace,
274+
domain.getMetadata().getResourceVersion(),
275+
watchTuning,
276+
isNamespaceStopping.apply(namespace));
277+
}
278+
}
279+
225280
@FunctionalInterface
226281
private interface Complete {
227282
void isComplete(V1Job job);

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public Thread newThread(Runnable r) {
127127
}
128128

129129
static final Engine engine = new Engine(wrappedExecutorService);
130-
private static final DomainProcessor processor = DomainProcessor.getInstance();
130+
private static final DomainProcessor processor = new DomainProcessorImpl();
131131

132132
static final ConcurrentMap<String, AtomicBoolean> isNamespaceStarted = new ConcurrentHashMap<>();
133133
static final ConcurrentMap<String, AtomicBoolean> isNamespaceStopping = new ConcurrentHashMap<>();
@@ -196,6 +196,8 @@ private static void begin() {
196196
principal = "system:serviceaccount:" + operatorNamespace + ":" + serviceAccountName;
197197

198198
LOGGER.info(MessageKeys.OP_CONFIG_NAMESPACE, operatorNamespace);
199+
JobWatcher.defineFactory(
200+
threadFactory, tuningAndConfig.getWatchTuning(), Main::isNamespaceStopping);
199201

200202
Collection<String> targetNamespaces = getTargetNamespaces();
201203
StringBuilder tns = new StringBuilder();
@@ -530,6 +532,15 @@ private static DomainWatcher createDomainWatcher(String ns, String initialResour
530532
isNamespaceStopping(ns));
531533
}
532534

535+
private static void createJobWatcher(String ns, String initialResourceVersion) {
536+
JobWatcher.create(
537+
threadFactory,
538+
ns,
539+
initialResourceVersion,
540+
tuningAndConfig.getWatchTuning(),
541+
isNamespaceStopping(ns));
542+
}
543+
533544
static String getOperatorNamespace() {
534545
String namespace = System.getenv("OPERATOR_NAMESPACE");
535546
if (namespace == null) {

operator/src/main/java/oracle/kubernetes/operator/helpers/JobHelper.java

Lines changed: 26 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@
1212
import io.kubernetes.client.models.V1Volume;
1313
import io.kubernetes.client.models.V1VolumeMount;
1414
import java.util.List;
15-
import java.util.Map;
16-
import java.util.concurrent.atomic.AtomicBoolean;
17-
import javax.validation.constraints.NotNull;
1815
import oracle.kubernetes.operator.JobWatcher;
1916
import oracle.kubernetes.operator.LabelConstants;
2017
import oracle.kubernetes.operator.ProcessingConstants;
2118
import oracle.kubernetes.operator.TuningParameters;
22-
import oracle.kubernetes.operator.TuningParameters.WatchTuning;
2319
import oracle.kubernetes.operator.calls.CallResponse;
2420
import oracle.kubernetes.operator.logging.LoggingFacade;
2521
import oracle.kubernetes.operator.logging.LoggingFactory;
@@ -115,32 +111,18 @@ List<V1EnvVar> getConfiguredEnvVars(TuningParameters tuningParameters) {
115111
/**
116112
* Factory for {@link Step} that creates WebLogic domain introspector job.
117113
*
118-
* @param tuning Watch tuning parameters
119114
* @param next Next processing step
120-
* @param jws Map of JobWatcher objects, keyed by the string value of the name of a namespace
121-
* @param isStopping Stop signal
122115
* @return Step for creating job
123116
*/
124-
public static Step createDomainIntrospectorJobStep(
125-
WatchTuning tuning,
126-
Step next,
127-
@NotNull Map<String, JobWatcher> jws,
128-
@NotNull AtomicBoolean isStopping) {
117+
public static Step createDomainIntrospectorJobStep(Step next) {
129118

130-
return new DomainIntrospectorJobStep(tuning, next, jws, isStopping);
119+
return new DomainIntrospectorJobStep(next);
131120
}
132121

133122
static class DomainIntrospectorJobStep extends Step {
134-
private final WatchTuning tuning;
135-
private final Map<String, JobWatcher> jws;
136-
private final AtomicBoolean isStopping;
137123

138-
DomainIntrospectorJobStep(
139-
WatchTuning tuning, Step next, Map<String, JobWatcher> jws, AtomicBoolean isStopping) {
124+
DomainIntrospectorJobStep(Step next) {
140125
super(next);
141-
this.tuning = tuning;
142-
this.jws = jws;
143-
this.isStopping = isStopping;
144126
}
145127

146128
@Override
@@ -149,12 +131,12 @@ public NextAction apply(Packet packet) {
149131
if (runIntrospector(packet, info)) {
150132
JobStepContext context = new DomainIntrospectorJobStepContext(info, packet);
151133

152-
packet.putIfAbsent(START_TIME, Long.valueOf(System.currentTimeMillis()));
134+
packet.putIfAbsent(START_TIME, System.currentTimeMillis());
153135

154136
return doNext(
155137
context.createNewJob(
156138
readDomainIntrospectorPodLogStep(
157-
tuning, ConfigMapHelper.createSitConfigMapStep(getNext()), jws, isStopping)),
139+
ConfigMapHelper.createSitConfigMapStep(getNext()))),
158140
packet);
159141
}
160142

@@ -167,10 +149,7 @@ private static boolean runIntrospector(Packet packet, DomainPresenceInfo info) {
167149
LOGGER.fine("runIntrospector topology: " + config);
168150
LOGGER.fine("runningServersCount: " + runningServersCount(info));
169151
LOGGER.fine("creatingServers: " + creatingServers(info));
170-
if (config == null || (runningServersCount(info) == 0 && creatingServers(info))) {
171-
return true;
172-
}
173-
return false;
152+
return config == null || (runningServersCount(info) == 0 && creatingServers(info));
174153
}
175154

176155
private static int runningServersCount(DomainPresenceInfo info) {
@@ -180,7 +159,7 @@ private static int runningServersCount(DomainPresenceInfo info) {
180159
/**
181160
* TODO: Enhance determination of when we believe we're creating WLS managed server pods.
182161
*
183-
* @param info
162+
* @param info the domain presence info
184163
* @return True, if creating servers
185164
*/
186165
static boolean creatingServers(DomainPresenceInfo info) {
@@ -262,43 +241,35 @@ String getJobDeletedMessageKey() {
262241
return MessageKeys.JOB_DELETED;
263242
}
264243

265-
protected void logJobDeleted(String domainUID, String namespace, String jobName) {
244+
void logJobDeleted(String domainUID, String namespace, String jobName) {
266245
LOGGER.info(getJobDeletedMessageKey(), domainUID, namespace, jobName);
267246
}
268247

269248
private Step deleteJob(Step next) {
270249
String jobName = JobHelper.createJobName(this.domainUID);
271250
logJobDeleted(this.domainUID, namespace, jobName);
272-
Step step =
273-
new CallBuilder()
274-
.deleteJobAsync(
275-
jobName,
276-
this.namespace,
277-
new V1DeleteOptions().propagationPolicy("Foreground"),
278-
new DefaultResponseStep<>(next));
279-
return step;
251+
return new CallBuilder()
252+
.deleteJobAsync(
253+
jobName,
254+
this.namespace,
255+
new V1DeleteOptions().propagationPolicy("Foreground"),
256+
new DefaultResponseStep<>(next));
280257
}
281258
}
282259

283-
private static Step createWatchDomainIntrospectorJobReadyStep(
284-
WatchTuning tuning, Step next, Map<String, JobWatcher> jws, AtomicBoolean isStopping) {
285-
return new WatchDomainIntrospectorJobReadyStep(tuning, next, jws, isStopping);
260+
private static Step createWatchDomainIntrospectorJobReadyStep(Step next) {
261+
return new WatchDomainIntrospectorJobReadyStep(next);
286262
}
287263

288264
/**
289265
* Factory for {@link Step} that reads WebLogic domain introspector job results from pod's log.
290266
*
291-
* @param tuning Watch tuning parameters
292267
* @param next Next processing step
293268
* @return Step for reading WebLogic domain introspector pod log
294269
*/
295-
static Step readDomainIntrospectorPodLogStep(
296-
WatchTuning tuning, Step next, Map<String, JobWatcher> jws, AtomicBoolean isStopping) {
270+
private static Step readDomainIntrospectorPodLogStep(Step next) {
297271
return createWatchDomainIntrospectorJobReadyStep(
298-
tuning,
299-
readDomainIntrospectorPodStep(new ReadDomainIntrospectorPodLogStep(next)),
300-
jws,
301-
isStopping);
272+
readDomainIntrospectorPodStep(new ReadDomainIntrospectorPodLogStep(next)));
302273
}
303274

304275
private static class ReadDomainIntrospectorPodLogStep extends Step {
@@ -318,16 +289,14 @@ public NextAction apply(Packet packet) {
318289
}
319290

320291
private Step readDomainIntrospectorPodLog(String jobPodName, String namespace, Step next) {
321-
Step step =
322-
new CallBuilder()
323-
.readPodLogAsync(
324-
jobPodName, namespace, new ReadDomainIntrospectorPodLogResponseStep(next));
325-
return step;
292+
return new CallBuilder()
293+
.readPodLogAsync(
294+
jobPodName, namespace, new ReadDomainIntrospectorPodLogResponseStep(next));
326295
}
327296
}
328297

329298
private static class ReadDomainIntrospectorPodLogResponseStep extends ResponseStep<String> {
330-
public ReadDomainIntrospectorPodLogResponseStep(Step nextStep) {
299+
ReadDomainIntrospectorPodLogResponseStep(Step nextStep) {
331300
super(nextStep);
332301
}
333302

@@ -376,7 +345,7 @@ private void cleanupJobArtifacts(Packet packet) {
376345
* @param next Next processing step
377346
* @return Step for reading WebLogic domain introspector pod
378347
*/
379-
public static Step readDomainIntrospectorPodStep(Step next) {
348+
private static Step readDomainIntrospectorPodStep(Step next) {
380349
return new ReadDomainIntrospectorPodStep(next);
381350
}
382351

@@ -396,24 +365,18 @@ public NextAction apply(Packet packet) {
396365
}
397366

398367
private Step readDomainIntrospectorPod(String domainUID, String namespace, Step next) {
399-
400-
Step step =
401-
new CallBuilder()
402-
.withLabelSelectors(LabelConstants.JOBNAME_LABEL)
403-
.listPodAsync(namespace, new PodListStep(domainUID, namespace, next));
404-
405-
return step;
368+
return new CallBuilder()
369+
.withLabelSelectors(LabelConstants.JOBNAME_LABEL)
370+
.listPodAsync(namespace, new PodListStep(domainUID, namespace, next));
406371
}
407372
}
408373

409374
private static class PodListStep extends ResponseStep<V1PodList> {
410-
private final String ns;
411375
private final String domainUID;
412376

413377
PodListStep(String domainUID, String ns, Step next) {
414378
super(next);
415379
this.domainUID = domainUID;
416-
this.ns = ns;
417380
}
418381

419382
@Override

0 commit comments

Comments
 (0)