Skip to content

Commit 29ad343

Browse files
authored
Merge pull request kroxylicious#1970 from SamBarker/filter-reconciler
Stub `KafkaServiceReconciler`
2 parents 74192ad + 70293db commit 29ad343

File tree

10 files changed

+355
-1
lines changed

10 files changed

+355
-1
lines changed

kroxylicious-operator/install/01.ClusterRole.kroxylicious-operator-watched.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ rules:
3232
- "kroxylicious.io"
3333
resources:
3434
- kafkaproxies/status
35+
- kafkaservices/status
3536
- kafkaproxyingresses/status
3637
verbs:
3738
- get

kroxylicious-operator/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,9 @@
355355
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressstatus.Conditions>
356356
io.kroxylicious.kubernetes.api.common.Condition
357357
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressstatus.Conditions>
358+
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicestatus.Conditions>
359+
io.kroxylicious.kubernetes.api.common.Condition
360+
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicestatus.Conditions>
358361
<io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
359362
java.lang.Object
360363
</io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
2828
import io.prometheus.metrics.exporter.httpserver.MetricsHandler;
2929

30+
import io.kroxylicious.kubernetes.operator.kafkaservice.KafkaServiceReconciler;
3031
import io.kroxylicious.kubernetes.operator.management.UnsupportedHttpMethodFilter;
3132
import io.kroxylicious.proxy.service.HostPort;
3233
import io.kroxylicious.proxy.tag.VisibleForTesting;
@@ -81,6 +82,7 @@ void start() {
8182
operator.installShutdownHook(Duration.ofSeconds(10));
8283
operator.register(new KafkaProxyReconciler());
8384
operator.register(new KafkaProxyIngressReconciler(Clock.systemUTC()));
85+
operator.register(new KafkaServiceReconciler(Clock.systemUTC()));
8486
addHttpGetHandler("/", () -> 404);
8587
managementServer.start();
8688
operator.start();
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator.kafkaservice;
8+
9+
import java.time.Clock;
10+
import java.util.List;
11+
12+
import io.javaoperatorsdk.operator.api.reconciler.Context;
13+
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
14+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
15+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
16+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
17+
18+
import io.kroxylicious.kubernetes.api.common.Condition;
19+
import io.kroxylicious.kubernetes.api.common.ConditionBuilder;
20+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
21+
22+
public final class KafkaServiceReconciler implements
23+
io.javaoperatorsdk.operator.api.reconciler.Reconciler<KafkaService> {
24+
25+
private final Clock clock;
26+
27+
public KafkaServiceReconciler(Clock clock) {
28+
this.clock = clock;
29+
}
30+
31+
@Override
32+
public UpdateControl<KafkaService> reconcile(KafkaService resource, Context<KafkaService> context) {
33+
final Condition acceptedCondition = new ConditionBuilder()
34+
.withType(Condition.Type.Accepted)
35+
.withLastTransitionTime(clock.instant().atZone(clock.getZone()))
36+
.withObservedGeneration(resource.getMetadata().getGeneration())
37+
.withStatus(Condition.Status.TRUE)
38+
.build();
39+
final KafkaService amended = resource
40+
.edit()
41+
.withNewStatus()
42+
.withObservedGeneration(resource.getMetadata().getGeneration())
43+
.withConditions(acceptedCondition)
44+
.endStatus()
45+
.build();
46+
return UpdateControl.patchStatus(amended);
47+
}
48+
49+
@Override
50+
public List<EventSource<?, KafkaService>> prepareEventSources(EventSourceContext<KafkaService> context) {
51+
return io.javaoperatorsdk.operator.api.reconciler.Reconciler.super.prepareEventSources(context);
52+
}
53+
54+
@Override
55+
public ErrorStatusUpdateControl<KafkaService> updateErrorStatus(KafkaService resource, Context<KafkaService> context, Exception e) {
56+
return io.javaoperatorsdk.operator.api.reconciler.Reconciler.super.updateErrorStatus(resource, context, e);
57+
}
58+
}

kroxylicious-operator/src/main/resources/META-INF/fabric8/kafkaservices.kroxylicious.io-v1.yml

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ spec:
2121
singular: kafkaservice
2222
kind: KafkaService
2323
shortNames:
24-
- kcr
24+
- ks
2525
# list of versions supported by this CustomResourceDefinition
2626
versions:
2727
- name: v1alpha1
@@ -44,3 +44,49 @@ spec:
4444
type: string
4545
minLength: 1
4646
pattern: ^(([^:]+:[0-9]{1,5}),)*([^:]+:[0-9]{1,5})$
47+
status:
48+
type: object
49+
properties:
50+
observedGeneration:
51+
description: |
52+
The metadata.generation that was observed during the last reconciliation by the operator.
53+
type: integer
54+
conditions:
55+
type: array
56+
items:
57+
type: object
58+
properties:
59+
lastTransitionTime:
60+
description: |
61+
lastTransitionTime is the last time the condition transitioned from one status to another.
62+
This should be when the underlying condition changed.
63+
If that is not known, then using the time when the API field changed is acceptable.
64+
type: string
65+
format: date-time
66+
message:
67+
description: |
68+
message is a human readable message indicating details about the transition.
69+
This may be an empty string.
70+
type: string
71+
observedGeneration:
72+
description: |
73+
observedGeneration represents the .metadata.generation that the condition was set based upon.
74+
For instance, if .metadata.generation is currently 12, but the
75+
.status.conditions[x].observedGeneration is 9, the condition is out of date with
76+
respect to the current state of the instance.
77+
type: integer
78+
reason:
79+
description: |
80+
reason contains a programmatic identifier indicating the reason for the condition's last transition.
81+
Producers of specific condition types may define expected values and meanings for this field,
82+
and whether the values are considered a guaranteed API.
83+
The value should be a CamelCase string.
84+
This field may not be empty.
85+
type: string
86+
status:
87+
description: status of the condition, one of True, False, Unknown.
88+
type: string
89+
enum: [ "True", "False", "Unknown" ]
90+
type:
91+
description: type of condition in CamelCase or in foo.example.com/CamelCase.
92+
type: string
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.time.Duration;
11+
12+
import org.assertj.core.api.Assertions;
13+
import org.awaitility.core.ConditionFactory;
14+
import org.junit.jupiter.api.AfterEach;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.condition.EnabledIf;
18+
import org.junit.jupiter.api.extension.RegisterExtension;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import io.fabric8.kubernetes.client.KubernetesClient;
23+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
24+
25+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
26+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder;
27+
import io.kroxylicious.kubernetes.operator.kafkaservice.KafkaServiceReconciler;
28+
29+
import static io.kroxylicious.kubernetes.operator.assertj.OperatorAssertions.assertThat;
30+
import static org.awaitility.Awaitility.await;
31+
32+
@EnabledIf(value = "io.kroxylicious.kubernetes.operator.OperatorTestUtils#isKubeClientAvailable", disabledReason = "no viable kube client available")
33+
class KafkaServiceReconcilerIT {
34+
35+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceReconcilerIT.class);
36+
private static final String UPDATED_BOOTSTRAP = "bar.bootstrap:9090";
37+
38+
private KubernetesClient client;
39+
private static final ConditionFactory AWAIT = await().timeout(Duration.ofSeconds(60));
40+
41+
@BeforeEach
42+
void beforeEach() {
43+
client = OperatorTestUtils.kubeClient();
44+
}
45+
46+
@SuppressWarnings("JUnitMalformedDeclaration") // The beforeAll and beforeEach have the same effect so we can use it as an instance field.
47+
@RegisterExtension
48+
LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder()
49+
.withReconciler(new KafkaServiceReconciler(Clock.systemUTC()))
50+
.withKubernetesClient(client)
51+
.waitForNamespaceDeletion(true)
52+
.withConfigurationService(x -> x.withCloseClientOnStop(false))
53+
.build();
54+
55+
@AfterEach
56+
void stopOperator() {
57+
extension.getOperator().stop();
58+
LOGGER.atInfo().log("Test finished");
59+
}
60+
61+
@Test
62+
void shouldAcceptKafkaService() {
63+
// Given
64+
final KafkaService kafkaService = extension.create(new KafkaServiceBuilder().withNewMetadata().withName("mycoolkafkaservice").endMetadata().editOrNewSpec()
65+
.withBootstrapServers("foo.bootstrap:9090").endSpec().build());
66+
final KafkaService updated = kafkaService.edit().editSpec().withBootstrapServers(UPDATED_BOOTSTRAP).endSpec().build();
67+
68+
// When
69+
extension.replace(updated);
70+
71+
// Then
72+
AWAIT.untilAsserted(() -> {
73+
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, "mycoolkafkaservice");
74+
Assertions.assertThat(mycoolkafkaservice.getSpec().getBootstrapServers()).isEqualTo(UPDATED_BOOTSTRAP);
75+
assertThat(mycoolkafkaservice.getStatus())
76+
.isNotNull()
77+
.singleCondition()
78+
.hasObservedGenerationInSyncWithMetadataOf(mycoolkafkaservice)
79+
.isAcceptedTrue();
80+
});
81+
}
82+
83+
@Test
84+
void shouldAcceptUpdateToKafkaService() {
85+
// Given
86+
87+
// When
88+
extension.create(new KafkaServiceBuilder().withNewMetadata().withName("mycoolkafkaservice").endMetadata().build());
89+
90+
// Then
91+
AWAIT.untilAsserted(() -> {
92+
final KafkaService mycoolkafkaservice = extension.get(KafkaService.class, "mycoolkafkaservice");
93+
assertThat(mycoolkafkaservice.getStatus())
94+
.isNotNull()
95+
.singleCondition()
96+
.hasObservedGenerationInSyncWithMetadataOf(mycoolkafkaservice)
97+
.isAcceptedTrue();
98+
});
99+
}
100+
}

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/assertj/KafkaProxyStatusAssert.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
package io.kroxylicious.kubernetes.operator.assertj;
88

