Skip to content

Commit 86d2e61

Browse files
authored
Add VirtualKafkaClusterReconciler (kroxylicious#1992)
* Add `VirtualKafkaClusterReconciler` * Bump operator memory limit and request to avoid observed OOMKilled pods. Signed-off-by: Tom Bentley <tbentley@redhat.com>
1 parent bc93f1c commit 86d2e61

File tree

11 files changed

+542
-7
lines changed

11 files changed

+542
-7
lines changed

kroxylicious-operator/install/01.ClusterRole.kroxylicious-operator-watched.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ rules:
3131
- "kroxylicious.io"
3232
resources:
3333
- kafkaproxies/status
34+
- virtualkafkaclusters/status
3435
- kafkaservices/status
3536
- kafkaproxyingresses/status
3637
verbs:

kroxylicious-operator/install/03.Deployment.kroxylicious-operator.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ spec:
3232
args: [ ]
3333
resources:
3434
limits:
35-
memory: 300M
35+
memory: 400M
3636
cpu: "1"
3737
requests:
38-
memory: 300M
38+
memory: 400M
3939
cpu: "1"
4040
ports:
4141
- containerPort: 8080

kroxylicious-operator/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,9 @@
349349
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Conditions>
350350
io.kroxylicious.kubernetes.api.common.Condition
351351
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Conditions>
352+
<io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterstatus.Conditions>
353+
io.kroxylicious.kubernetes.api.common.Condition
354+
</io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterstatus.Conditions>
352355
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>
353356
io.kroxylicious.kubernetes.api.common.Condition
354357
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public static void main(String[] args) {
8080
void start() {
8181
operator.installShutdownHook(Duration.ofSeconds(10));
8282
operator.register(new KafkaProxyReconciler(SecureConfigInterpolator.DEFAULT_INTERPOLATOR));
83+
operator.register(new VirtualKafkaClusterReconciler(Clock.systemUTC()));
8384
operator.register(new KafkaProxyIngressReconciler(Clock.systemUTC()));
8485
operator.register(new KafkaServiceReconciler(Clock.systemUTC()));
8586
operator.register(new KafkaProtocolFilterReconciler(Clock.systemUTC(), SecureConfigInterpolator.DEFAULT_INTERPOLATOR));

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ResourcesUtil.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
4646
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceStatus;
4747
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
48+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterStatus;
4849
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
4950
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterStatus;
5051

@@ -301,6 +302,22 @@ private static <R extends CustomResource<?, S>, S> R newStatus(R ingress,
301302
return result;
302303
}
303304

305+
@NonNull
306+
static VirtualKafkaCluster patchWithCondition(VirtualKafkaCluster cluster, Condition condition) {
307+
return newStatus(
308+
cluster,
309+
VirtualKafkaCluster::new,
310+
VirtualKafkaClusterStatus::new,
311+
VirtualKafkaClusterStatus::setConditions,
312+
VirtualKafkaClusterStatus::setObservedGeneration,
313+
maybeAddOrUpdateCondition(
314+
Optional.of(cluster)
315+
.map(VirtualKafkaCluster::getStatus)
316+
.map(VirtualKafkaClusterStatus::getConditions)
317+
.orElse(List.of()),
318+
condition));
319+
}
320+
304321
@NonNull
305322
static KafkaService patchWithCondition(KafkaService service, Condition condition) {
306323
return newStatus(
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.util.List;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
16+
import io.javaoperatorsdk.operator.api.reconciler.Context;
17+
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
18+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
19+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
20+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
21+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
22+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
23+
24+
import io.kroxylicious.kubernetes.api.common.Condition;
25+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
26+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
27+
28+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
29+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
30+
31+
public final class VirtualKafkaClusterReconciler implements
32+
Reconciler<VirtualKafkaCluster> {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(VirtualKafkaClusterReconciler.class);
35+
public static final String PROXY_EVENT_SOURCE_NAME = "proxy";
36+
37+
private final Clock clock;
38+
39+
public VirtualKafkaClusterReconciler(Clock clock) {
40+
this.clock = clock;
41+
}
42+
43+
@Override
44+
public UpdateControl<VirtualKafkaCluster> reconcile(VirtualKafkaCluster cluster, Context<VirtualKafkaCluster> context) {
45+
var proxyOpt = context.getSecondaryResource(KafkaProxy.class, PROXY_EVENT_SOURCE_NAME);
46+
LOGGER.debug("spec.proxyRef.name resolves to: {}", proxyOpt);
47+
48+
Condition condition;
49+
if (proxyOpt.isPresent()) {
50+
condition = ResourcesUtil.newResolvedRefsTrue(clock, cluster);
51+
}
52+
else {
53+
condition = ResourcesUtil.newResolvedRefsFalse(clock,
54+
cluster,
55+
"spec.proxyRef.name",
56+
"KafkaProxy not found");
57+
}
58+
59+
UpdateControl<VirtualKafkaCluster> uc = UpdateControl.patchStatus(ResourcesUtil.patchWithCondition(cluster, condition));
60+
if (LOGGER.isInfoEnabled()) {
61+
LOGGER.info("Completed reconciliation of {}/{}", namespace(cluster), name(cluster));
62+
}
63+
return uc;
64+
}
65+
66+
@Override
67+
public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSourceContext<VirtualKafkaCluster> context) {
68+
InformerEventSourceConfiguration<KafkaProxy> configuration = InformerEventSourceConfiguration.from(
69+
KafkaProxy.class,
70+
VirtualKafkaCluster.class)
71+
.withName(PROXY_EVENT_SOURCE_NAME)
72+
.withPrimaryToSecondaryMapper((VirtualKafkaCluster cluster) -> ResourcesUtil.localRefAsResourceId(cluster, cluster.getSpec().getProxyRef()))
73+
.withSecondaryToPrimaryMapper(proxy -> ResourcesUtil.findReferrers(context,
74+
proxy,
75+
VirtualKafkaCluster.class,
76+
cluster -> cluster.getSpec().getProxyRef()))
77+
.build();
78+
return List.of(new InformerEventSource<>(configuration, context));
79+
}
80+
81+
@Override
82+
public ErrorStatusUpdateControl<VirtualKafkaCluster> updateErrorStatus(VirtualKafkaCluster cluster, Context<VirtualKafkaCluster> context, Exception e) {
83+
// ResolvedRefs to UNKNOWN
84+
Condition condition = ResourcesUtil.newUnknownCondition(clock, cluster, Condition.Type.ResolvedRefs, e);
85+
ErrorStatusUpdateControl<VirtualKafkaCluster> uc = ErrorStatusUpdateControl.patchStatus(ResourcesUtil.patchWithCondition(cluster, condition));
86+
if (LOGGER.isInfoEnabled()) {
87+
LOGGER.info("Completed reconciliation of {}/{} for error {}", namespace(cluster), name(cluster), e.toString());
88+
}
89+
return uc;
90+
}
91+
}

kroxylicious-operator/src/main/resources/META-INF/fabric8/virtualkafkaclusters.kroxylicious.io-v1.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,51 @@ spec:
8686
name:
8787
maxLength: 253
8888
minLength: 1
89+
type: string
90+
status:
91+
type: object
92+
properties:
93+
observedGeneration:
94+
description: |
95+
The metadata.generation that was observed during the last reconciliation by the operator.
96+
type: integer
97+
conditions:
98+
# Mapped to Java type io.kroxylicious.kubernetes.api.common.Condition
99+
type: array
100+
items:
101+
type: object
102+
properties:
103+
lastTransitionTime:
104+
description: |
105+
lastTransitionTime is the last time the condition transitioned from one status to another.
106+
This should be when the underlying condition changed.
107+
If that is not known, then using the time when the API field changed is acceptable.
108+
type: string
109+
format: date-time
110+
message:
111+
description: |
112+
message is a human readable message indicating details about the transition.
113+
This may be an empty string.
114+
type: string
115+
observedGeneration:
116+
description: |
117+
observedGeneration represents the .metadata.generation that the condition was set based upon.
118+
For instance, if .metadata.generation is currently 12, but the
119+
.status.conditions[x].observedGeneration is 9, the condition is out of date with
120+
respect to the current state of the instance.
121+
type: integer
122+
reason:
123+
description: |
124+
reason contains a programmatic identifier indicating the reason for the condition's last transition.
125+
Producers of specific condition types may define expected values and meanings for this field,
126+
and whether the values are considered a guaranteed API.
127+
The value should be a CamelCase string.
128+
This field may not be empty.
129+
type: string
130+
status:
131+
description: status of the condition, one of True, False, Unknown.
132+
type: string
133+
enum: [ "True", "False", "Unknown" ]
134+
type:
135+
description: type of condition in CamelCase or in foo.example.com/CamelCase.
89136
type: string

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/KafkaServiceReconcilerIT.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class KafkaServiceReconcilerIT {
3333

3434
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceReconcilerIT.class);
3535
private static final String UPDATED_BOOTSTRAP = "bar.bootstrap:9090";
36+
public static final String SERVICE_A = "service-a";
3637

3738
private KubernetesClient client;
3839
private static final ConditionFactory AWAIT = await().timeout(Duration.ofSeconds(60));
@@ -60,16 +61,15 @@ void stopOperator() {
6061
@Test
6162
void shouldAcceptKafkaService() {
6263
// Given
63-
final KafkaService kafkaService = extension.create(new KafkaServiceBuilder().withNewMetadata().withName("mycoolkafkaservice").endMetadata().editOrNewSpec()
64-
.withBootstrapServers("foo.bootstrap:9090").endSpec().build());
64+
final KafkaService kafkaService = extension.create(kafkaService(SERVICE_A));
6565
final KafkaService updated = kafkaService.edit().editSpec().withBootstrapServers(UPDATED_BOOTSTRAP).endSpec().build();
6666

6767
// When
6868
extension.replace(updated);
6969

7070
// Then
7171
AWAIT.untilAsserted(() -> {
72-
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, "mycoolkafkaservice");
72+
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, SERVICE_A);
7373
Assertions.assertThat(mycoolkafkaservice.getSpec().getBootstrapServers()).isEqualTo(UPDATED_BOOTSTRAP);
7474
assertThat(mycoolkafkaservice.getStatus())
7575
.isNotNull()
@@ -84,16 +84,29 @@ void shouldAcceptUpdateToKafkaService() {
8484
// Given
8585

8686
// When
87-
extension.create(new KafkaServiceBuilder().withNewMetadata().withName("mycoolkafkaservice").endMetadata().build());
87+
extension.create(new KafkaServiceBuilder().withNewMetadata().withName(SERVICE_A).endMetadata().build());
8888

8989
// Then
9090
AWAIT.untilAsserted(() -> {
91-
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, "mycoolkafkaservice");
91+
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, SERVICE_A);
9292
assertThat(mycoolkafkaservice.getStatus())
9393
.isNotNull()
9494
.singleCondition()
9595
.hasObservedGenerationInSyncWithMetadataOf(mycoolkafkaservice)
9696
.isAcceptedTrue();
9797
});
9898
}
99+
100+
private static KafkaService kafkaService(String name) {
101+
// @formatter:off
102+
return new KafkaServiceBuilder()
103+
.withNewMetadata()
104+
.withName(name)
105+
.endMetadata()
106+
.editOrNewSpec()
107+
.withBootstrapServers("foo.bootstrap:9090")
108+
.endSpec()
109+
.build();
110+
// @formatter:on
111+
}
99112
}

0 commit comments

Comments
 (0)