Skip to content

Commit 42b7846

Browse files
authored
Add pluggable controllers via ControllerProvider (#18)
* Publish to LinkedInJFrog * Publish hoptimator-cli and hoptimator-operator * Add pluggable controllers via ControllerProvider * Add user-defined properties to operator * Fix logging dependencies * Make Operator properties optional h/t @ehoner * Drop extraneous log4j.properties h/t @ehoner
1 parent b3c1008 commit 42b7846

File tree

19 files changed

+193
-66
lines changed

19 files changed

+193
-66
lines changed

gradle/libs.versions.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ junit = "junit:junit:4.12"
2525
kafkaClients = "org.apache.kafka:kafka-clients:2.7.1"
2626
kubernetesClient = "io.kubernetes:client-java:16.0.2"
2727
kubernetesExtendedClient = "io.kubernetes:client-java-extended:16.0.2"
28-
slf4jLog4j = "org.slf4j:slf4j-log4j12:1.7.36"
28+
slf4jSimple = "org.slf4j:slf4j-simple:1.7.30"
29+
slf4jApi = "org.slf4j:slf4j-api:1.7.30"
2930
sqlline = "sqlline:sqlline:1.12.0"

hoptimator-cli/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ dependencies {
1818
integration project(':hoptimator-kafka-adapter')
1919
integration project(':hoptimator-mysql-adapter')
2020
integration libs.flinkCsv
21+
integration libs.flinkConnectorKafka
22+
integration libs.slf4jSimple
2123

2224
implementation libs.avro
2325
implementation libs.sqlline
24-
implementation libs.slf4jLog4j
26+
implementation libs.slf4jApi
2527
implementation libs.flinkClients
2628
implementation libs.flinkTableRuntime
2729
implementation libs.flinkTablePlanner
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
plugins {
2+
id 'java-library'
3+
id 'idea'
4+
}
5+
6+
dependencies {
7+
implementation project(':hoptimator-catalog')
8+
implementation project(':hoptimator-models')
9+
implementation project(':hoptimator-operator')
10+
implementation libs.kubernetesClient
11+
implementation libs.kubernetesExtendedClient
12+
13+
testImplementation libs.junit
14+
testImplementation libs.assertj
15+
}
16+
17+
idea {
18+
module {
19+
downloadJavadoc = true
20+
downloadSources = true
21+
}
22+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.linkedin.hoptimator.operator.flink;
2+
3+
import com.linkedin.hoptimator.operator.ControllerProvider;
4+
import com.linkedin.hoptimator.operator.Operator;
5+
import com.linkedin.hoptimator.models.V1alpha1SqlJob;
6+
7+
import io.kubernetes.client.extended.controller.Controller;
8+
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
9+
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
10+
11+
import java.util.Collection;
12+
import java.util.Collections;
13+
14+
/** Provides a Controller plugin for FlinkDeployments. */
15+
public class FlinkControllerProvider implements ControllerProvider {
16+
17+
@Override
18+
public Collection<Controller> controllers(Operator operator) {
19+
operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments",
20+
"flink.apache.org", "v1beta1");
21+
22+
Reconciler reconciler = new FlinkSqlJobReconciler(operator);
23+
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
24+
.withReconciler(reconciler)
25+
.withName("flink-deployment-controller")
26+
.withWorkerCount(1)
27+
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1SqlJob.class, x).build())
28+
.build();
29+
30+
return Collections.singleton(controller);
31+
}
32+
}
File renamed without changes.
Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import com.linkedin.hoptimator.operator.RequestEnvironment;
77
import com.linkedin.hoptimator.models.V1alpha1SqlJob;
88

9-
import io.kubernetes.client.extended.controller.Controller;
10-
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
119
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
1210
import io.kubernetes.client.extended.controller.reconciler.Request;
1311
import io.kubernetes.client.extended.controller.reconciler.Result;
@@ -72,20 +70,6 @@ public Result reconcile(Request request) {
7270
return new Result(false);
7371
}
7472

75-
public static Controller controller(Operator operator) {
76-
Reconciler reconciler = new FlinkSqlJobReconciler(operator);
77-
return ControllerBuilder.defaultBuilder(operator.informerFactory())
78-
.withReconciler(reconciler)
79-
.withName("flink-deployment-controller")
80-
.withWorkerCount(1)
81-
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
82-
// cache has synced up
83-
//.withWorkQueue(resourceWorkQueue)
84-
//.watch()
85-
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1SqlJob.class, x).build())
86-
.build();
87-
}
88-
8973
private static <T> List<T> list(List<T> maybeNull) {
9074
if (maybeNull == null) {
9175
return Collections.emptyList();
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.linkedin.hoptimator.operator.flink.FlinkControllerProvider

hoptimator-kafka-adapter/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ plugins {
55

66
dependencies {
77
implementation project(':hoptimator-catalog')
8+
implementation project(':hoptimator-models')
9+
implementation project(':hoptimator-operator')
810
implementation libs.calciteCore
911
implementation libs.kafkaClients
10-
implementation libs.flinkConnectorKafka
12+
implementation libs.kubernetesClient
13+
implementation libs.kubernetesExtendedClient
1114

1215
testImplementation libs.junit
1316
testImplementation libs.assertj
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.linkedin.hoptimator.operator.kafka;
2+
3+
import com.linkedin.hoptimator.operator.ControllerProvider;
4+
import com.linkedin.hoptimator.operator.Operator;
5+
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
6+
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
7+
8+
import io.kubernetes.client.extended.controller.Controller;
9+
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
10+
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
11+
12+
import java.util.Collection;
13+
import java.util.Collections;
14+
15+
/** Provides a Controller plugin for KafkaTopics. */
16+
public class KafkaControllerProvider implements ControllerProvider {
17+
18+
@Override
19+
public Collection<Controller> controllers(Operator operator) {
20+
operator.registerApi("KafkaTopic", "kafkatopic", "kafkatopics", "hoptimator.linkedin.com",
21+
"v1alpha1", V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);
22+
23+
Reconciler reconciler = new KafkaTopicReconciler(operator);
24+
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
25+
.withReconciler(reconciler)
26+
.withName("kafka-topic-controller")
27+
.withWorkerCount(1)
28+
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
29+
.build();
30+
31+
return Collections.singleton(controller);
32+
}
33+
}
Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import com.linkedin.hoptimator.operator.ConfigAssembler;
55
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
66

7-
import io.kubernetes.client.extended.controller.Controller;
8-
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
97
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
108
import io.kubernetes.client.extended.controller.reconciler.Request;
119
import io.kubernetes.client.extended.controller.reconciler.Result;
@@ -98,20 +96,6 @@ public Result reconcile(Request request) {
9896
return new Result(false);
9997
}
10098

101-
public static Controller controller(Operator operator) {
102-
Reconciler reconciler = new KafkaTopicReconciler(operator);
103-
return ControllerBuilder.defaultBuilder(operator.informerFactory())
104-
.withReconciler(reconciler)
105-
.withName("kafka-topic-controller")
106-
.withWorkerCount(1)
107-
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
108-
// cache has synced up
109-
//.withWorkQueue(resourceWorkQueue)
110-
//.watch()
111-
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
112-
.build();
113-
}
114-
11599
private static <T> List<T> list(List<T> maybeNull) {
116100
if (maybeNull == null) {
117101
return Collections.emptyList();

0 commit comments

Comments
 (0)