9+
import org.assertj.core.api.AbstractLongAssert;
910
import org.assertj.core.api.Assertions;
1011
import org.assertj.core.api.InstanceOfAssertFactories;
1112
import org.assertj.core.api.ListAssert;
1213

14+
import io.kroxylicious.kubernetes.api.common.Condition;
1315
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyStatus;
1416
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Clusters;
1517

@@ -25,6 +27,22 @@ public static KafkaProxyStatusAssert assertThat(KafkaProxyStatus actual) {
2527
return new KafkaProxyStatusAssert(actual);
2628
}
2729

30+
@Override
31+
public AbstractLongAssert<?> observedGeneration() {
32+
return Assertions.assertThat(actual.getObservedGeneration());
33+
}
34+
35+
@Override
36+
public ListAssert<Condition.Status> conditions() {
37+
return Assertions.assertThat(actual.getConditions())
38+
.asInstanceOf(InstanceOfAssertFactories.list(Condition.Status.class));
39+
}
40+
41+
@Override
42+
public ConditionAssert singleCondition() {
43+
return conditions().singleElement(AssertFactory.condition());
44+
}
45+
2846
public ListAssert<Clusters> clusters() {
2947
return Assertions.assertThat(actual.getClusters())
3048
.asInstanceOf(InstanceOfAssertFactories.list(io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Clusters.class));
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator.assertj;
8+
9+
import org.assertj.core.api.AbstractLongAssert;
10+
import org.assertj.core.api.AbstractObjectAssert;
11+
import org.assertj.core.api.Assertions;
12+
import org.assertj.core.api.InstanceOfAssertFactories;
13+
import org.assertj.core.api.ListAssert;
14+
15+
import io.kroxylicious.kubernetes.api.common.Condition;
16+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceStatus;
17+
18+
public class KafkaServiceStatusAssert extends AbstractObjectAssert<KafkaServiceStatusAssert, KafkaServiceStatus> {
19+
protected KafkaServiceStatusAssert(
20+
KafkaServiceStatus o) {
21+
super(o, KafkaServiceStatusAssert.class);
22+
}
23+
24+
public static KafkaServiceStatusAssert assertThat(KafkaServiceStatus actual) {
25+
return new KafkaServiceStatusAssert(actual);
26+
}
27+
28+
public AbstractLongAssert<?> observedGeneration() {
29+
return Assertions.assertThat(actual.getObservedGeneration());
30+
}
31+
32+
public ListAssert<Condition.Status> conditions() {
33+
return Assertions.assertThat(actual.getConditions())
34+
.asInstanceOf(InstanceOfAssertFactories.list(Condition.Status.class));
35+
}
36+
37+
public ConditionAssert singleCondition() {
38+
return conditions().singleElement(AssertFactory.condition());
39+
}
40+
41+
}

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/assertj/OperatorAssertions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88

99
import io.kroxylicious.kubernetes.api.common.Condition;
1010
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyStatus;
11+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceStatus;
1112
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Clusters;
1213

1314
public class OperatorAssertions {
1415
public static KafkaProxyStatusAssert assertThat(KafkaProxyStatus actual) {
1516
return KafkaProxyStatusAssert.assertThat(actual);
1617
}
1718

19+
public static KafkaServiceStatusAssert assertThat(KafkaServiceStatus actual) {
20+
return KafkaServiceStatusAssert.assertThat(actual);
21+
}
22+
1823
public static ClusterAssert assertThat(Clusters actual) {
1924
return ClusterAssert.assertThat(actual);
2025
}

0 commit comments

Comments
 (0)