File tree Expand file tree Collapse file tree 11 files changed +23
-25
lines changed
docs/layouts/shortcodes/generated
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation
flink-kubernetes-operator/src
java/org/apache/flink/kubernetes/operator
test/java/org/apache/flink/kubernetes/operator Expand file tree Collapse file tree 11 files changed +23
-25
lines changed Original file line number Diff line number Diff line change 304
304
</ tr >
305
305
< tr >
306
306
< td > < h5 > kubernetes.operator.reconcile.parallelism</ h5 > </ td >
307
- < td style ="word-wrap: break-word; "> 200 </ td >
307
+ < td style ="word-wrap: break-word; "> 50 </ td >
308
308
< td > Integer</ td >
309
309
< td > The maximum number of threads running the reconciliation loop. Use -1 for infinite.</ td >
310
310
</ tr >
Original file line number Diff line number Diff line change 100
100
</ tr >
101
101
< tr >
102
102
< td > < h5 > kubernetes.operator.reconcile.parallelism</ h5 > </ td >
103
- < td style ="word-wrap: break-word; "> 200 </ td >
103
+ < td style ="word-wrap: break-word; "> 50 </ td >
104
104
< td > Integer</ td >
105
105
< td > The maximum number of threads running the reconciliation loop. Use -1 for infinite.</ td >
106
106
</ tr >
Original file line number Diff line number Diff line change @@ -93,7 +93,11 @@ protected static void checkObjectCompatibility(
93
93
// This field was removed from Kubernetes ObjectMeta v1 in 1.25 as it was unused
94
94
// for a long time. If set for any reason (very unlikely as it does nothing),
95
95
// the property will be dropped / ignored by the api server.
96
- if (!fieldPath .endsWith (".metadata.clusterName" )) {
96
+ if (!fieldPath .endsWith (".metadata.clusterName" )
97
+ // This claims field was removed in Kubernetes 1.28 as it was mistakenly
98
+ // added in the first place. For more context please refer to
99
+ // https://github.com/kubernetes/api/commit/8b14183
100
+ && !fieldPath .contains (".volumeClaimTemplate.spec.resources.claims" )) {
97
101
err (fieldPath + " has been removed" );
98
102
}
99
103
} else {
Original file line number Diff line number Diff line change @@ -137,7 +137,7 @@ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
137
137
overrider .withExecutorService (Executors .newCachedThreadPool ());
138
138
} else {
139
139
LOG .info ("Configuring operator with {} reconciliation threads." , parallelism );
140
- overrider .withExecutorService ( Executors . newFixedThreadPool ( parallelism ) );
140
+ overrider .withConcurrentReconciliationThreads ( parallelism );
141
141
}
142
142
143
143
if (operatorConf .isJosdkMetricsEnabled ()) {
Original file line number Diff line number Diff line change @@ -266,8 +266,7 @@ private static LeaderElectionConfiguration getLeaderElectionConfig(Configuration
266
266
null ,
267
267
conf .get (KubernetesOperatorConfigOptions .OPERATOR_LEADER_ELECTION_LEASE_DURATION ),
268
268
conf .get (KubernetesOperatorConfigOptions .OPERATOR_LEADER_ELECTION_RENEW_DEADLINE ),
269
- conf .get (KubernetesOperatorConfigOptions .OPERATOR_LEADER_ELECTION_RETRY_PERIOD ),
270
- null );
269
+ conf .get (KubernetesOperatorConfigOptions .OPERATOR_LEADER_ELECTION_RETRY_PERIOD ));
271
270
}
272
271
273
272
private static Optional <String > getEnv (String key ) {
Original file line number Diff line number Diff line change @@ -205,9 +205,9 @@ private KubernetesResourceNamespaceMetricGroup getResourceNsMg(
205
205
206
206
Class <? extends AbstractFlinkResource <?, ?>> resourceClass ;
207
207
208
- if (resourceGvk .kind .equals (FlinkDeployment .class .getSimpleName ())) {
208
+ if (resourceGvk .getKind () .equals (FlinkDeployment .class .getSimpleName ())) {
209
209
resourceClass = FlinkDeployment .class ;
210
- } else if (resourceGvk .kind .equals (FlinkSessionJob .class .getSimpleName ())) {
210
+ } else if (resourceGvk .getKind () .equals (FlinkSessionJob .class .getSimpleName ())) {
211
211
resourceClass = FlinkSessionJob .class ;
212
212
} else {
213
213
return Optional .empty ();
Original file line number Diff line number Diff line change @@ -47,10 +47,10 @@ This project bundles the following dependencies under the Apache Software Licens
47
47
- io.fabric8:kubernetes-model-scheduling:jar:6.8.1
48
48
- io.fabric8:kubernetes-model-storageclass:jar:6.8.1
49
49
- io.fabric8:zjsonpatch:jar:0.3.0
50
- - io.javaoperatorsdk:operator-framework-core:jar:4.4.4
51
- - io.javaoperatorsdk:operator-framework:jar:4.4.4
50
+ - io.javaoperatorsdk:operator-framework-core:jar:4.8.3
51
+ - io.javaoperatorsdk:operator-framework:jar:4.8.3
52
52
- org.apache.commons:commons-compress:1.21
53
- - org.apache.commons:commons-lang3:3.13 .0
53
+ - org.apache.commons:commons-lang3:3.14 .0
54
54
- org.apache.commons:commons-math3:3.6.1
55
55
- org.apache.commons:commons-text:jar:1.10.0
56
56
- org.apache.logging.log4j:log4j-1.2-api:2.17.1
Original file line number Diff line number Diff line change @@ -70,21 +70,10 @@ public void testConfigurationPassedToJOSDK() {
70
70
71
71
var configService = testOperator .getOperator ().getConfigurationService ();
72
72
73
- // Test parallelism being passed expectedly
73
+ // Test parallelism being passed
74
74
var executorService = configService .getExecutorService ();
75
75
Assertions .assertInstanceOf (ThreadPoolExecutor .class , executorService );
76
76
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) executorService ;
77
- for (int i = 0 ; i < testParallelism * 2 ; i ++) {
78
- threadPoolExecutor .execute (
79
- () -> {
80
- try {
81
- Thread .sleep (1000 );
82
- } catch (InterruptedException e ) {
83
- e .printStackTrace ();
84
- }
85
- });
86
- }
87
- Assertions .assertEquals (threadPoolExecutor .getPoolSize (), testParallelism );
88
77
Assertions .assertEquals (threadPoolExecutor .getMaximumPoolSize (), testParallelism );
89
78
90
79
// Test label selector being passed
Original file line number Diff line number Diff line change 51
51
import io .fabric8 .mockwebserver .utils .ResponseProvider ;
52
52
import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
53
53
import io .javaoperatorsdk .operator .api .reconciler .Context ;
54
+ import io .javaoperatorsdk .operator .api .reconciler .IndexedResourceCache ;
54
55
import io .javaoperatorsdk .operator .api .reconciler .ResourceDiscriminator ;
55
56
import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
56
57
import io .javaoperatorsdk .operator .api .reconciler .dependent .managed .ManagedDependentResourceContext ;
@@ -497,5 +498,10 @@ public KubernetesClient getClient() {
497
498
public ExecutorService getWorkflowExecutorService () {
498
499
throw new UnsupportedOperationException ("Not implemented" );
499
500
}
501
+
502
+ @ Override
503
+ public IndexedResourceCache <T > getPrimaryCache () {
504
+ return null ;
505
+ }
500
506
}
501
507
}
Original file line number Diff line number Diff line change @@ -1013,7 +1013,7 @@ public void testBlockingDeploymentDeletion() {
1013
1013
namespace , deploymentName );
1014
1014
String watchUrl =
1015
1015
String .format (
1016
- "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true &watch=true" ,
1016
+ "/apis/apps/v1/namespaces/%s/deployments?allowWatchBookmarks=true& fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&watch=true" ,
1017
1017
namespace , deploymentName );
1018
1018
1019
1019
var flinkService = new TestingService (null );
You can’t perform that action at this time.
0 commit comments