Skip to content

Commit 814c3f5

Browse files
authored
Clean up several subscription artifacts (#150)
* Fix AvroConverter to match flink key name as now supplied by ConnectionService * Remove duplicate Subscription generated files - normalize to k8s models * Remove HoptimatorOperatorApp and normalize under PipelineOperatorApp w/ new ControllerProvider using K8sContext
1 parent 717d794 commit 814c3f5

File tree

24 files changed

+171
-1270
lines changed

24 files changed

+171
-1270
lines changed

deploy/subscriptions.crd.yaml

Lines changed: 0 additions & 106 deletions
This file was deleted.

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
/** Converts between Avro and Calcite's RelDataType */
2323
public final class AvroConverter {
2424

25-
private static final String KEY_OPTION = "keys";
26-
private static final String KEY_PREFIX_OPTION = "keyPrefix";
25+
private static final String KEY_OPTION = "key.fields";
26+
private static final String KEY_PREFIX_OPTION = "key.fields-prefix";
2727
private static final String PRIMITIVE_KEY = "KEY";
2828

2929
private AvroConverter() {

hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ public void testAvroKeyPayloadSchemaValidKeyOptions() {
131131
List.of("KEY_field1", "field2"));
132132

133133
Map<String, String> keyOptions = Map.of(
134-
"keys", "KEY_field1",
135-
"keyPrefix", "KEY_"
134+
"key.fields", "KEY_field1",
135+
"key.fields-prefix", "KEY_"
136136
);
137137
Pair<Schema, Schema> result = AvroConverter.avroKeyPayloadSchema("namespace", "keySchema", "payloadSchema", dataType, keyOptions);
138138

@@ -160,7 +160,7 @@ public void testAvroKeyPayloadSchemaPrimitiveKey() {
160160
List.of("field1", "KEY"));
161161

162162
Map<String, String> keyOptions = Map.of(
163-
"keys", "KEY"
163+
"key.fields", "KEY"
164164
);
165165
Pair<Schema, Schema> result = AvroConverter.avroKeyPayloadSchema("namespace", "keySchema", "payloadSchema", dataType, keyOptions);
166166

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public final class K8sContext {
3434
public static final String DEFAULT_NAMESPACE = "default";
3535
public static final String NAMESPACE_KEY = "k8s.namespace";
36+
public static final String WATCH_NAMESPACE_KEY = "k8s.watch.namespace";
3637
public static final String SERVER_KEY = "k8s.server";
3738
public static final String USER_KEY = "k8s.user";
3839
public static final String KUBECONFIG_KEY = "k8s.kubeconfig";
@@ -44,17 +45,19 @@ public final class K8sContext {
4445
public static final String SSL_TRUSTSTORE_LOCATION_KEY = "k8s.ssl.truststore.location";
4546

4647
private final String namespace;
48+
private final String watchNamespace;
4749
private final String clientInfo;
4850
private final ApiClient apiClient;
4951
private final SharedInformerFactory informerFactory;
5052
private final V1OwnerReference ownerReference;
5153
private final Map<String, String> labels;
5254
private final HoptimatorConnection connection;
5355

54-
private K8sContext(String namespace, String clientInfo, ApiClient apiClient,
56+
private K8sContext(String namespace, String watchNamespace, String clientInfo, ApiClient apiClient,
5557
SharedInformerFactory informerFactory, V1OwnerReference ownerReference, Map<String, String> labels,
5658
HoptimatorConnection connection) {
5759
this.namespace = namespace;
60+
this.watchNamespace = watchNamespace;
5861
this.clientInfo = clientInfo;
5962
this.apiClient = apiClient;
6063
this.informerFactory = informerFactory;
@@ -75,6 +78,7 @@ public static K8sContext create(Connection connection) {
7578
} else {
7679
namespace = getPodNamespace();
7780
}
81+
String watchNamespace = connectionProperties.getProperty(WATCH_NAMESPACE_KEY);
7882
String kubeconfig = connectionProperties.getProperty(KUBECONFIG_KEY);
7983
String server = connectionProperties.getProperty(SERVER_KEY);
8084
String user = connectionProperties.getProperty(USER_KEY);
@@ -142,19 +146,23 @@ public static K8sContext create(Connection connection) {
142146
}
143147
}
144148

145-
return new K8sContext(namespace, info, apiClient, new SharedInformerFactory(apiClient),
149+
if (watchNamespace == null) {
150+
watchNamespace = "";
151+
}
152+
153+
return new K8sContext(namespace, watchNamespace, info, apiClient, new SharedInformerFactory(apiClient),
146154
null, Collections.emptyMap(), hoptimatorConnection);
147155
}
148156

149157
public K8sContext withOwner(V1OwnerReference owner) {
150-
return new K8sContext(namespace, clientInfo + " Owner is " + owner.getName() + ".", apiClient,
158+
return new K8sContext(namespace, watchNamespace, clientInfo + " Owner is " + owner.getName() + ".", apiClient,
151159
informerFactory, owner, labels, connection);
152160
}
153161

154162
public K8sContext withLabel(String key, String value) {
155163
Map<String, String> newLabels = new HashMap<>(labels);
156164
newLabels.put(key, value);
157-
return new K8sContext(namespace, clientInfo + " Label " + key + "=" + value + ".", apiClient,
165+
return new K8sContext(namespace, watchNamespace, clientInfo + " Label " + key + "=" + value + ".", apiClient,
158166
informerFactory, ownerReference, newLabels, connection);
159167
}
160168

@@ -166,10 +174,19 @@ public String namespace() {
166174
return namespace;
167175
}
168176

177+
public String watchNamespace() {
178+
return watchNamespace;
179+
}
180+
169181
public SharedInformerFactory informerFactory() {
170182
return informerFactory;
171183
}
172184

185+
public <T extends KubernetesObject, U extends KubernetesListObject> void registerInformer(
186+
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod) {
187+
registerInformer(endpoint, resyncPeriod, watchNamespace);
188+
}
189+
173190
public <T extends KubernetesObject, U extends KubernetesListObject> void registerInformer(
174191
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod, String watchNamespace) {
175192
informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis(), watchNamespace);

hoptimator-kafka-controller/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ plugins {
66

77
dependencies {
88
implementation project(':hoptimator-k8s')
9+
implementation project(':hoptimator-util')
910
implementation project(':hoptimator-operator')
1011
implementation project(':hoptimator-models') // <-- marked for deletion
1112
implementation libs.calcite.core

hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.linkedin.hoptimator.operator.kafka;
22

3+
import com.linkedin.hoptimator.k8s.K8sApiEndpoint;
4+
import com.linkedin.hoptimator.k8s.K8sContext;
5+
import java.time.Duration;
36
import java.util.Arrays;
47
import java.util.Collection;
58

@@ -12,31 +15,34 @@
1215
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
1316
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
1417
import com.linkedin.hoptimator.operator.ControllerProvider;
15-
import com.linkedin.hoptimator.operator.Operator;
1618

1719

1820
/** Provides a Controller plugin for KafkaTopics. */
1921
public class KafkaControllerProvider implements ControllerProvider {
2022

21-
@Override
22-
public Collection<Controller> controllers(Operator operator) {
23-
operator.registerApi("KafkaTopic", "kafkatopic", "kafkatopics", "hoptimator.linkedin.com", "v1alpha1",
24-
V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);
23+
public static final K8sApiEndpoint<V1alpha1KafkaTopic, V1alpha1KafkaTopicList> KAFKA_TOPICS =
24+
new K8sApiEndpoint<>("KafkaTopic", "hoptimator.linkedin.com", "v1alpha1", "kafkatopics", false,
25+
V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);
26+
public static final K8sApiEndpoint<V1alpha1Acl, V1alpha1AclList> ACLS =
27+
new K8sApiEndpoint<>("Acl", "hoptimator.linkedin.com", "v1alpha1", "acls", false,
28+
V1alpha1Acl.class, V1alpha1AclList.class);
2529

30+
@Override
31+
public Collection<Controller> controllers(K8sContext context) {
32+
context.registerInformer(KAFKA_TOPICS, Duration.ofMinutes(5));
2633
// N.B. this shared CRD may be re-registered by other ControllerProviders
27-
operator.registerApi("Acl", "acl", "acls", "hoptimator.linkedin.com", "v1alpha1", V1alpha1Acl.class,
28-
V1alpha1AclList.class);
34+
context.registerInformer(ACLS, Duration.ofMinutes(5));
2935

30-
Reconciler topicReconciler = new KafkaTopicReconciler(operator);
31-
Controller topicController = ControllerBuilder.defaultBuilder(operator.informerFactory())
36+
Reconciler topicReconciler = new KafkaTopicReconciler(context);
37+
Controller topicController = ControllerBuilder.defaultBuilder(context.informerFactory())
3238
.withReconciler(topicReconciler)
3339
.withName("kafka-topic-controller")
3440
.withWorkerCount(1)
3541
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
3642
.build();
3743

38-
Reconciler topicAclReconciler = new KafkaTopicAclReconciler(operator);
39-
Controller topicAclController = ControllerBuilder.defaultBuilder(operator.informerFactory())
44+
Reconciler topicAclReconciler = new KafkaTopicAclReconciler(context);
45+
Controller topicAclController = ControllerBuilder.defaultBuilder(context.informerFactory())
4046
.withReconciler(topicAclReconciler)
4147
.withName("kafka-topic-acl-controller")
4248
.withWorkerCount(1)

hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package com.linkedin.hoptimator.operator.kafka;
22

3+
import com.linkedin.hoptimator.k8s.K8sApi;
4+
import com.linkedin.hoptimator.k8s.K8sContext;
5+
import com.linkedin.hoptimator.models.V1alpha1AclList;
6+
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
7+
import java.sql.SQLException;
8+
import java.time.Duration;
39
import java.util.Collections;
410
import java.util.List;
511
import java.util.Map;
@@ -25,18 +31,25 @@
2531
import com.linkedin.hoptimator.models.V1alpha1AclSpec;
2632
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
2733
import com.linkedin.hoptimator.operator.ConfigAssembler;
28-
import com.linkedin.hoptimator.operator.Operator;
2934

3035

3136
public class KafkaTopicAclReconciler implements Reconciler {
3237
private static final Logger log = LoggerFactory.getLogger(KafkaTopicAclReconciler.class);
33-
private static final String ACL = "hoptimator.linkedin.com/v1alpha1/Acl";
34-
private static final String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
3538

36-
private final Operator operator;
39+
private final K8sContext context;
40+
private final K8sApi<V1alpha1KafkaTopic, V1alpha1KafkaTopicList> kafkaTopicApi;
41+
private final K8sApi<V1alpha1Acl, V1alpha1AclList> aclApi;
3742

38-
public KafkaTopicAclReconciler(Operator operator) {
39-
this.operator = operator;
43+
public KafkaTopicAclReconciler(K8sContext context) {
44+
this(context, new K8sApi<>(context, KafkaControllerProvider.KAFKA_TOPICS),
45+
new K8sApi<>(context, KafkaControllerProvider.ACLS));
46+
}
47+
48+
KafkaTopicAclReconciler(K8sContext context, K8sApi<V1alpha1KafkaTopic, V1alpha1KafkaTopicList> kafkaTopicApi,
49+
K8sApi<V1alpha1Acl, V1alpha1AclList> aclApi) {
50+
this.context = context;
51+
this.kafkaTopicApi = kafkaTopicApi;
52+
this.aclApi = aclApi;
4053
}
4154

4255
@Override
@@ -46,11 +59,15 @@ public Result reconcile(Request request) {
4659
String namespace = request.getNamespace();
4760

4861
try {
49-
V1alpha1Acl object = operator.fetch(ACL, namespace, name);
50-
51-
if (object == null) {
52-
log.info("Object {}/{} deleted. Skipping.", namespace, name);
53-
return new Result(false);
62+
V1alpha1Acl object;
63+
try {
64+
object = aclApi.get(namespace, name);
65+
} catch (SQLException e) {
66+
if (e.getErrorCode() == 404) {
67+
log.info("Object {} deleted. Skipping.", name);
68+
return new Result(false);
69+
}
70+
throw e;
5471
}
5572

5673
String targetKind = Objects.requireNonNull(object.getSpec()).getResource().getKind();
@@ -77,15 +94,15 @@ public Result reconcile(Request request) {
7794
String targetName = object.getSpec().getResource().getName();
7895
String principal = object.getSpec().getPrincipal();
7996

80-
V1alpha1KafkaTopic target = operator.fetch(KAFKATOPIC, namespace, targetName);
97+
V1alpha1KafkaTopic target = kafkaTopicApi.get(namespace, targetName);
8198

8299
if (target == null) {
83100
log.info("Target KafkaTopic {}/{} not found. Retrying.", namespace, targetName);
84-
return new Result(true, operator.failureRetryDuration());
101+
return new Result(true, failureRetryDuration());
85102
}
86103

87104
// assemble AdminClient config
88-
ConfigAssembler assembler = new ConfigAssembler(operator);
105+
ConfigAssembler assembler = new ConfigAssembler(context);
89106
list(Objects.requireNonNull(target.getSpec()).getClientConfigs()).forEach(
90107
x -> assembler.addRef(namespace, Objects.requireNonNull(x.getConfigMapRef()).getName()));
91108
map(target.getSpec().getClientOverrides()).forEach(assembler::addOverride);
@@ -102,7 +119,7 @@ public Result reconcile(Request request) {
102119
}
103120
} catch (Exception e) {
104121
log.error("Encountered exception while reconciling KafkaTopic Acl {}/{}", namespace, name, e);
105-
return new Result(true, operator.failureRetryDuration());
122+
return new Result(true, failureRetryDuration());
106123
}
107124
log.info("Done reconciling {}/{}", namespace, name);
108125
return new Result(false);
@@ -115,5 +132,15 @@ private static <T> List<T> list(List<T> maybeNull) {
115132
private static <K, V> Map<K, V> map(Map<K, V> maybeNull) {
116133
return Objects.requireNonNullElse(maybeNull, Collections.emptyMap());
117134
}
135+
136+
// TODO load from configuration
137+
protected Duration failureRetryDuration() {
138+
return Duration.ofMinutes(5);
139+
}
140+
141+
// TODO load from configuration
142+
protected Duration pendingRetryDuration() {
143+
return Duration.ofMinutes(1);
144+
}
118145
}
119146

0 commit comments

Comments
 (0)