1
- // Copyright (c) 2022, 2023 , Oracle and/or its affiliates.
1
+ // Copyright (c) 2022, 2024 , Oracle and/or its affiliates.
2
2
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3
3
4
4
package oracle .kubernetes .operator ;
24
24
import java .util .concurrent .ThreadFactory ;
25
25
import java .util .concurrent .TimeUnit ;
26
26
import java .util .concurrent .atomic .AtomicReference ;
27
- import javax .annotation .Nonnull ;
28
27
29
28
import oracle .kubernetes .common .logging .MessageKeys ;
30
- import oracle .kubernetes .operator .calls .UnrecoverableCallException ;
31
- import oracle .kubernetes .operator .helpers .ClientPool ;
32
29
import oracle .kubernetes .operator .helpers .HelmAccess ;
33
30
import oracle .kubernetes .operator .http .BaseServer ;
34
31
import oracle .kubernetes .operator .http .metrics .MetricsServer ;
38
35
import oracle .kubernetes .operator .logging .LoggingFactory ;
39
36
import oracle .kubernetes .operator .tuning .TuningParameters ;
40
37
import oracle .kubernetes .operator .utils .PathSupport ;
41
- import oracle .kubernetes .operator .work .Component ;
42
- import oracle .kubernetes .operator .work .Container ;
43
- import oracle .kubernetes .operator .work .ContainerResolver ;
44
- import oracle .kubernetes .operator .work .Engine ;
45
38
import oracle .kubernetes .operator .work .Fiber .CompletionCallback ;
46
39
import oracle .kubernetes .operator .work .Packet ;
47
40
import oracle .kubernetes .operator .work .Step ;
41
+ import oracle .kubernetes .operator .work .VirtualScheduledExecutorService ;
48
42
import oracle .kubernetes .utils .SystemClock ;
49
43
50
44
/** An abstract base main class for the operator and the webhook. */
@@ -56,10 +50,8 @@ public abstract class BaseMain {
56
50
static final String GIT_COMMIT_KEY = "git.commit.id.abbrev" ;
57
51
static final String GIT_BUILD_TIME_KEY = "git.build.time" ;
58
52
59
- static final Container container = new Container ();
60
- static final ThreadFactory threadFactory = new WrappedThreadFactory ();
61
- static ScheduledExecutorService wrappedExecutorService =
62
- Engine .wrappedExecutorService (container ); // non-final to allow change in unit tests
53
+ static final ThreadFactory threadFactory = Thread .ofVirtual ().factory ();
54
+ static final ScheduledExecutorService executor = new VirtualScheduledExecutorService ();
63
55
static final AtomicReference <OffsetDateTime > lastFullRecheck =
64
56
new AtomicReference <>(SystemClock .now ());
65
57
static final Semaphore shutdownSignal = new Semaphore (0 );
@@ -78,8 +70,6 @@ public abstract class BaseMain {
78
70
PrintStream nullOut = new PrintStream (output );
79
71
System .setErr (nullOut );
80
72
81
- ClientPool .initialize (threadFactory );
82
-
83
73
// Simplify debugging the operator by allowing the setting of the operator
84
74
// top-level directory using either an env variable or a property. In the normal,
85
75
// container-based use case these values won't be set and the operator will with the
@@ -96,7 +86,7 @@ public abstract class BaseMain {
96
86
}
97
87
probesHome = new File (probesHomeLoc );
98
88
99
- TuningParameters .initializeInstance (wrappedExecutorService , new File (deploymentHome , "config" ));
89
+ TuningParameters .initializeInstance (executor , new File (deploymentHome , "config" ));
100
90
} catch (IOException e ) {
101
91
LOGGER .warning (MessageKeys .EXCEPTION , e );
102
92
throw new RuntimeException (e );
@@ -165,19 +155,18 @@ void markReadyAndStartLivenessThread() {
165
155
166
156
logStartingLivenessMessage ();
167
157
// every five seconds we need to update the last modified time on the liveness file
168
- wrappedExecutorService .scheduleWithFixedDelay (
169
- new DeploymentLiveness (delegate ), 5 , 5 , TimeUnit .SECONDS );
158
+ delegate .scheduleWithFixedDelay (new DeploymentLiveness (delegate ), 5 , 5 , TimeUnit .SECONDS );
170
159
} catch (IOException io ) {
171
160
LOGGER .severe (MessageKeys .EXCEPTION , io );
172
161
}
173
162
}
174
163
175
- void startRestServer (Container container )
164
+ void startRestServer ()
176
165
throws UnrecoverableKeyException , CertificateException , IOException , NoSuchAlgorithmException ,
177
166
KeyStoreException , InvalidKeySpecException , KeyManagementException {
178
167
BaseRestServer value = createRestServer ();
179
168
restServer .set (value );
180
- value .start (container );
169
+ value .start ();
181
170
}
182
171
183
172
abstract BaseRestServer createRestServer ();
@@ -191,18 +180,17 @@ void stopRestServer() {
191
180
Optional .ofNullable (restServer .getAndSet (null )).ifPresent (BaseServer ::stop );
192
181
}
193
182
194
- @ SuppressWarnings ("SameParameterValue" )
195
- void startMetricsServer (Container container ) throws UnrecoverableKeyException , CertificateException , IOException ,
183
+ void startMetricsServer () throws UnrecoverableKeyException , CertificateException , IOException ,
196
184
NoSuchAlgorithmException , KeyStoreException , InvalidKeySpecException , KeyManagementException {
197
- startMetricsServer (container , delegate .getMetricsPort ());
185
+ startMetricsServer (delegate .getMetricsPort ());
198
186
}
199
187
200
188
// for test
201
- void startMetricsServer (Container container , int port ) throws UnrecoverableKeyException , CertificateException ,
189
+ void startMetricsServer (int port ) throws UnrecoverableKeyException , CertificateException ,
202
190
IOException , NoSuchAlgorithmException , KeyStoreException , InvalidKeySpecException , KeyManagementException {
203
191
BaseServer value = new MetricsServer (port );
204
192
metricsServer .set (value );
205
- value .start (container );
193
+ value .start ();
206
194
}
207
195
208
196
// for test
@@ -253,7 +241,7 @@ void waitForDeath() {
253
241
}
254
242
255
243
void scheduleCheckForShutdownMarker () {
256
- wrappedExecutorService .scheduleWithFixedDelay (
244
+ delegate .scheduleWithFixedDelay (
257
245
() -> {
258
246
File marker = new File (delegate .getDeploymentHome (), CoreDelegate .SHUTDOWN_MARKER_NAME );
259
247
if (isFileExists (marker )) {
@@ -268,25 +256,10 @@ private static boolean isFileExists(File file) {
268
256
269
257
static Packet createPacketWithLoggingContext (String ns ) {
270
258
Packet packet = new Packet ();
271
- packet .getComponents ().put (
272
- LoggingContext .LOGGING_CONTEXT_KEY ,
273
- Component .createFor (new LoggingContext ().namespace (ns )));
259
+ packet .put (LoggingContext .LOGGING_CONTEXT_KEY , new LoggingContext ().namespace (ns ));
274
260
return packet ;
275
261
}
276
262
277
- private static class WrappedThreadFactory implements ThreadFactory {
278
- private final ThreadFactory delegate = Thread .ofVirtual ().factory ();
279
-
280
- @ Override
281
- public Thread newThread (@ Nonnull Runnable r ) {
282
- return delegate .newThread (
283
- () -> {
284
- ContainerResolver .getDefault ().enterContainer (container );
285
- r .run ();
286
- });
287
- }
288
- }
289
-
290
263
static class NullCompletionCallback implements CompletionCallback {
291
264
private final Runnable completionAction ;
292
265
@@ -303,11 +276,7 @@ public void onCompletion(Packet packet) {
303
276
304
277
@ Override
305
278
public void onThrowable (Packet packet , Throwable throwable ) {
306
- if (throwable instanceof UnrecoverableCallException uce ) {
307
- uce .log ();
308
- } else {
309
- LOGGER .severe (MessageKeys .EXCEPTION , throwable );
310
- }
279
+ LOGGER .severe (MessageKeys .EXCEPTION , throwable );
311
280
}
312
281
}
313
282
}
0 commit comments